tile38/internal/server/aof.go

476 lines
10 KiB
Go
Raw Normal View History

package server
2016-03-05 02:08:16 +03:00
import (
"errors"
"fmt"
"io"
"net"
"os"
"strconv"
"strings"
"sync"
2018-11-11 02:16:04 +03:00
"sync/atomic"
2016-03-05 02:08:16 +03:00
"time"
2016-09-11 17:49:48 +03:00
"github.com/tidwall/buntdb"
2018-11-24 04:15:14 +03:00
"github.com/tidwall/geojson"
2017-10-05 02:15:20 +03:00
"github.com/tidwall/redcon"
2016-03-28 18:57:41 +03:00
"github.com/tidwall/resp"
"github.com/tidwall/tile38/internal/log"
2016-03-05 02:08:16 +03:00
)
2016-03-19 17:16:19 +03:00
type errAOFHook struct {
err error
}
func (err errAOFHook) Error() string {
return fmt.Sprintf("hook: %v", err.err)
}
var errInvalidAOF = errors.New("invalid aof file")
func (server *Server) loadAOF() error {
fi, err := server.aof.Stat()
2016-04-01 02:26:36 +03:00
if err != nil {
return err
}
2016-03-05 02:08:16 +03:00
start := time.Now()
var count int
defer func() {
d := time.Now().Sub(start)
ps := float64(count) / (float64(d) / float64(time.Second))
2016-04-01 02:26:36 +03:00
suf := []string{"bytes/s", "KB/s", "MB/s", "GB/s", "TB/s"}
bps := float64(fi.Size()) / (float64(d) / float64(time.Second))
for i := 0; bps > 1024; i++ {
if len(suf) == 1 {
break
}
bps /= 1024
suf = suf[1:]
}
byteSpeed := fmt.Sprintf("%.0f %s", bps, suf[0])
log.Infof("AOF loaded %d commands: %.2fs, %.0f/s, %s",
count, float64(d)/float64(time.Second), ps, byteSpeed)
2016-03-05 02:08:16 +03:00
}()
2017-10-05 02:15:20 +03:00
var buf []byte
var args [][]byte
var packet [0xFFFF]byte
2016-03-05 02:08:16 +03:00
for {
n, err := server.aof.Read(packet[:])
2016-03-05 02:08:16 +03:00
if err != nil {
if err == io.EOF {
2017-10-05 02:15:20 +03:00
if len(buf) > 0 {
return io.ErrUnexpectedEOF
}
2016-03-05 02:08:16 +03:00
return nil
}
return err
}
server.aofsz += n
2017-10-05 02:15:20 +03:00
data := packet[:n]
if len(buf) > 0 {
data = append(buf, data...)
}
2017-10-05 02:15:20 +03:00
var complete bool
for {
complete, args, _, data, err = redcon.ReadNextCommand(data, args[:0])
if err != nil {
return err
}
2017-10-05 02:15:20 +03:00
if !complete {
break
}
2017-10-05 02:15:20 +03:00
if len(args) > 0 {
2018-11-15 19:15:39 +03:00
var msg Message
msg.Args = msg.Args[:0]
2017-10-05 02:15:20 +03:00
for _, arg := range args {
msg.Args = append(msg.Args, string(arg))
2017-10-05 02:15:20 +03:00
}
if _, _, err := server.command(&msg, nil); err != nil {
2017-10-05 02:15:20 +03:00
if commandErrIsFatal(err) {
return err
}
}
count++
}
2016-03-19 17:16:19 +03:00
}
2017-10-05 02:15:20 +03:00
if len(data) > 0 {
buf = append(buf[:0], data...)
} else if len(buf) > 0 {
buf = buf[:0]
2016-03-05 02:08:16 +03:00
}
}
}
2017-10-05 02:15:20 +03:00
2016-03-30 19:32:38 +03:00
func commandErrIsFatal(err error) bool {
// FSET (and other writable commands) may return errors that we need
// to ignore during the loading process. These errors may occur (though unlikely)
// due to the aof rewrite operation.
switch err {
case errKeyNotFound, errIDNotFound:
return false
}
return true
}
func (server *Server) flushAOF() {
if len(server.aofbuf) > 0 {
_, err := server.aof.Write(server.aofbuf)
if err != nil {
panic(err)
}
server.aofbuf = server.aofbuf[:0]
}
}
2018-11-24 01:53:33 +03:00
func (server *Server) writeAOF(args []string, d *commandDetails) error {
2018-08-14 03:05:30 +03:00
if d != nil && !d.updated {
// just ignore writes if the command did not update
return nil
2016-03-19 17:16:19 +03:00
}
if server.shrinking {
nargs := make([]string, len(args))
copy(nargs, args)
server.shrinklog = append(server.shrinklog, nargs)
2016-03-28 18:57:41 +03:00
}
if server.aof != nil {
2018-11-11 02:16:04 +03:00
atomic.StoreInt32(&server.aofdirty, 1) // prewrite optimization flag
n := len(server.aofbuf)
server.aofbuf = redcon.AppendArray(server.aofbuf, len(args))
for _, arg := range args {
server.aofbuf = redcon.AppendBulkString(server.aofbuf, arg)
}
server.aofsz += len(server.aofbuf) - n
2016-03-20 04:31:59 +03:00
}
2016-03-05 02:08:16 +03:00
// notify aof live connections that we have new data
server.fcond.L.Lock()
server.fcond.Broadcast()
server.fcond.L.Unlock()
2016-03-05 02:08:16 +03:00
2018-08-14 03:05:30 +03:00
// process geofences
2016-03-05 02:08:16 +03:00
if d != nil {
2018-08-14 03:05:30 +03:00
// webhook geofences
if server.config.followHost() == "" {
2018-08-14 03:05:30 +03:00
// for leader only
if d.parent {
// queue children
for _, d := range d.children {
if err := server.queueHooks(d); err != nil {
2018-08-14 03:05:30 +03:00
return err
}
}
} else {
// queue parent
if err := server.queueHooks(d); err != nil {
2018-08-14 03:05:30 +03:00
return err
}
}
}
2018-11-23 11:39:04 +03:00
2018-08-14 03:05:30 +03:00
// live geofences
server.lcond.L.Lock()
2018-11-24 01:38:49 +03:00
if len(server.lives) > 0 {
if d.parent {
// queue children
for _, d := range d.children {
server.lstack = append(server.lstack, d)
}
} else {
// queue parent
server.lstack = append(server.lstack, d)
}
2018-11-24 01:38:49 +03:00
server.lcond.Broadcast()
}
server.lcond.L.Unlock()
2016-03-05 02:08:16 +03:00
}
2016-04-02 17:20:30 +03:00
return nil
}
2016-03-19 17:16:19 +03:00
2018-11-24 04:15:14 +03:00
func (server *Server) getQueueCandidates(d *commandDetails) []*Hook {
var candidates []*Hook
// add the hooks with "outside" detection
if len(server.hooksOut) > 0 {
for _, hook := range server.hooksOut {
if hook.Key == d.key {
candidates = append(candidates, hook)
}
}
}
// search the hook spatial tree
for _, obj := range []geojson.Object{d.obj, d.oldObj} {
if obj == nil {
continue
}
rect := obj.Rect()
server.hookTree.Search(
[]float64{rect.Min.X, rect.Min.Y},
[]float64{rect.Max.X, rect.Max.Y},
func(_, _ []float64, value interface{}) bool {
hook := value.(*Hook)
var found bool
for _, candidate := range candidates {
if candidate == hook {
found = true
break
}
}
if !found {
candidates = append(candidates, hook)
}
return true
},
)
}
return candidates
}
2018-11-24 01:53:33 +03:00
func (server *Server) queueHooks(d *commandDetails) error {
2016-09-11 17:49:48 +03:00
// big list of all of the messages
2018-08-14 03:05:30 +03:00
var hmsgs []string
2016-09-11 17:49:48 +03:00
var hooks []*Hook
2018-11-24 04:15:14 +03:00
candidates := server.getQueueCandidates(d)
for _, hook := range candidates {
// match the fence
msgs := FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, hook.Metas, d)
if len(msgs) > 0 {
if hook.channel {
server.Publish(hook.Name, msgs...)
} else {
// append each msg to the big list
hmsgs = append(hmsgs, msgs...)
hooks = append(hooks, hook)
2016-09-11 17:49:48 +03:00
}
2016-04-02 17:20:30 +03:00
}
}
2016-09-11 17:49:48 +03:00
if len(hmsgs) == 0 {
return nil
}
// queue the message in the buntdb database
err := server.qdb.Update(func(tx *buntdb.Tx) error {
2016-09-11 17:49:48 +03:00
for _, msg := range hmsgs {
server.qidx++ // increment the log id
key := hookLogPrefix + uint64ToString(server.qidx)
2018-04-18 01:40:11 +03:00
_, _, err := tx.Set(key, string(msg), hookLogSetDefaults)
2016-09-11 17:49:48 +03:00
if err != nil {
return err
}
log.Debugf("queued hook: %d", server.qidx)
2016-09-11 17:49:48 +03:00
}
_, _, err := tx.Set("hook:idx", uint64ToString(server.qidx), nil)
2016-09-11 17:49:48 +03:00
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
// all the messages have been queued.
// notify the hooks
for _, hook := range hooks {
hook.Signal()
}
2016-03-05 02:08:16 +03:00
return nil
}
2016-09-11 17:49:48 +03:00
// Converts string to an integer
func stringToUint64(s string) uint64 {
n, _ := strconv.ParseUint(s, 10, 64)
return n
}
// Converts a uint to a string
func uint64ToString(u uint64) string {
s := strings.Repeat("0", 20) + strconv.FormatUint(u, 10)
return s[len(s)-20:]
}
2016-03-05 02:08:16 +03:00
type liveAOFSwitches struct {
pos int64
}
func (s liveAOFSwitches) Error() string {
2018-08-14 03:05:30 +03:00
return goingLive
2016-03-05 02:08:16 +03:00
}
func (server *Server) cmdAOFMD5(msg *Message) (res resp.Value, err error) {
2016-03-05 02:08:16 +03:00
start := time.Now()
vs := msg.Args[1:]
2016-04-01 02:26:36 +03:00
var ok bool
2016-03-05 02:08:16 +03:00
var spos, ssize string
2017-10-05 18:20:40 +03:00
2016-04-01 02:26:36 +03:00
if vs, spos, ok = tokenval(vs); !ok || spos == "" {
return NOMessage, errInvalidNumberOfArguments
2016-03-05 02:08:16 +03:00
}
2016-04-01 02:26:36 +03:00
if vs, ssize, ok = tokenval(vs); !ok || ssize == "" {
return NOMessage, errInvalidNumberOfArguments
2016-03-05 02:08:16 +03:00
}
2016-04-01 02:26:36 +03:00
if len(vs) != 0 {
return NOMessage, errInvalidNumberOfArguments
2016-03-05 02:08:16 +03:00
}
pos, err := strconv.ParseInt(spos, 10, 64)
if err != nil || pos < 0 {
return NOMessage, errInvalidArgument(spos)
2016-03-05 02:08:16 +03:00
}
size, err := strconv.ParseInt(ssize, 10, 64)
if err != nil || size < 0 {
return NOMessage, errInvalidArgument(ssize)
2016-03-05 02:08:16 +03:00
}
sum, err := server.checksum(pos, size)
2016-03-05 02:08:16 +03:00
if err != nil {
return NOMessage, err
2016-03-05 02:08:16 +03:00
}
2016-04-01 02:26:36 +03:00
switch msg.OutputType {
case JSON:
2017-10-05 18:20:40 +03:00
res = resp.StringValue(
fmt.Sprintf(`{"ok":true,"md5":"%s","elapsed":"%s"}`, sum, time.Now().Sub(start)))
case RESP:
2017-10-05 18:20:40 +03:00
res = resp.SimpleStringValue(sum)
2016-04-01 02:26:36 +03:00
}
return res, nil
2016-03-05 02:08:16 +03:00
}
func (server *Server) cmdAOF(msg *Message) (res resp.Value, err error) {
if server.aof == nil {
return NOMessage, errors.New("aof disabled")
}
vs := msg.Args[1:]
2017-10-05 18:20:40 +03:00
2016-04-01 02:26:36 +03:00
var ok bool
2016-03-05 02:08:16 +03:00
var spos string
2016-04-01 02:26:36 +03:00
if vs, spos, ok = tokenval(vs); !ok || spos == "" {
return NOMessage, errInvalidNumberOfArguments
2016-03-05 02:08:16 +03:00
}
2016-04-01 02:26:36 +03:00
if len(vs) != 0 {
return NOMessage, errInvalidNumberOfArguments
2016-03-05 02:08:16 +03:00
}
pos, err := strconv.ParseInt(spos, 10, 64)
if err != nil || pos < 0 {
return NOMessage, errInvalidArgument(spos)
2016-03-05 02:08:16 +03:00
}
f, err := os.Open(server.aof.Name())
2016-03-05 02:08:16 +03:00
if err != nil {
return NOMessage, err
2016-03-05 02:08:16 +03:00
}
defer f.Close()
n, err := f.Seek(0, 2)
if err != nil {
return NOMessage, err
2016-03-05 02:08:16 +03:00
}
if n < pos {
return NOMessage, errors.New("pos is too big, must be less that the aof_size of leader")
2016-03-05 02:08:16 +03:00
}
var s liveAOFSwitches
s.pos = pos
return NOMessage, s
2016-03-05 02:08:16 +03:00
}
func (server *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Message) error {
server.mu.Lock()
server.aofconnM[conn] = true
server.mu.Unlock()
2016-04-01 04:20:42 +03:00
defer func() {
server.mu.Lock()
delete(server.aofconnM, conn)
server.mu.Unlock()
2016-04-01 04:20:42 +03:00
conn.Close()
}()
2016-04-01 02:26:36 +03:00
if _, err := conn.Write([]byte("+OK\r\n")); err != nil {
return err
2016-03-05 02:08:16 +03:00
}
2016-04-01 02:26:36 +03:00
server.mu.RLock()
f, err := os.Open(server.aof.Name())
server.mu.RUnlock()
2016-03-05 02:08:16 +03:00
if err != nil {
return err
}
defer f.Close()
if _, err := f.Seek(pos, 0); err != nil {
return err
}
cond := sync.NewCond(&sync.Mutex{})
var mustQuit bool
go func() {
defer func() {
cond.L.Lock()
mustQuit = true
cond.Broadcast()
cond.L.Unlock()
}()
for {
vs, err := rd.ReadMessages()
2016-03-05 02:08:16 +03:00
if err != nil {
if err != io.EOF {
log.Error(err)
}
return
}
for _, v := range vs {
switch v.Command() {
default:
log.Error("received a live command that was not QUIT")
return
case "quit", "":
return
}
2016-03-05 02:08:16 +03:00
}
}
}()
go func() {
defer func() {
cond.L.Lock()
mustQuit = true
cond.Broadcast()
cond.L.Unlock()
}()
err := func() error {
_, err := io.Copy(conn, f)
if err != nil {
return err
}
2016-04-01 03:58:02 +03:00
b := make([]byte, 4096)
// The reader needs to be OK with the eof not
2016-03-05 02:08:16 +03:00
for {
2016-04-01 03:58:02 +03:00
n, err := f.Read(b)
if err != io.EOF && n > 0 {
2016-03-28 18:57:41 +03:00
if err != nil {
return err
}
2016-04-01 03:58:02 +03:00
if _, err := conn.Write(b[:n]); err != nil {
2016-03-05 02:08:16 +03:00
return err
}
continue
}
server.fcond.L.Lock()
server.fcond.Wait()
server.fcond.L.Unlock()
2016-03-05 02:08:16 +03:00
}
}()
if err != nil {
if !strings.Contains(err.Error(), "use of closed network connection") &&
!strings.Contains(err.Error(), "bad file descriptor") {
log.Error(err)
}
return
}
}()
for {
cond.L.Lock()
if mustQuit {
cond.L.Unlock()
return nil
}
cond.Wait()
cond.L.Unlock()
}
}