2018-10-29 01:49:45 +03:00
|
|
|
package server
|
2016-03-05 02:08:16 +03:00
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
2020-09-23 02:19:49 +03:00
|
|
|
"math"
|
2016-03-05 02:08:16 +03:00
|
|
|
"net"
|
|
|
|
"os"
|
2019-01-09 10:23:53 +03:00
|
|
|
"sort"
|
2016-03-05 02:08:16 +03:00
|
|
|
"strconv"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
2018-11-11 02:16:04 +03:00
|
|
|
"sync/atomic"
|
2016-03-05 02:08:16 +03:00
|
|
|
"time"
|
|
|
|
|
2016-09-11 17:49:48 +03:00
|
|
|
"github.com/tidwall/buntdb"
|
2019-01-09 10:23:53 +03:00
|
|
|
"github.com/tidwall/gjson"
|
2017-10-05 02:15:20 +03:00
|
|
|
"github.com/tidwall/redcon"
|
2016-03-28 18:57:41 +03:00
|
|
|
"github.com/tidwall/resp"
|
2018-10-11 00:25:40 +03:00
|
|
|
"github.com/tidwall/tile38/internal/log"
|
2016-03-05 02:08:16 +03:00
|
|
|
)
|
|
|
|
|
2016-03-19 17:16:19 +03:00
|
|
|
type errAOFHook struct {
|
|
|
|
err error
|
|
|
|
}
|
|
|
|
|
|
|
|
func (err errAOFHook) Error() string {
|
|
|
|
return fmt.Sprintf("hook: %v", err.err)
|
|
|
|
}
|
|
|
|
|
2020-10-07 19:52:32 +03:00
|
|
|
func (s *Server) loadAOF() (err error) {
|
2019-10-30 20:17:59 +03:00
|
|
|
fi, err := s.aof.Stat()
|
2016-04-01 02:26:36 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2016-03-05 02:08:16 +03:00
|
|
|
start := time.Now()
|
|
|
|
var count int
|
|
|
|
defer func() {
|
2021-03-31 18:13:44 +03:00
|
|
|
d := time.Since(start)
|
2016-03-05 02:08:16 +03:00
|
|
|
ps := float64(count) / (float64(d) / float64(time.Second))
|
2016-04-01 02:26:36 +03:00
|
|
|
suf := []string{"bytes/s", "KB/s", "MB/s", "GB/s", "TB/s"}
|
|
|
|
bps := float64(fi.Size()) / (float64(d) / float64(time.Second))
|
|
|
|
for i := 0; bps > 1024; i++ {
|
|
|
|
if len(suf) == 1 {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
bps /= 1024
|
|
|
|
suf = suf[1:]
|
|
|
|
}
|
|
|
|
byteSpeed := fmt.Sprintf("%.0f %s", bps, suf[0])
|
|
|
|
log.Infof("AOF loaded %d commands: %.2fs, %.0f/s, %s",
|
|
|
|
count, float64(d)/float64(time.Second), ps, byteSpeed)
|
2016-03-05 02:08:16 +03:00
|
|
|
}()
|
2017-10-05 02:15:20 +03:00
|
|
|
var buf []byte
|
|
|
|
var args [][]byte
|
|
|
|
var packet [0xFFFF]byte
|
2020-10-07 19:52:32 +03:00
|
|
|
var zeros int
|
2016-03-05 02:08:16 +03:00
|
|
|
for {
|
2019-10-30 20:17:59 +03:00
|
|
|
n, err := s.aof.Read(packet[:])
|
2016-03-05 02:08:16 +03:00
|
|
|
if err != nil {
|
|
|
|
if err == io.EOF {
|
2017-10-05 02:15:20 +03:00
|
|
|
if len(buf) > 0 {
|
|
|
|
return io.ErrUnexpectedEOF
|
|
|
|
}
|
2020-10-07 19:52:32 +03:00
|
|
|
if zeros > 0 {
|
|
|
|
// Trailing zeros in AOF. Truncate the file so it's sane.
|
|
|
|
// See issue #230 for more information. Force a warning.
|
|
|
|
log.Infof("Truncating %d zeros from AOF (issue #230)", zeros)
|
|
|
|
s.aofsz -= zeros
|
|
|
|
if err := s.aof.Truncate(int64(s.aofsz)); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := s.aof.Seek(int64(s.aofsz), 0); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2016-03-05 02:08:16 +03:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
s.aofsz += n
|
2017-10-05 02:15:20 +03:00
|
|
|
data := packet[:n]
|
|
|
|
if len(buf) > 0 {
|
|
|
|
data = append(buf, data...)
|
2016-11-09 23:43:26 +03:00
|
|
|
}
|
2017-10-05 02:15:20 +03:00
|
|
|
var complete bool
|
|
|
|
for {
|
2020-10-07 19:52:32 +03:00
|
|
|
if len(data) > 0 {
|
|
|
|
if data[0] == 0 {
|
|
|
|
zeros++
|
|
|
|
data = data[1:]
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if zeros > 0 {
|
2021-03-31 18:13:44 +03:00
|
|
|
return clientErrorf("Zeros found in AOF file (issue #230)")
|
2020-10-07 19:52:32 +03:00
|
|
|
}
|
|
|
|
}
|
2017-10-05 02:15:20 +03:00
|
|
|
complete, args, _, data, err = redcon.ReadNextCommand(data, args[:0])
|
2016-11-09 23:43:26 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-10-05 02:15:20 +03:00
|
|
|
if !complete {
|
|
|
|
break
|
2016-11-09 23:43:26 +03:00
|
|
|
}
|
2017-10-05 02:15:20 +03:00
|
|
|
if len(args) > 0 {
|
2018-11-15 19:15:39 +03:00
|
|
|
var msg Message
|
2018-10-29 01:49:45 +03:00
|
|
|
msg.Args = msg.Args[:0]
|
2017-10-05 02:15:20 +03:00
|
|
|
for _, arg := range args {
|
2018-10-29 01:49:45 +03:00
|
|
|
msg.Args = append(msg.Args, string(arg))
|
2017-10-05 02:15:20 +03:00
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
if _, _, err := s.command(&msg, nil); err != nil {
|
2017-10-05 02:15:20 +03:00
|
|
|
if commandErrIsFatal(err) {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
count++
|
2016-11-09 23:43:26 +03:00
|
|
|
}
|
2016-03-19 17:16:19 +03:00
|
|
|
}
|
2017-10-05 02:15:20 +03:00
|
|
|
if len(data) > 0 {
|
|
|
|
buf = append(buf[:0], data...)
|
|
|
|
} else if len(buf) > 0 {
|
|
|
|
buf = buf[:0]
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2017-10-05 02:15:20 +03:00
|
|
|
|
2016-03-30 19:32:38 +03:00
|
|
|
func commandErrIsFatal(err error) bool {
|
|
|
|
// FSET (and other writable commands) may return errors that we need
|
|
|
|
// to ignore during the loading process. These errors may occur (though unlikely)
|
|
|
|
// due to the aof rewrite operation.
|
|
|
|
switch err {
|
|
|
|
case errKeyNotFound, errIDNotFound:
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2019-09-04 03:01:26 +03:00
|
|
|
// flushAOF flushes all aof buffer data to disk. Set sync to true to sync the
|
|
|
|
// fsync the file.
|
2019-10-30 20:17:59 +03:00
|
|
|
func (s *Server) flushAOF(sync bool) {
|
|
|
|
if len(s.aofbuf) > 0 {
|
|
|
|
_, err := s.aof.Write(s.aofbuf)
|
2018-10-29 01:49:45 +03:00
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
2019-03-10 20:48:14 +03:00
|
|
|
if sync {
|
2019-10-30 20:17:59 +03:00
|
|
|
if err := s.aof.Sync(); err != nil {
|
2019-03-10 20:48:14 +03:00
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
if cap(s.aofbuf) > 1024*1024*32 {
|
|
|
|
s.aofbuf = make([]byte, 0, 1024*1024*32)
|
2019-09-04 03:01:26 +03:00
|
|
|
} else {
|
2019-10-30 20:17:59 +03:00
|
|
|
s.aofbuf = s.aofbuf[:0]
|
2019-09-04 03:01:26 +03:00
|
|
|
}
|
2018-10-29 01:49:45 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
func (s *Server) writeAOF(args []string, d *commandDetails) error {
|
2018-08-14 03:05:30 +03:00
|
|
|
if d != nil && !d.updated {
|
|
|
|
// just ignore writes if the command did not update
|
|
|
|
return nil
|
2016-03-19 17:16:19 +03:00
|
|
|
}
|
2018-10-29 01:49:45 +03:00
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
if s.shrinking {
|
2018-10-29 01:49:45 +03:00
|
|
|
nargs := make([]string, len(args))
|
|
|
|
copy(nargs, args)
|
2019-10-30 20:17:59 +03:00
|
|
|
s.shrinklog = append(s.shrinklog, nargs)
|
2016-03-28 18:57:41 +03:00
|
|
|
}
|
2018-10-29 01:49:45 +03:00
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
if s.aof != nil {
|
|
|
|
atomic.StoreInt32(&s.aofdirty, 1) // prewrite optimization flag
|
|
|
|
n := len(s.aofbuf)
|
|
|
|
s.aofbuf = redcon.AppendArray(s.aofbuf, len(args))
|
2018-10-29 01:49:45 +03:00
|
|
|
for _, arg := range args {
|
2019-10-30 20:17:59 +03:00
|
|
|
s.aofbuf = redcon.AppendBulkString(s.aofbuf, arg)
|
2018-04-11 20:53:36 +03:00
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
s.aofsz += len(s.aofbuf) - n
|
2016-03-20 04:31:59 +03:00
|
|
|
}
|
2016-03-05 02:08:16 +03:00
|
|
|
|
|
|
|
// notify aof live connections that we have new data
|
2019-10-30 20:17:59 +03:00
|
|
|
s.fcond.L.Lock()
|
|
|
|
s.fcond.Broadcast()
|
|
|
|
s.fcond.L.Unlock()
|
2016-03-05 02:08:16 +03:00
|
|
|
|
2018-08-14 03:05:30 +03:00
|
|
|
// process geofences
|
2016-03-05 02:08:16 +03:00
|
|
|
if d != nil {
|
2018-08-14 03:05:30 +03:00
|
|
|
// webhook geofences
|
2019-10-30 20:17:59 +03:00
|
|
|
if s.config.followHost() == "" {
|
2018-08-14 03:05:30 +03:00
|
|
|
// for leader only
|
|
|
|
if d.parent {
|
|
|
|
// queue children
|
|
|
|
for _, d := range d.children {
|
2019-10-30 20:17:59 +03:00
|
|
|
if err := s.queueHooks(d); err != nil {
|
2018-08-14 03:05:30 +03:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// queue parent
|
2019-10-30 20:17:59 +03:00
|
|
|
if err := s.queueHooks(d); err != nil {
|
2018-08-14 03:05:30 +03:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2018-11-23 11:39:04 +03:00
|
|
|
|
2018-08-14 03:05:30 +03:00
|
|
|
// live geofences
|
2019-10-30 20:17:59 +03:00
|
|
|
s.lcond.L.Lock()
|
|
|
|
if len(s.lives) > 0 {
|
2018-11-24 01:38:49 +03:00
|
|
|
if d.parent {
|
|
|
|
// queue children
|
2021-03-31 18:13:44 +03:00
|
|
|
s.lstack = append(s.lstack, d.children...)
|
2018-11-24 01:38:49 +03:00
|
|
|
} else {
|
|
|
|
// queue parent
|
2019-10-30 20:17:59 +03:00
|
|
|
s.lstack = append(s.lstack, d)
|
2016-12-29 17:53:01 +03:00
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
s.lcond.Broadcast()
|
2016-12-29 17:53:01 +03:00
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
s.lcond.L.Unlock()
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2016-04-02 17:20:30 +03:00
|
|
|
return nil
|
|
|
|
}
|
2016-03-19 17:16:19 +03:00
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
func (s *Server) getQueueCandidates(d *commandDetails) []*Hook {
|
2020-10-23 19:51:27 +03:00
|
|
|
candidates := make(map[*Hook]bool)
|
2018-11-24 04:15:14 +03:00
|
|
|
// add the hooks with "outside" detection
|
2020-10-23 19:51:27 +03:00
|
|
|
for _, hook := range s.hooksOut {
|
|
|
|
if hook.Key == d.key {
|
|
|
|
candidates[hook] = true
|
2018-11-24 04:15:14 +03:00
|
|
|
}
|
|
|
|
}
|
2020-10-23 19:51:27 +03:00
|
|
|
// look for candidates that might "cross" geofences
|
|
|
|
if d.oldObj != nil && d.obj != nil && s.hookCross.Len() > 0 {
|
|
|
|
r1, r2 := d.oldObj.Rect(), d.obj.Rect()
|
|
|
|
s.hookCross.Search(
|
|
|
|
[2]float64{
|
|
|
|
math.Min(r1.Min.X, r2.Min.X),
|
|
|
|
math.Min(r1.Min.Y, r2.Min.Y),
|
|
|
|
},
|
|
|
|
[2]float64{
|
|
|
|
math.Max(r1.Max.X, r2.Max.X),
|
|
|
|
math.Max(r1.Max.Y, r2.Max.Y),
|
|
|
|
},
|
|
|
|
func(min, max [2]float64, value interface{}) bool {
|
|
|
|
hook := value.(*Hook)
|
|
|
|
if hook.Key == d.key {
|
|
|
|
candidates[hook] = true
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
})
|
2018-11-24 04:15:14 +03:00
|
|
|
}
|
2020-10-23 19:51:27 +03:00
|
|
|
// look for candidates that overlap the old object
|
|
|
|
if d.oldObj != nil {
|
|
|
|
r1 := d.oldObj.Rect()
|
|
|
|
s.hookTree.Search(
|
|
|
|
[2]float64{r1.Min.X, r1.Min.Y},
|
|
|
|
[2]float64{r1.Max.X, r1.Max.Y},
|
|
|
|
func(min, max [2]float64, value interface{}) bool {
|
|
|
|
hook := value.(*Hook)
|
|
|
|
if hook.Key == d.key {
|
|
|
|
candidates[hook] = true
|
|
|
|
}
|
2020-09-23 02:19:49 +03:00
|
|
|
return true
|
2020-10-23 19:51:27 +03:00
|
|
|
})
|
|
|
|
}
|
|
|
|
// look for candidates that overlap the new object
|
|
|
|
if d.obj != nil {
|
|
|
|
r1 := d.obj.Rect()
|
|
|
|
s.hookTree.Search(
|
|
|
|
[2]float64{r1.Min.X, r1.Min.Y},
|
|
|
|
[2]float64{r1.Max.X, r1.Max.Y},
|
|
|
|
func(min, max [2]float64, value interface{}) bool {
|
|
|
|
hook := value.(*Hook)
|
|
|
|
if hook.Key == d.key {
|
|
|
|
candidates[hook] = true
|
2020-09-23 02:19:49 +03:00
|
|
|
}
|
2020-10-23 19:51:27 +03:00
|
|
|
return true
|
|
|
|
})
|
|
|
|
}
|
|
|
|
if len(candidates) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
// return the candidates as a slice
|
|
|
|
ret := make([]*Hook, 0, len(candidates))
|
|
|
|
for hook := range candidates {
|
|
|
|
ret = append(ret, hook)
|
|
|
|
}
|
|
|
|
return ret
|
2018-11-24 04:15:14 +03:00
|
|
|
}
|
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
func (s *Server) queueHooks(d *commandDetails) error {
|
2019-01-09 10:23:53 +03:00
|
|
|
// Create the slices that will store all messages and hooks
|
|
|
|
var cmsgs, wmsgs []string
|
|
|
|
var whooks []*Hook
|
2018-11-24 04:15:14 +03:00
|
|
|
|
2019-01-09 10:23:53 +03:00
|
|
|
// Compile a slice of potential hook recipients
|
2019-10-30 20:17:59 +03:00
|
|
|
candidates := s.getQueueCandidates(d)
|
2018-11-24 04:15:14 +03:00
|
|
|
for _, hook := range candidates {
|
2019-01-09 10:23:53 +03:00
|
|
|
// Calculate all matching fence messages for all candidates and append
|
|
|
|
// them to the appropriate message slice
|
2018-11-24 04:15:14 +03:00
|
|
|
msgs := FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, hook.Metas, d)
|
|
|
|
if len(msgs) > 0 {
|
|
|
|
if hook.channel {
|
2019-01-09 10:23:53 +03:00
|
|
|
cmsgs = append(cmsgs, msgs...)
|
2018-11-24 04:15:14 +03:00
|
|
|
} else {
|
2019-01-09 10:23:53 +03:00
|
|
|
wmsgs = append(wmsgs, msgs...)
|
|
|
|
whooks = append(whooks, hook)
|
2016-09-11 17:49:48 +03:00
|
|
|
}
|
2016-04-02 17:20:30 +03:00
|
|
|
}
|
|
|
|
}
|
2019-01-09 10:23:53 +03:00
|
|
|
|
|
|
|
// Return nil if there are no messages to be sent
|
|
|
|
if len(cmsgs)+len(wmsgs) == 0 {
|
2016-09-11 17:49:48 +03:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-01-09 10:23:53 +03:00
|
|
|
// Sort both message channel and webhook message slices
|
2019-01-09 20:35:50 +03:00
|
|
|
if len(cmsgs) > 1 {
|
|
|
|
sortMsgs(cmsgs)
|
|
|
|
}
|
|
|
|
if len(wmsgs) > 1 {
|
|
|
|
sortMsgs(wmsgs)
|
|
|
|
}
|
2019-01-09 10:23:53 +03:00
|
|
|
|
|
|
|
// Publish all channel messages if any exist
|
|
|
|
if len(cmsgs) > 0 {
|
|
|
|
for _, m := range cmsgs {
|
2019-10-30 20:17:59 +03:00
|
|
|
s.Publish(gjson.Get(m, "hook").String(), m)
|
2019-01-09 10:23:53 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Queue the webhook messages in the buntdb database
|
2019-10-30 20:17:59 +03:00
|
|
|
err := s.qdb.Update(func(tx *buntdb.Tx) error {
|
2019-01-09 10:23:53 +03:00
|
|
|
for _, msg := range wmsgs {
|
2019-10-30 20:17:59 +03:00
|
|
|
s.qidx++ // increment the log id
|
|
|
|
key := hookLogPrefix + uint64ToString(s.qidx)
|
2019-01-09 10:23:53 +03:00
|
|
|
_, _, err := tx.Set(key, msg, hookLogSetDefaults)
|
2016-09-11 17:49:48 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
log.Debugf("queued hook: %d", s.qidx)
|
2016-09-11 17:49:48 +03:00
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
_, _, err := tx.Set("hook:idx", uint64ToString(s.qidx), nil)
|
2016-09-11 17:49:48 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// all the messages have been queued.
|
|
|
|
// notify the hooks
|
2019-01-09 10:23:53 +03:00
|
|
|
for _, hook := range whooks {
|
2016-09-11 17:49:48 +03:00
|
|
|
hook.Signal()
|
|
|
|
}
|
2016-03-05 02:08:16 +03:00
|
|
|
return nil
|
|
|
|
}
|
2019-01-09 10:23:53 +03:00
|
|
|
|
2019-01-09 20:35:50 +03:00
|
|
|
// sortMsgs sorts passed notification messages by their detect and hook fields
|
2019-01-09 10:23:53 +03:00
|
|
|
func sortMsgs(msgs []string) {
|
2019-01-09 20:35:50 +03:00
|
|
|
sort.SliceStable(msgs, func(i, j int) bool {
|
2019-01-09 10:23:53 +03:00
|
|
|
detectI := msgDetectCode(gjson.Get(msgs[i], "detect").String())
|
|
|
|
detectJ := msgDetectCode(gjson.Get(msgs[j], "detect").String())
|
|
|
|
if detectI < detectJ {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
if detectI > detectJ {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
hookI := gjson.Get(msgs[i], "hook").String()
|
|
|
|
hookJ := gjson.Get(msgs[j], "hook").String()
|
|
|
|
return hookI < hookJ
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2019-01-09 20:35:50 +03:00
|
|
|
// msgDetectCode returns a weight value for the passed detect value
|
|
|
|
func msgDetectCode(detect string) int {
|
|
|
|
switch detect {
|
2019-01-09 10:23:53 +03:00
|
|
|
case "exit":
|
|
|
|
return 1
|
|
|
|
case "outside":
|
|
|
|
return 2
|
|
|
|
case "enter":
|
|
|
|
return 3
|
|
|
|
case "inside":
|
|
|
|
return 4
|
|
|
|
default:
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
}
|
2016-03-05 02:08:16 +03:00
|
|
|
|
2016-09-11 17:49:48 +03:00
|
|
|
// Converts string to an integer
|
|
|
|
func stringToUint64(s string) uint64 {
|
|
|
|
n, _ := strconv.ParseUint(s, 10, 64)
|
|
|
|
return n
|
|
|
|
}
|
|
|
|
|
|
|
|
// Converts a uint to a string
|
|
|
|
func uint64ToString(u uint64) string {
|
|
|
|
s := strings.Repeat("0", 20) + strconv.FormatUint(u, 10)
|
|
|
|
return s[len(s)-20:]
|
|
|
|
}
|
|
|
|
|
2016-03-05 02:08:16 +03:00
|
|
|
type liveAOFSwitches struct {
|
|
|
|
pos int64
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s liveAOFSwitches) Error() string {
|
2018-08-14 03:05:30 +03:00
|
|
|
return goingLive
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
func (s *Server) cmdAOFMD5(msg *Message) (res resp.Value, err error) {
|
2016-03-05 02:08:16 +03:00
|
|
|
start := time.Now()
|
2018-10-29 01:49:45 +03:00
|
|
|
vs := msg.Args[1:]
|
2016-04-01 02:26:36 +03:00
|
|
|
var ok bool
|
2016-03-05 02:08:16 +03:00
|
|
|
var spos, ssize string
|
Lua scripting feature. (#224)
* Start on lua scripting
* Implement evalsha, script load, script exists, and script flush
* Type conversions from lua to resp/json.
Refactor to make luastate and luascripts persistent in the controller.
* Change controller.command and all underlying commands to return resp.Value.
Serialize only during the ouput.
* First stab at tile38 call from lua
* Change tile38 into tile38.call in Lua
* Property return errors from scripts
* Minor refactoring. No locking on script run
* Cleanup/refactoring
* Create a pool of 5 lua states, allow for more as needed. Refactor.
* Use safe map for scripts. Add a limit for max number of lua states. Refactor.
* Refactor
* Refactor script commands into atomic, read-only, and non-atomic classes.
Proper locking for all three classes.
Add tests for scripts
* More tests for scripts
* Properly escape newlines in lua-produced errors
* Better test for readonly failure
* Correctly convert ok/err messages between lua and resp.
Add pcall, sha1hex, error_reply, status_reply functions to tile38 namespace in lua.
* Add pcall test. Change writeErr to work with string argument
* Make sure eval/evalsha never attempt to write AOF
* Add eval-set and eval-get to benchmarks
* Fix eval benchmark tests, add more
* Improve benchmarks
* Optimizations and refactoring.
* Add lua memtest
* Typo
* Add dependency
* golint fixes
* gofmt fixes
* Add scripting commands to the core/commands.json
* Use ARGV for args inside lua
2017-10-05 18:20:40 +03:00
|
|
|
|
2016-04-01 02:26:36 +03:00
|
|
|
if vs, spos, ok = tokenval(vs); !ok || spos == "" {
|
2018-10-29 01:49:45 +03:00
|
|
|
return NOMessage, errInvalidNumberOfArguments
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2016-04-01 02:26:36 +03:00
|
|
|
if vs, ssize, ok = tokenval(vs); !ok || ssize == "" {
|
2018-10-29 01:49:45 +03:00
|
|
|
return NOMessage, errInvalidNumberOfArguments
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2016-04-01 02:26:36 +03:00
|
|
|
if len(vs) != 0 {
|
2018-10-29 01:49:45 +03:00
|
|
|
return NOMessage, errInvalidNumberOfArguments
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
pos, err := strconv.ParseInt(spos, 10, 64)
|
|
|
|
if err != nil || pos < 0 {
|
2018-10-29 01:49:45 +03:00
|
|
|
return NOMessage, errInvalidArgument(spos)
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
size, err := strconv.ParseInt(ssize, 10, 64)
|
|
|
|
if err != nil || size < 0 {
|
2018-10-29 01:49:45 +03:00
|
|
|
return NOMessage, errInvalidArgument(ssize)
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
sum, err := s.checksum(pos, size)
|
2016-03-05 02:08:16 +03:00
|
|
|
if err != nil {
|
2018-10-29 01:49:45 +03:00
|
|
|
return NOMessage, err
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2016-04-01 02:26:36 +03:00
|
|
|
switch msg.OutputType {
|
2018-10-29 01:49:45 +03:00
|
|
|
case JSON:
|
Lua scripting feature. (#224)
* Start on lua scripting
* Implement evalsha, script load, script exists, and script flush
* Type conversions from lua to resp/json.
Refactor to make luastate and luascripts persistent in the controller.
* Change controller.command and all underlying commands to return resp.Value.
Serialize only during the ouput.
* First stab at tile38 call from lua
* Change tile38 into tile38.call in Lua
* Property return errors from scripts
* Minor refactoring. No locking on script run
* Cleanup/refactoring
* Create a pool of 5 lua states, allow for more as needed. Refactor.
* Use safe map for scripts. Add a limit for max number of lua states. Refactor.
* Refactor
* Refactor script commands into atomic, read-only, and non-atomic classes.
Proper locking for all three classes.
Add tests for scripts
* More tests for scripts
* Properly escape newlines in lua-produced errors
* Better test for readonly failure
* Correctly convert ok/err messages between lua and resp.
Add pcall, sha1hex, error_reply, status_reply functions to tile38 namespace in lua.
* Add pcall test. Change writeErr to work with string argument
* Make sure eval/evalsha never attempt to write AOF
* Add eval-set and eval-get to benchmarks
* Fix eval benchmark tests, add more
* Improve benchmarks
* Optimizations and refactoring.
* Add lua memtest
* Typo
* Add dependency
* golint fixes
* gofmt fixes
* Add scripting commands to the core/commands.json
* Use ARGV for args inside lua
2017-10-05 18:20:40 +03:00
|
|
|
res = resp.StringValue(
|
2021-03-31 18:13:44 +03:00
|
|
|
fmt.Sprintf(`{"ok":true,"md5":"%s","elapsed":"%s"}`, sum, time.Since(start)))
|
2018-10-29 01:49:45 +03:00
|
|
|
case RESP:
|
Lua scripting feature. (#224)
* Start on lua scripting
* Implement evalsha, script load, script exists, and script flush
* Type conversions from lua to resp/json.
Refactor to make luastate and luascripts persistent in the controller.
* Change controller.command and all underlying commands to return resp.Value.
Serialize only during the ouput.
* First stab at tile38 call from lua
* Change tile38 into tile38.call in Lua
* Property return errors from scripts
* Minor refactoring. No locking on script run
* Cleanup/refactoring
* Create a pool of 5 lua states, allow for more as needed. Refactor.
* Use safe map for scripts. Add a limit for max number of lua states. Refactor.
* Refactor
* Refactor script commands into atomic, read-only, and non-atomic classes.
Proper locking for all three classes.
Add tests for scripts
* More tests for scripts
* Properly escape newlines in lua-produced errors
* Better test for readonly failure
* Correctly convert ok/err messages between lua and resp.
Add pcall, sha1hex, error_reply, status_reply functions to tile38 namespace in lua.
* Add pcall test. Change writeErr to work with string argument
* Make sure eval/evalsha never attempt to write AOF
* Add eval-set and eval-get to benchmarks
* Fix eval benchmark tests, add more
* Improve benchmarks
* Optimizations and refactoring.
* Add lua memtest
* Typo
* Add dependency
* golint fixes
* gofmt fixes
* Add scripting commands to the core/commands.json
* Use ARGV for args inside lua
2017-10-05 18:20:40 +03:00
|
|
|
res = resp.SimpleStringValue(sum)
|
2016-04-01 02:26:36 +03:00
|
|
|
}
|
|
|
|
return res, nil
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
func (s *Server) cmdAOF(msg *Message) (res resp.Value, err error) {
|
|
|
|
if s.aof == nil {
|
2018-10-29 01:49:45 +03:00
|
|
|
return NOMessage, errors.New("aof disabled")
|
2018-04-11 20:53:36 +03:00
|
|
|
}
|
2018-10-29 01:49:45 +03:00
|
|
|
vs := msg.Args[1:]
|
Lua scripting feature. (#224)
* Start on lua scripting
* Implement evalsha, script load, script exists, and script flush
* Type conversions from lua to resp/json.
Refactor to make luastate and luascripts persistent in the controller.
* Change controller.command and all underlying commands to return resp.Value.
Serialize only during the ouput.
* First stab at tile38 call from lua
* Change tile38 into tile38.call in Lua
* Property return errors from scripts
* Minor refactoring. No locking on script run
* Cleanup/refactoring
* Create a pool of 5 lua states, allow for more as needed. Refactor.
* Use safe map for scripts. Add a limit for max number of lua states. Refactor.
* Refactor
* Refactor script commands into atomic, read-only, and non-atomic classes.
Proper locking for all three classes.
Add tests for scripts
* More tests for scripts
* Properly escape newlines in lua-produced errors
* Better test for readonly failure
* Correctly convert ok/err messages between lua and resp.
Add pcall, sha1hex, error_reply, status_reply functions to tile38 namespace in lua.
* Add pcall test. Change writeErr to work with string argument
* Make sure eval/evalsha never attempt to write AOF
* Add eval-set and eval-get to benchmarks
* Fix eval benchmark tests, add more
* Improve benchmarks
* Optimizations and refactoring.
* Add lua memtest
* Typo
* Add dependency
* golint fixes
* gofmt fixes
* Add scripting commands to the core/commands.json
* Use ARGV for args inside lua
2017-10-05 18:20:40 +03:00
|
|
|
|
2016-04-01 02:26:36 +03:00
|
|
|
var ok bool
|
2016-03-05 02:08:16 +03:00
|
|
|
var spos string
|
2016-04-01 02:26:36 +03:00
|
|
|
if vs, spos, ok = tokenval(vs); !ok || spos == "" {
|
2018-10-29 01:49:45 +03:00
|
|
|
return NOMessage, errInvalidNumberOfArguments
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2016-04-01 02:26:36 +03:00
|
|
|
if len(vs) != 0 {
|
2018-10-29 01:49:45 +03:00
|
|
|
return NOMessage, errInvalidNumberOfArguments
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
pos, err := strconv.ParseInt(spos, 10, 64)
|
|
|
|
if err != nil || pos < 0 {
|
2018-10-29 01:49:45 +03:00
|
|
|
return NOMessage, errInvalidArgument(spos)
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
f, err := os.Open(s.aof.Name())
|
2016-03-05 02:08:16 +03:00
|
|
|
if err != nil {
|
2018-10-29 01:49:45 +03:00
|
|
|
return NOMessage, err
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
defer f.Close()
|
|
|
|
n, err := f.Seek(0, 2)
|
|
|
|
if err != nil {
|
2018-10-29 01:49:45 +03:00
|
|
|
return NOMessage, err
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
if n < pos {
|
2018-10-29 01:49:45 +03:00
|
|
|
return NOMessage, errors.New("pos is too big, must be less that the aof_size of leader")
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
var ls liveAOFSwitches
|
|
|
|
ls.pos = pos
|
|
|
|
return NOMessage, ls
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Message) error {
|
2021-06-13 17:53:27 +03:00
|
|
|
s.mu.RLock()
|
|
|
|
f, err := os.Open(s.aof.Name())
|
|
|
|
s.mu.RUnlock()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer f.Close()
|
|
|
|
|
2019-10-30 20:17:59 +03:00
|
|
|
s.mu.Lock()
|
2021-06-13 17:53:27 +03:00
|
|
|
s.aofconnM[conn] = f
|
2019-10-30 20:17:59 +03:00
|
|
|
s.mu.Unlock()
|
2016-04-01 04:20:42 +03:00
|
|
|
defer func() {
|
2019-10-30 20:17:59 +03:00
|
|
|
s.mu.Lock()
|
|
|
|
delete(s.aofconnM, conn)
|
|
|
|
s.mu.Unlock()
|
2016-04-01 04:20:42 +03:00
|
|
|
conn.Close()
|
|
|
|
}()
|
|
|
|
|
2016-04-01 02:26:36 +03:00
|
|
|
if _, err := conn.Write([]byte("+OK\r\n")); err != nil {
|
|
|
|
return err
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
2016-04-01 02:26:36 +03:00
|
|
|
|
2016-03-05 02:08:16 +03:00
|
|
|
if _, err := f.Seek(pos, 0); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
cond := sync.NewCond(&sync.Mutex{})
|
|
|
|
var mustQuit bool
|
|
|
|
go func() {
|
|
|
|
defer func() {
|
|
|
|
cond.L.Lock()
|
|
|
|
mustQuit = true
|
|
|
|
cond.Broadcast()
|
|
|
|
cond.L.Unlock()
|
|
|
|
}()
|
|
|
|
for {
|
2017-10-01 05:34:25 +03:00
|
|
|
vs, err := rd.ReadMessages()
|
2016-03-05 02:08:16 +03:00
|
|
|
if err != nil {
|
|
|
|
if err != io.EOF {
|
|
|
|
log.Error(err)
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
2017-10-01 05:34:25 +03:00
|
|
|
for _, v := range vs {
|
2018-10-29 01:49:45 +03:00
|
|
|
switch v.Command() {
|
2017-10-01 05:34:25 +03:00
|
|
|
default:
|
|
|
|
log.Error("received a live command that was not QUIT")
|
|
|
|
return
|
|
|
|
case "quit", "":
|
|
|
|
return
|
|
|
|
}
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
go func() {
|
|
|
|
defer func() {
|
|
|
|
cond.L.Lock()
|
|
|
|
mustQuit = true
|
|
|
|
cond.Broadcast()
|
|
|
|
cond.L.Unlock()
|
|
|
|
}()
|
|
|
|
err := func() error {
|
|
|
|
_, err := io.Copy(conn, f)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2016-04-01 03:58:02 +03:00
|
|
|
|
|
|
|
b := make([]byte, 4096)
|
|
|
|
// The reader needs to be OK with the eof not
|
2016-03-05 02:08:16 +03:00
|
|
|
for {
|
2016-04-01 03:58:02 +03:00
|
|
|
n, err := f.Read(b)
|
2021-06-13 17:53:27 +03:00
|
|
|
if n > 0 {
|
|
|
|
if _, err := conn.Write(b[:n]); err != nil {
|
2016-03-28 18:57:41 +03:00
|
|
|
return err
|
|
|
|
}
|
2021-06-13 17:53:27 +03:00
|
|
|
}
|
|
|
|
if err != io.EOF {
|
|
|
|
if err != nil {
|
2016-03-05 02:08:16 +03:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
continue
|
|
|
|
}
|
2019-10-30 20:17:59 +03:00
|
|
|
s.fcond.L.Lock()
|
|
|
|
s.fcond.Wait()
|
|
|
|
s.fcond.L.Unlock()
|
2016-03-05 02:08:16 +03:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
if err != nil {
|
|
|
|
if !strings.Contains(err.Error(), "use of closed network connection") &&
|
|
|
|
!strings.Contains(err.Error(), "bad file descriptor") {
|
|
|
|
log.Error(err)
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
for {
|
|
|
|
cond.L.Lock()
|
|
|
|
if mustQuit {
|
|
|
|
cond.L.Unlock()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
cond.Wait()
|
|
|
|
cond.L.Unlock()
|
|
|
|
}
|
|
|
|
}
|