2018-10-29 01:49:45 +03:00
|
|
|
package server
|
2016-03-05 02:08:16 +03:00
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
2020-09-23 02:19:49 +03:00
|
|
|
"math"
|
2016-03-05 02:08:16 +03:00
|
|
|
"net"
|
|
|
|
"os"
|
2019-01-09 10:23:53 +03:00
|
|
|
"sort"
|
2016-03-05 02:08:16 +03:00
|
|
|
"strconv"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
2016-09-11 17:49:48 +03:00
|
|
|
"github.com/tidwall/buntdb"
|
2019-01-09 10:23:53 +03:00
|
|
|
"github.com/tidwall/gjson"
|
2017-10-05 02:15:20 +03:00
|
|
|
"github.com/tidwall/redcon"
|
2016-03-28 18:57:41 +03:00
|
|
|
"github.com/tidwall/resp"
|
2018-10-11 00:25:40 +03:00
|
|
|
"github.com/tidwall/tile38/internal/log"
|
2016-03-05 02:08:16 +03:00
|
|
|
)
|
|
|
|
|
2016-03-19 17:16:19 +03:00
|
|
|
type errAOFHook struct {
|
|
|
|
err error
|
|
|
|
}
|
|
|
|
|
|
|
|
func (err errAOFHook) Error() string {
|
|
|
|
return fmt.Sprintf("hook: %v", err.err)
|
|
|
|
}
|
|
|
|
|
2020-10-07 19:52:32 +03:00
|
|
|
func (s *Server) loadAOF() (err error) {
|
2019-10-30 20:17:59 +03:00
|
|
|
fi, err := s.aof.Stat()
|
2016-04-01 02:26:36 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2016-03-05 02:08:16 +03:00
|
|
|
start := time.Now()
|
|
|
|
var count int
|
|
|
|
defer func() {
|
2021-03-31 18:13:44 +03:00
|
|
|
d := time.Since(start)
|
2016-03-05 02:08:16 +03:00
|
|
|
ps := float64(count) / (float64(d) / float64(time.Second))
|
2016-04-01 02:26:36 +03:00
|
|
|
suf := []string{"bytes/s", "KB/s", "MB/s", "GB/s", "TB/s"}
|
|
|
|
bps := float64(fi.Size()) / (float64(d) / float64(time.Second))
|
2022-09-25 13:54:22 +03:00
|
|
|
for i := 0; bps > 1024 && len(suf) > 1; i++ {
|
2016-04-01 02:26:36 +03:00
|
|
|
bps /= 1024
|
|
|
|
suf = suf[1:]
|
|
|
|
}
|
|
|
|
byteSpeed := fmt.Sprintf("%.0f %s", bps, suf[0])
|
|
|
|
log.Infof("AOF loaded %d commands: %.2fs, %.0f/s, %s",
|
|
|
|
count, float64(d)/float64(time.Second), ps, byteSpeed)
|
2016-03-05 02:08:16 +03:00
|
|
|
}()
|
2017-10-05 02:15:20 +03:00
|
|
|
var buf []byte
|
|
|
|
var args [][]byte
|
|
|
|
var packet [0xFFFF]byte
|
2016-03-05 02:08:16 +03:00
|
|
|
for {
|
2019-10-30 20:17:59 +03:00
|
|
|
n, err := s.aof.Read(packet[:])
|
2016-03-05 02:08:16 +03:00
|
|
|
if err != nil {
|
2022-04-13 01:43:44 +03:00
|
|
|
if err != io.EOF {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if len(buf) > 0 {
|
|
|
|
// There was an incomplete command or other data at the end of
|
|
|
|
// the AOF file. Attempt to recover the file by truncating the
|
|
|
|
// file at the end position of the last complete command.
|
|
|
|
log.Warnf("Truncating %d bytes due to an incomplete command\n",
|
|
|
|
len(buf))
|
|
|
|
s.aofsz -= len(buf)
|
|
|
|
if err := s.aof.Truncate(int64(s.aofsz)); err != nil {
|
|
|
|
return err
|
2017-10-05 02:15:20 +03:00
|
|
|
}
|
2022-04-13 01:43:44 +03:00
|
|
|
if _, err := s.aof.Seek(int64(s.aofsz), 0); err != nil {
|
|
|
|
return err
|
2020-10-07 19:52:32 +03:00
|
|
|
}
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2022-04-13 01:43:44 +03:00
|
|
|
return nil
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
s.aofsz += n
|
2017-10-05 02:15:20 +03:00
|
|
|
data := packet[:n]
|
|
|
|
if len(buf) > 0 {
|
|
|
|
data = append(buf, data...)
|
2016-11-09 23:43:26 +03:00
|
|
|
}
|
2017-10-05 02:15:20 +03:00
|
|
|
var complete bool
|
|
|
|
for {
|
2022-04-13 01:43:44 +03:00
|
|
|
if len(data) > 0 && data[0] == 0 {
|
|
|
|
// Zeros found in AOF file (issue #230).
|
|
|
|
// Just ignore it and move the next byte.
|
|
|
|
data = data[1:]
|
|
|
|
continue
|
2020-10-07 19:52:32 +03:00
|
|
|
}
|
2017-10-05 02:15:20 +03:00
|
|
|
complete, args, _, data, err = redcon.ReadNextCommand(data, args[:0])
|
2016-11-09 23:43:26 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-10-05 02:15:20 +03:00
|
|
|
if !complete {
|
|
|
|
break
|
2016-11-09 23:43:26 +03:00
|
|
|
}
|
2017-10-05 02:15:20 +03:00
|
|
|
if len(args) > 0 {
|
2018-11-15 19:15:39 +03:00
|
|
|
var msg Message
|
2018-10-29 01:49:45 +03:00
|
|
|
msg.Args = msg.Args[:0]
|
2017-10-05 02:15:20 +03:00
|
|
|
for _, arg := range args {
|
2018-10-29 01:49:45 +03:00
|
|
|
msg.Args = append(msg.Args, string(arg))
|
2017-10-05 02:15:20 +03:00
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
if _, _, err := s.command(&msg, nil); err != nil {
|
2017-10-05 02:15:20 +03:00
|
|
|
if commandErrIsFatal(err) {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
count++
|
2016-11-09 23:43:26 +03:00
|
|
|
}
|
2016-03-19 17:16:19 +03:00
|
|
|
}
|
2017-10-05 02:15:20 +03:00
|
|
|
if len(data) > 0 {
|
|
|
|
buf = append(buf[:0], data...)
|
|
|
|
} else if len(buf) > 0 {
|
|
|
|
buf = buf[:0]
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2017-10-05 02:15:20 +03:00
|
|
|
|
2016-03-30 19:32:38 +03:00
|
|
|
func commandErrIsFatal(err error) bool {
|
|
|
|
// FSET (and other writable commands) may return errors that we need
|
|
|
|
// to ignore during the loading process. These errors may occur (though unlikely)
|
|
|
|
// due to the aof rewrite operation.
|
2022-09-25 13:54:22 +03:00
|
|
|
return !(err == errKeyNotFound || err == errIDNotFound)
|
2016-03-30 19:32:38 +03:00
|
|
|
}
|
|
|
|
|
2019-09-04 03:01:26 +03:00
|
|
|
// flushAOF flushes all aof buffer data to disk. Set sync to true to sync the
|
|
|
|
// fsync the file.
|
2019-10-30 20:17:59 +03:00
|
|
|
func (s *Server) flushAOF(sync bool) {
|
|
|
|
if len(s.aofbuf) > 0 {
|
|
|
|
_, err := s.aof.Write(s.aofbuf)
|
2018-10-29 01:49:45 +03:00
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
2019-03-10 20:48:14 +03:00
|
|
|
if sync {
|
2019-10-30 20:17:59 +03:00
|
|
|
if err := s.aof.Sync(); err != nil {
|
2019-03-10 20:48:14 +03:00
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
if cap(s.aofbuf) > 1024*1024*32 {
|
|
|
|
s.aofbuf = make([]byte, 0, 1024*1024*32)
|
2019-09-04 03:01:26 +03:00
|
|
|
} else {
|
2019-10-30 20:17:59 +03:00
|
|
|
s.aofbuf = s.aofbuf[:0]
|
2019-09-04 03:01:26 +03:00
|
|
|
}
|
2018-10-29 01:49:45 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
func (s *Server) writeAOF(args []string, d *commandDetails) error {
|
2018-08-14 03:05:30 +03:00
|
|
|
if d != nil && !d.updated {
|
|
|
|
// just ignore writes if the command did not update
|
|
|
|
return nil
|
2016-03-19 17:16:19 +03:00
|
|
|
}
|
2018-10-29 01:49:45 +03:00
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
if s.shrinking {
|
2018-10-29 01:49:45 +03:00
|
|
|
nargs := make([]string, len(args))
|
|
|
|
copy(nargs, args)
|
2019-10-30 20:17:59 +03:00
|
|
|
s.shrinklog = append(s.shrinklog, nargs)
|
2016-03-28 18:57:41 +03:00
|
|
|
}
|
2018-10-29 01:49:45 +03:00
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
if s.aof != nil {
|
2022-11-03 20:07:17 +03:00
|
|
|
s.aofdirty.Store(true) // prewrite optimization flag
|
2019-10-30 20:17:59 +03:00
|
|
|
n := len(s.aofbuf)
|
|
|
|
s.aofbuf = redcon.AppendArray(s.aofbuf, len(args))
|
2018-10-29 01:49:45 +03:00
|
|
|
for _, arg := range args {
|
2019-10-30 20:17:59 +03:00
|
|
|
s.aofbuf = redcon.AppendBulkString(s.aofbuf, arg)
|
2018-04-11 20:53:36 +03:00
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
s.aofsz += len(s.aofbuf) - n
|
2016-03-20 04:31:59 +03:00
|
|
|
}
|
2016-03-05 02:08:16 +03:00
|
|
|
|
|
|
|
// notify aof live connections that we have new data
|
2019-10-30 20:17:59 +03:00
|
|
|
s.fcond.L.Lock()
|
|
|
|
s.fcond.Broadcast()
|
|
|
|
s.fcond.L.Unlock()
|
2016-03-05 02:08:16 +03:00
|
|
|
|
2018-08-14 03:05:30 +03:00
|
|
|
// process geofences
|
2016-03-05 02:08:16 +03:00
|
|
|
if d != nil {
|
2018-08-14 03:05:30 +03:00
|
|
|
// webhook geofences
|
2019-10-30 20:17:59 +03:00
|
|
|
if s.config.followHost() == "" {
|
2018-08-14 03:05:30 +03:00
|
|
|
// for leader only
|
|
|
|
if d.parent {
|
|
|
|
// queue children
|
|
|
|
for _, d := range d.children {
|
2019-10-30 20:17:59 +03:00
|
|
|
if err := s.queueHooks(d); err != nil {
|
2018-08-14 03:05:30 +03:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// queue parent
|
2019-10-30 20:17:59 +03:00
|
|
|
if err := s.queueHooks(d); err != nil {
|
2018-08-14 03:05:30 +03:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2018-11-23 11:39:04 +03:00
|
|
|
|
2018-08-14 03:05:30 +03:00
|
|
|
// live geofences
|
2019-10-30 20:17:59 +03:00
|
|
|
s.lcond.L.Lock()
|
|
|
|
if len(s.lives) > 0 {
|
2018-11-24 01:38:49 +03:00
|
|
|
if d.parent {
|
|
|
|
// queue children
|
2021-03-31 18:13:44 +03:00
|
|
|
s.lstack = append(s.lstack, d.children...)
|
2018-11-24 01:38:49 +03:00
|
|
|
} else {
|
|
|
|
// queue parent
|
2019-10-30 20:17:59 +03:00
|
|
|
s.lstack = append(s.lstack, d)
|
2016-12-29 17:53:01 +03:00
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
s.lcond.Broadcast()
|
2016-12-29 17:53:01 +03:00
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
s.lcond.L.Unlock()
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2016-04-02 17:20:30 +03:00
|
|
|
return nil
|
|
|
|
}
|
2016-03-19 17:16:19 +03:00
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
func (s *Server) getQueueCandidates(d *commandDetails) []*Hook {
|
2020-10-23 19:51:27 +03:00
|
|
|
candidates := make(map[*Hook]bool)
|
2018-11-24 04:15:14 +03:00
|
|
|
// add the hooks with "outside" detection
|
2021-09-13 20:02:36 +03:00
|
|
|
s.hooksOut.Ascend(nil, func(v interface{}) bool {
|
|
|
|
hook := v.(*Hook)
|
2020-10-23 19:51:27 +03:00
|
|
|
if hook.Key == d.key {
|
|
|
|
candidates[hook] = true
|
2018-11-24 04:15:14 +03:00
|
|
|
}
|
2021-09-13 20:02:36 +03:00
|
|
|
return true
|
|
|
|
})
|
2020-10-23 19:51:27 +03:00
|
|
|
// look for candidates that might "cross" geofences
|
2022-09-21 00:20:53 +03:00
|
|
|
if d.old != nil && d.obj != nil && s.hookCross.Len() > 0 {
|
|
|
|
r1, r2 := d.old.Rect(), d.obj.Rect()
|
2020-10-23 19:51:27 +03:00
|
|
|
s.hookCross.Search(
|
|
|
|
[2]float64{
|
|
|
|
math.Min(r1.Min.X, r2.Min.X),
|
|
|
|
math.Min(r1.Min.Y, r2.Min.Y),
|
|
|
|
},
|
|
|
|
[2]float64{
|
|
|
|
math.Max(r1.Max.X, r2.Max.X),
|
|
|
|
math.Max(r1.Max.Y, r2.Max.Y),
|
|
|
|
},
|
|
|
|
func(min, max [2]float64, value interface{}) bool {
|
|
|
|
hook := value.(*Hook)
|
|
|
|
if hook.Key == d.key {
|
|
|
|
candidates[hook] = true
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
})
|
2018-11-24 04:15:14 +03:00
|
|
|
}
|
2020-10-23 19:51:27 +03:00
|
|
|
// look for candidates that overlap the old object
|
2022-09-21 00:20:53 +03:00
|
|
|
if d.old != nil {
|
|
|
|
r1 := d.old.Rect()
|
2020-10-23 19:51:27 +03:00
|
|
|
s.hookTree.Search(
|
|
|
|
[2]float64{r1.Min.X, r1.Min.Y},
|
|
|
|
[2]float64{r1.Max.X, r1.Max.Y},
|
|
|
|
func(min, max [2]float64, value interface{}) bool {
|
|
|
|
hook := value.(*Hook)
|
|
|
|
if hook.Key == d.key {
|
|
|
|
candidates[hook] = true
|
|
|
|
}
|
2020-09-23 02:19:49 +03:00
|
|
|
return true
|
2020-10-23 19:51:27 +03:00
|
|
|
})
|
|
|
|
}
|
|
|
|
// look for candidates that overlap the new object
|
|
|
|
if d.obj != nil {
|
|
|
|
r1 := d.obj.Rect()
|
|
|
|
s.hookTree.Search(
|
|
|
|
[2]float64{r1.Min.X, r1.Min.Y},
|
|
|
|
[2]float64{r1.Max.X, r1.Max.Y},
|
|
|
|
func(min, max [2]float64, value interface{}) bool {
|
|
|
|
hook := value.(*Hook)
|
|
|
|
if hook.Key == d.key {
|
|
|
|
candidates[hook] = true
|
2020-09-23 02:19:49 +03:00
|
|
|
}
|
2020-10-23 19:51:27 +03:00
|
|
|
return true
|
|
|
|
})
|
|
|
|
}
|
|
|
|
if len(candidates) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
// return the candidates as a slice
|
|
|
|
ret := make([]*Hook, 0, len(candidates))
|
|
|
|
for hook := range candidates {
|
|
|
|
ret = append(ret, hook)
|
|
|
|
}
|
|
|
|
return ret
|
2018-11-24 04:15:14 +03:00
|
|
|
}
|
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
func (s *Server) queueHooks(d *commandDetails) error {
|
2019-01-09 10:23:53 +03:00
|
|
|
// Create the slices that will store all messages and hooks
|
|
|
|
var cmsgs, wmsgs []string
|
|
|
|
var whooks []*Hook
|
2018-11-24 04:15:14 +03:00
|
|
|
|
2019-01-09 10:23:53 +03:00
|
|
|
// Compile a slice of potential hook recipients
|
2019-10-30 20:17:59 +03:00
|
|
|
candidates := s.getQueueCandidates(d)
|
2018-11-24 04:15:14 +03:00
|
|
|
for _, hook := range candidates {
|
2019-01-09 10:23:53 +03:00
|
|
|
// Calculate all matching fence messages for all candidates and append
|
|
|
|
// them to the appropriate message slice
|
2018-11-24 04:15:14 +03:00
|
|
|
msgs := FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, hook.Metas, d)
|
|
|
|
if len(msgs) > 0 {
|
|
|
|
if hook.channel {
|
2019-01-09 10:23:53 +03:00
|
|
|
cmsgs = append(cmsgs, msgs...)
|
2018-11-24 04:15:14 +03:00
|
|
|
} else {
|
2019-01-09 10:23:53 +03:00
|
|
|
wmsgs = append(wmsgs, msgs...)
|
|
|
|
whooks = append(whooks, hook)
|
2016-09-11 17:49:48 +03:00
|
|
|
}
|
2016-04-02 17:20:30 +03:00
|
|
|
}
|
|
|
|
}
|
2019-01-09 10:23:53 +03:00
|
|
|
|
|
|
|
// Return nil if there are no messages to be sent
|
|
|
|
if len(cmsgs)+len(wmsgs) == 0 {
|
2016-09-11 17:49:48 +03:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-01-09 10:23:53 +03:00
|
|
|
// Sort both message channel and webhook message slices
|
2019-01-09 20:35:50 +03:00
|
|
|
if len(cmsgs) > 1 {
|
|
|
|
sortMsgs(cmsgs)
|
|
|
|
}
|
|
|
|
if len(wmsgs) > 1 {
|
|
|
|
sortMsgs(wmsgs)
|
|
|
|
}
|
2019-01-09 10:23:53 +03:00
|
|
|
|
|
|
|
// Publish all channel messages if any exist
|
|
|
|
if len(cmsgs) > 0 {
|
|
|
|
for _, m := range cmsgs {
|
2019-10-30 20:17:59 +03:00
|
|
|
s.Publish(gjson.Get(m, "hook").String(), m)
|
2019-01-09 10:23:53 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Queue the webhook messages in the buntdb database
|
2019-10-30 20:17:59 +03:00
|
|
|
err := s.qdb.Update(func(tx *buntdb.Tx) error {
|
2019-01-09 10:23:53 +03:00
|
|
|
for _, msg := range wmsgs {
|
2019-10-30 20:17:59 +03:00
|
|
|
s.qidx++ // increment the log id
|
|
|
|
key := hookLogPrefix + uint64ToString(s.qidx)
|
2019-01-09 10:23:53 +03:00
|
|
|
_, _, err := tx.Set(key, msg, hookLogSetDefaults)
|
2016-09-11 17:49:48 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
log.Debugf("queued hook: %d", s.qidx)
|
2016-09-11 17:49:48 +03:00
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
_, _, err := tx.Set("hook:idx", uint64ToString(s.qidx), nil)
|
2016-09-11 17:49:48 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// all the messages have been queued.
|
|
|
|
// notify the hooks
|
2019-01-09 10:23:53 +03:00
|
|
|
for _, hook := range whooks {
|
2016-09-11 17:49:48 +03:00
|
|
|
hook.Signal()
|
|
|
|
}
|
2016-03-05 02:08:16 +03:00
|
|
|
return nil
|
|
|
|
}
|
2019-01-09 10:23:53 +03:00
|
|
|
|
2019-01-09 20:35:50 +03:00
|
|
|
// sortMsgs sorts passed notification messages by their detect and hook fields
|
2019-01-09 10:23:53 +03:00
|
|
|
func sortMsgs(msgs []string) {
|
2019-01-09 20:35:50 +03:00
|
|
|
sort.SliceStable(msgs, func(i, j int) bool {
|
2019-01-09 10:23:53 +03:00
|
|
|
detectI := msgDetectCode(gjson.Get(msgs[i], "detect").String())
|
|
|
|
detectJ := msgDetectCode(gjson.Get(msgs[j], "detect").String())
|
|
|
|
if detectI < detectJ {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
if detectI > detectJ {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
hookI := gjson.Get(msgs[i], "hook").String()
|
|
|
|
hookJ := gjson.Get(msgs[j], "hook").String()
|
|
|
|
return hookI < hookJ
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2019-01-09 20:35:50 +03:00
|
|
|
// msgDetectCode returns a weight value for the passed detect value
|
|
|
|
func msgDetectCode(detect string) int {
|
|
|
|
switch detect {
|
2019-01-09 10:23:53 +03:00
|
|
|
case "exit":
|
|
|
|
return 1
|
|
|
|
case "outside":
|
|
|
|
return 2
|
|
|
|
case "enter":
|
|
|
|
return 3
|
|
|
|
case "inside":
|
|
|
|
return 4
|
|
|
|
default:
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
}
|
2016-03-05 02:08:16 +03:00
|
|
|
|
2016-09-11 17:49:48 +03:00
|
|
|
// Converts string to an integer
|
|
|
|
func stringToUint64(s string) uint64 {
|
|
|
|
n, _ := strconv.ParseUint(s, 10, 64)
|
|
|
|
return n
|
|
|
|
}
|
|
|
|
|
|
|
|
// Converts a uint to a string
|
|
|
|
func uint64ToString(u uint64) string {
|
|
|
|
s := strings.Repeat("0", 20) + strconv.FormatUint(u, 10)
|
|
|
|
return s[len(s)-20:]
|
|
|
|
}
|
|
|
|
|
2016-03-05 02:08:16 +03:00
|
|
|
type liveAOFSwitches struct {
|
|
|
|
pos int64
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s liveAOFSwitches) Error() string {
|
2018-08-14 03:05:30 +03:00
|
|
|
return goingLive
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
|
2022-09-27 01:43:14 +03:00
|
|
|
// AOFMD5 pos size
|
|
|
|
func (s *Server) cmdAOFMD5(msg *Message) (resp.Value, error) {
|
2016-03-05 02:08:16 +03:00
|
|
|
start := time.Now()
|
Lua scripting feature. (#224)
* Start on lua scripting
* Implement evalsha, script load, script exists, and script flush
* Type conversions from lua to resp/json.
Refactor to make luastate and luascripts persistent in the controller.
* Change controller.command and all underlying commands to return resp.Value.
Serialize only during the ouput.
* First stab at tile38 call from lua
* Change tile38 into tile38.call in Lua
* Property return errors from scripts
* Minor refactoring. No locking on script run
* Cleanup/refactoring
* Create a pool of 5 lua states, allow for more as needed. Refactor.
* Use safe map for scripts. Add a limit for max number of lua states. Refactor.
* Refactor
* Refactor script commands into atomic, read-only, and non-atomic classes.
Proper locking for all three classes.
Add tests for scripts
* More tests for scripts
* Properly escape newlines in lua-produced errors
* Better test for readonly failure
* Correctly convert ok/err messages between lua and resp.
Add pcall, sha1hex, error_reply, status_reply functions to tile38 namespace in lua.
* Add pcall test. Change writeErr to work with string argument
* Make sure eval/evalsha never attempt to write AOF
* Add eval-set and eval-get to benchmarks
* Fix eval benchmark tests, add more
* Improve benchmarks
* Optimizations and refactoring.
* Add lua memtest
* Typo
* Add dependency
* golint fixes
* gofmt fixes
* Add scripting commands to the core/commands.json
* Use ARGV for args inside lua
2017-10-05 18:20:40 +03:00
|
|
|
|
2022-09-27 01:43:14 +03:00
|
|
|
// >> Args
|
|
|
|
|
|
|
|
args := msg.Args
|
|
|
|
if len(args) != 3 {
|
|
|
|
return retrerr(errInvalidNumberOfArguments)
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2022-09-27 01:43:14 +03:00
|
|
|
pos, err := strconv.ParseInt(args[1], 10, 64)
|
2016-03-05 02:08:16 +03:00
|
|
|
if err != nil || pos < 0 {
|
2022-09-27 01:43:14 +03:00
|
|
|
return retrerr(errInvalidArgument(args[1]))
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2022-09-27 01:43:14 +03:00
|
|
|
size, err := strconv.ParseInt(args[2], 10, 64)
|
2016-03-05 02:08:16 +03:00
|
|
|
if err != nil || size < 0 {
|
2022-09-27 01:43:14 +03:00
|
|
|
return retrerr(errInvalidArgument(args[2]))
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2022-09-27 01:43:14 +03:00
|
|
|
|
|
|
|
// >> Operation
|
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
sum, err := s.checksum(pos, size)
|
2016-03-05 02:08:16 +03:00
|
|
|
if err != nil {
|
2022-09-27 01:43:14 +03:00
|
|
|
return retrerr(err)
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2022-09-27 01:43:14 +03:00
|
|
|
|
|
|
|
// >> Response
|
|
|
|
|
|
|
|
if msg.OutputType == JSON {
|
|
|
|
return resp.StringValue(fmt.Sprintf(
|
|
|
|
`{"ok":true,"md5":"%s","elapsed":"%s"}`,
|
|
|
|
sum, time.Since(start))), nil
|
2016-04-01 02:26:36 +03:00
|
|
|
}
|
2022-09-27 01:43:14 +03:00
|
|
|
return resp.SimpleStringValue(sum), nil
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
|
2022-09-27 01:43:14 +03:00
|
|
|
// AOF pos
|
|
|
|
func (s *Server) cmdAOF(msg *Message) (resp.Value, error) {
|
2019-10-30 20:17:59 +03:00
|
|
|
if s.aof == nil {
|
2022-09-27 01:43:14 +03:00
|
|
|
return retrerr(errors.New("aof disabled"))
|
2018-04-11 20:53:36 +03:00
|
|
|
}
|
Lua scripting feature. (#224)
* Start on lua scripting
* Implement evalsha, script load, script exists, and script flush
* Type conversions from lua to resp/json.
Refactor to make luastate and luascripts persistent in the controller.
* Change controller.command and all underlying commands to return resp.Value.
Serialize only during the ouput.
* First stab at tile38 call from lua
* Change tile38 into tile38.call in Lua
* Property return errors from scripts
* Minor refactoring. No locking on script run
* Cleanup/refactoring
* Create a pool of 5 lua states, allow for more as needed. Refactor.
* Use safe map for scripts. Add a limit for max number of lua states. Refactor.
* Refactor
* Refactor script commands into atomic, read-only, and non-atomic classes.
Proper locking for all three classes.
Add tests for scripts
* More tests for scripts
* Properly escape newlines in lua-produced errors
* Better test for readonly failure
* Correctly convert ok/err messages between lua and resp.
Add pcall, sha1hex, error_reply, status_reply functions to tile38 namespace in lua.
* Add pcall test. Change writeErr to work with string argument
* Make sure eval/evalsha never attempt to write AOF
* Add eval-set and eval-get to benchmarks
* Fix eval benchmark tests, add more
* Improve benchmarks
* Optimizations and refactoring.
* Add lua memtest
* Typo
* Add dependency
* golint fixes
* gofmt fixes
* Add scripting commands to the core/commands.json
* Use ARGV for args inside lua
2017-10-05 18:20:40 +03:00
|
|
|
|
2022-09-27 01:43:14 +03:00
|
|
|
// >> Args
|
|
|
|
|
|
|
|
args := msg.Args
|
|
|
|
if len(args) != 2 {
|
|
|
|
return retrerr(errInvalidNumberOfArguments)
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2022-09-27 01:43:14 +03:00
|
|
|
|
|
|
|
pos, err := strconv.ParseInt(args[1], 10, 64)
|
2016-03-05 02:08:16 +03:00
|
|
|
if err != nil || pos < 0 {
|
2022-09-27 01:43:14 +03:00
|
|
|
return retrerr(errInvalidArgument(args[1]))
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2022-09-27 01:43:14 +03:00
|
|
|
|
|
|
|
// >> Operation
|
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
f, err := os.Open(s.aof.Name())
|
2016-03-05 02:08:16 +03:00
|
|
|
if err != nil {
|
2022-09-27 01:43:14 +03:00
|
|
|
return retrerr(err)
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
defer f.Close()
|
2022-09-27 01:43:14 +03:00
|
|
|
|
2016-03-05 02:08:16 +03:00
|
|
|
n, err := f.Seek(0, 2)
|
|
|
|
if err != nil {
|
2022-09-27 01:43:14 +03:00
|
|
|
return retrerr(err)
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2022-09-27 01:43:14 +03:00
|
|
|
|
2016-03-05 02:08:16 +03:00
|
|
|
if n < pos {
|
2022-09-27 01:43:14 +03:00
|
|
|
return retrerr(errors.New(
|
|
|
|
"pos is too big, must be less that the aof_size of leader"))
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2022-09-27 01:43:14 +03:00
|
|
|
|
|
|
|
// >> Response
|
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
var ls liveAOFSwitches
|
|
|
|
ls.pos = pos
|
|
|
|
return NOMessage, ls
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Message) error {
|
2021-06-13 17:53:27 +03:00
|
|
|
s.mu.RLock()
|
|
|
|
f, err := os.Open(s.aof.Name())
|
|
|
|
s.mu.RUnlock()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
s.mu.Lock()
|
2021-06-13 17:53:27 +03:00
|
|
|
s.aofconnM[conn] = f
|
2019-10-30 20:17:59 +03:00
|
|
|
s.mu.Unlock()
|
2016-04-01 04:20:42 +03:00
|
|
|
defer func() {
|
2019-10-30 20:17:59 +03:00
|
|
|
s.mu.Lock()
|
|
|
|
delete(s.aofconnM, conn)
|
|
|
|
s.mu.Unlock()
|
2016-04-01 04:20:42 +03:00
|
|
|
conn.Close()
|
2022-09-27 01:43:14 +03:00
|
|
|
f.Close()
|
2016-04-01 04:20:42 +03:00
|
|
|
}()
|
|
|
|
|
2016-04-01 02:26:36 +03:00
|
|
|
if _, err := conn.Write([]byte("+OK\r\n")); err != nil {
|
|
|
|
return err
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
if _, err := f.Seek(pos, 0); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-09-27 01:43:14 +03:00
|
|
|
var wg sync.WaitGroup
|
|
|
|
wg.Add(1)
|
2016-03-05 02:08:16 +03:00
|
|
|
go func() {
|
|
|
|
defer func() {
|
2022-09-27 01:43:14 +03:00
|
|
|
f.Close()
|
|
|
|
conn.Close()
|
|
|
|
wg.Done()
|
2016-03-05 02:08:16 +03:00
|
|
|
}()
|
2022-09-27 01:43:14 +03:00
|
|
|
// Any incoming message should end the connection
|
|
|
|
rd.ReadMessages()
|
2016-03-05 02:08:16 +03:00
|
|
|
}()
|
2022-09-27 01:43:14 +03:00
|
|
|
_, err = io.Copy(conn, f)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
b := make([]byte, 4096*2)
|
|
|
|
for {
|
|
|
|
n, err := f.Read(b)
|
|
|
|
if n > 0 {
|
|
|
|
if _, err := conn.Write(b[:n]); err != nil {
|
2016-03-05 02:08:16 +03:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2022-09-27 01:43:14 +03:00
|
|
|
if err == io.EOF {
|
|
|
|
time.Sleep(time.Second / 4)
|
|
|
|
} else if err != nil {
|
|
|
|
return err
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|