package controller import ( "bufio" "encoding/binary" "errors" "fmt" "io" "net" "os" "strconv" "strings" "sync" "time" "github.com/google/btree" "github.com/tidwall/resp" "github.com/tidwall/tile38/client" "github.com/tidwall/tile38/controller/log" "github.com/tidwall/tile38/controller/server" ) const backwardsBufferSize = 50000 var errCorruptedAOF = errors.New("corrupted aof file") type AOFReader struct { r io.Reader // reader rerr error // read error chunk []byte // chunk buffer buf []byte // main buffer l int // length of valid data in buffer p int // pointer } type errAOFHook struct { err error } func (err errAOFHook) Error() string { return fmt.Sprintf("hook: %v", err.err) } func (rd *AOFReader) ReadCommand() ([]byte, error) { if rd.l >= 4 { sz1 := int(binary.LittleEndian.Uint32(rd.buf[rd.p:])) if rd.l >= sz1+9 { // we have enough data for a record sz2 := int(binary.LittleEndian.Uint32(rd.buf[rd.p+4+sz1:])) if sz2 != sz1 || rd.buf[rd.p+4+sz1+4] != 0 { return nil, errCorruptedAOF } buf := rd.buf[rd.p+4 : rd.p+4+sz1] rd.p += sz1 + 9 rd.l -= sz1 + 9 return buf, nil } } // need more data if rd.rerr != nil { if rd.rerr == io.EOF { rd.rerr = nil // we want to return EOF, but we want to be able to try again if rd.l != 0 { return nil, io.ErrUnexpectedEOF } return nil, io.EOF } return nil, rd.rerr } if rd.p != 0 { // move p to the beginning copy(rd.buf, rd.buf[rd.p:rd.p+rd.l]) rd.p = 0 } var n int n, rd.rerr = rd.r.Read(rd.chunk) if n > 0 { cbuf := rd.chunk[:n] if len(rd.buf)-rd.l < n { if len(rd.buf) == 0 { rd.buf = make([]byte, len(cbuf)) copy(rd.buf, cbuf) } else { copy(rd.buf[rd.l:], cbuf[:len(rd.buf)-rd.l]) rd.buf = append(rd.buf, cbuf[len(rd.buf)-rd.l:]...) } } else { copy(rd.buf[rd.l:], cbuf) } rd.l += n } return rd.ReadCommand() } func NewAOFReader(r io.Reader) *AOFReader { rd := &AOFReader{r: r, chunk: make([]byte, 0xFFFF)} return rd } func (c *Controller) loadAOF() error { start := time.Now() var count int defer func() { d := time.Now().Sub(start) ps := float64(count) / (float64(d) / float64(time.Second)) log.Infof("AOF loaded %d commands: %s: %.0f/sec", count, d, ps) }() rd := resp.NewReader(c.f) for { v, _, n, err := rd.ReadMultiBulk() if err != nil { if err == io.EOF { return nil } return err } values := v.Array() if len(values) == 0 { return errors.New("multibulk missing command component") } msg := &server.Message{ Command: strings.ToLower(values[0].String()), Values: values, } if _, _, err := c.command(msg, nil); err != nil { return err } c.aofsz += n count++ } } func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error { if d != nil { if !d.updated { return nil // just ignore writes if the command did not update } // process hooks if hm, ok := c.hookcols[d.key]; ok { for _, hook := range hm { if err := c.DoHook(hook, d); err != nil { return errAOFHook{err} } } } } data, err := value.MarshalRESP() if err != nil { return err } n, err := c.f.Write(data) if err != nil { return err } c.aofsz += n // notify aof live connections that we have new data c.fcond.L.Lock() c.fcond.Broadcast() c.fcond.L.Unlock() if d != nil { // write to live connection streams c.lcond.L.Lock() c.lstack = append(c.lstack, d) c.lcond.Broadcast() c.lcond.L.Unlock() } return nil } type liveAOFSwitches struct { pos int64 } func (s liveAOFSwitches) Error() string { return "going live" } func (c *Controller) cmdAOFMD5(line string) (string, error) { start := time.Now() var spos, ssize string if line, spos = token(line); spos == "" { return "", errInvalidNumberOfArguments } if line, ssize = token(line); ssize == "" { return "", errInvalidNumberOfArguments } if line != "" { return "", errInvalidNumberOfArguments } pos, err := strconv.ParseInt(spos, 10, 64) if err != nil || pos < 0 { return "", errInvalidArgument(spos) } size, err := strconv.ParseInt(ssize, 10, 64) if err != nil || size < 0 { return "", errInvalidArgument(ssize) } sum, err := c.checksum(pos, size) if err != nil { return "", err } return fmt.Sprintf(`{"ok":true,"md5":"%s","elapsed":"%s"}`, sum, time.Now().Sub(start)), nil } func (c *Controller) cmdAOF(line string, w io.Writer) error { var spos string if line, spos = token(line); spos == "" { return errInvalidNumberOfArguments } if line != "" { return errInvalidNumberOfArguments } pos, err := strconv.ParseInt(spos, 10, 64) if err != nil || pos < 0 { return errInvalidArgument(spos) } f, err := os.Open(c.f.Name()) if err != nil { return err } defer f.Close() n, err := f.Seek(0, 2) if err != nil { return err } if n < pos { return errors.New("pos is too big, must be less that the aof_size of leader") } var s liveAOFSwitches s.pos = pos return s } func (c *Controller) liveAOF(pos int64, conn net.Conn, rd *bufio.Reader) error { defer conn.Close() if err := client.WriteMessage(conn, []byte(client.LiveJSON)); err != nil { return nil // nil return is fine here } c.mu.RLock() f, err := os.Open(c.f.Name()) c.mu.RUnlock() if err != nil { return err } defer f.Close() if _, err := f.Seek(pos, 0); err != nil { return err } cond := sync.NewCond(&sync.Mutex{}) var mustQuit bool go func() { defer func() { cond.L.Lock() mustQuit = true cond.Broadcast() cond.L.Unlock() }() for { command, _, _, err := client.ReadMessage(rd, nil) if err != nil { if err != io.EOF { log.Error(err) } return } cmd := string(command) if cmd != "" && strings.ToLower(cmd) != "quit" { log.Error("received a live command that was not QUIT") return } } }() go func() { defer func() { cond.L.Lock() mustQuit = true cond.Broadcast() cond.L.Unlock() }() err := func() error { _, err := io.Copy(conn, f) if err != nil { return err } rd := resp.NewReader(f) for { v, _, err := rd.ReadValue() if err != io.EOF { if err != nil { return err } data, err := v.MarshalRESP() if err != nil { return err } if _, err := conn.Write(data); err != nil { return err } continue } c.fcond.L.Lock() c.fcond.Wait() c.fcond.L.Unlock() } }() if err != nil { if !strings.Contains(err.Error(), "use of closed network connection") && !strings.Contains(err.Error(), "bad file descriptor") { log.Error(err) } return } }() for { cond.L.Lock() if mustQuit { cond.L.Unlock() return nil } cond.Wait() cond.L.Unlock() } } type treeKeyBoolT struct { key string } func (k *treeKeyBoolT) Less(item btree.Item) bool { return k.key < item.(*treeKeyBoolT).key } // aofshink shinks the aof file in the background // When completed the only command that should exist in a shrunken aof is SET. // We will read the commands backwards from last known position of the live aof // and use an ondisk key value store for state. For now we use BoltDB, in the future // we should use a store that is better performant. // The following commands may exist in the aof. // 'SET' // - Has this key been marked 'ignore'? // - Yes, then ignore // - No, Has this id been marked 'soft-ignore' or 'hard-ignore'? // - Yes, then ignore // - No // - Add command to key bucket // - Mark id as 'soft-ignore' // 'FSET' // - Has this key been marked 'ignore'? // - Yes, then ignore // - No, Has this id been marked 'hard-ignore'? // - Yes, then ignore // - No // - Add command to key bucket // 'DEL' // - Has this key been marked 'ignore'? // - Yes, then ignore // - No, Mark id as 'ignore'? // 'DROP' // - Has this key been marked 'ignore'? // - Yes, then ignore // - No, Mark key as 'ignore'? // 'SETHOOK' // - Direct copy from memory. // 'DELHOOK' // - Direct copy from memory. // 'FLUSHDB' // - Stop shrinking, nothing left to do func (c *Controller) aofshrink() { // c.mu.Lock() // c.f.Sync() // if c.shrinking { // c.mu.Unlock() // return // } // c.shrinking = true // endpos := int64(c.aofsz) // start := time.Now() // log.Infof("aof shrink started at pos %d", endpos) // var hooks []string // for _, hook := range c.hooks { // var orgs []string // for _, endpoint := range hook.Endpoints { // orgs = append(orgs, endpoint.Original) // } // hooks = append(hooks, "SETHOOK "+hook.Name+" "+strings.Join(orgs, ",")+" "+hook.Command) // } // c.mu.Unlock() // var err error // defer func() { // c.mu.Lock() // c.shrinking = false // c.mu.Unlock() // os.RemoveAll(c.dir + "/shrink.db") // os.RemoveAll(c.dir + "/shrink") // if err != nil { // log.Error("aof shrink failed: " + err.Error()) // } else { // log.Info("aof shrink completed: " + time.Now().Sub(start).String()) // } // }() // var db *bolt.DB // db, err = bolt.Open(c.dir+"/shrink.db", 0600, nil) // if err != nil { // return // } // defer db.Close() // var nf *os.File // nf, err = os.Create(c.dir + "/shrink") // if err != nil { // return // } // defer nf.Close() // defer func() { // c.mu.Lock() // defer c.mu.Unlock() // if err == nil { // c.f.Sync() // _, err = nf.Seek(0, 2) // if err == nil { // var f *os.File // f, err = os.Open(c.dir + "/aof") // if err != nil { // return // } // defer f.Close() // _, err = f.Seek(endpos, 0) // if err == nil { // _, err = io.Copy(nf, f) // if err == nil { // f.Close() // nf.Close() // // At this stage we need to kill all aof followers. To do so we will // // write a KILLAOF command to the stream. KILLAOF isn't really a command. // // This will cause the followers will close their connection and then // // automatically reconnect. The reconnection will force a sync of the aof. // err = c.writeAOF(resp.MultiBulkValue("KILLAOF"), nil) // if err == nil { // c.f.Close() // err = os.Rename(c.dir+"/shrink", c.dir+"/aof") // if err != nil { // log.Fatal("shink rename fatal operation") // } // c.f, err = os.OpenFile(c.dir+"/aof", os.O_CREATE|os.O_RDWR, 0600) // if err != nil { // log.Fatal("shink openfile fatal operation") // } // var n int64 // n, err = c.f.Seek(0, 2) // if err != nil { // log.Fatal("shink seek end fatal operation") // } // c.aofsz = int(n) // } // } // } // } // } // }() // var f *os.File // f, err = os.Open(c.dir + "/aof") // if err != nil { // return // } // defer f.Close() // var buf []byte // var pos int64 // pos, err = f.Seek(endpos, 0) // if err != nil { // return // } // var readPreviousCommand func() ([]byte, error) // readPreviousCommand = func() ([]byte, error) { // if len(buf) >= 5 { // if buf[len(buf)-1] != 0 { // return nil, errCorruptedAOF // } // sz2 := int(binary.LittleEndian.Uint32(buf[len(buf)-5:])) // if len(buf) >= sz2+9 { // sz1 := int(binary.LittleEndian.Uint32(buf[len(buf)-(sz2+9):])) // if sz1 != sz2 { // return nil, errCorruptedAOF // } // command := buf[len(buf)-(sz2+5) : len(buf)-5] // buf = buf[:len(buf)-(sz2+9)] // return command, nil // } // } // if pos == 0 { // if len(buf) > 0 { // return nil, io.ErrUnexpectedEOF // } else { // return nil, io.EOF // } // } // sz := int64(backwardsBufferSize) // offset := pos - sz // if offset < 0 { // sz = pos // offset = 0 // } // pos, err = f.Seek(offset, 0) // if err != nil { // return nil, err // } // nbuf := make([]byte, int(sz)) // _, err = io.ReadFull(f, nbuf) // if err != nil { // return nil, err // } // if len(buf) > 0 { // nbuf = append(nbuf, buf...) // } // buf = nbuf // return readPreviousCommand() // } // var tx *bolt.Tx // tx, err = db.Begin(true) // if err != nil { // return // } // defer func() { // tx.Rollback() // }() // var keyIgnoreM = map[string]bool{} // var keyBucketM = btree.New(16) // var cmd, key, id, field string // var line string // var command []byte // var val []byte // var b *bolt.Bucket // reading: // for i := 0; ; i++ { // if i%500 == 0 { // if err = tx.Commit(); err != nil { // return // } // tx, err = db.Begin(true) // if err != nil { // return // } // } // command, err = readPreviousCommand() // if err != nil { // if err == io.EOF { // err = nil // break // } // return // } // // quick path // if len(command) == 0 { // continue // ignore blank commands // } // line, cmd = token(string(command)) // cmd = strings.ToLower(cmd) // switch cmd { // case "flushdb": // break reading // all done // case "drop": // if line, key = token(line); key == "" { // err = errors.New("DROP is missing key") // return // } // if !keyIgnoreM[key] { // keyIgnoreM[key] = true // } // case "del": // if line, key = token(line); key == "" { // err = errors.New("DEL is missing key") // return // } // if keyIgnoreM[key] { // continue // ignore // } // if line, id = token(line); id == "" { // err = errors.New("DEL is missing id") // return // } // if keyBucketM.Get(&treeKeyBoolT{key}) == nil { // if _, err = tx.CreateBucket([]byte(key + ".ids")); err != nil { // return // } // if _, err = tx.CreateBucket([]byte(key + ".ignore_ids")); err != nil { // return // } // keyBucketM.ReplaceOrInsert(&treeKeyBoolT{key}) // } // b = tx.Bucket([]byte(key + ".ignore_ids")) // err = b.Put([]byte(id), []byte("2")) // 2 for hard ignore // if err != nil { // return // } // case "set": // if line, key = token(line); key == "" { // err = errors.New("SET is missing key") // return // } // if keyIgnoreM[key] { // continue // ignore // } // if line, id = token(line); id == "" { // err = errors.New("SET is missing id") // return // } // if keyBucketM.Get(&treeKeyBoolT{key}) == nil { // if _, err = tx.CreateBucket([]byte(key + ".ids")); err != nil { // return // } // if _, err = tx.CreateBucket([]byte(key + ".ignore_ids")); err != nil { // return // } // keyBucketM.ReplaceOrInsert(&treeKeyBoolT{key}) // } // b = tx.Bucket([]byte(key + ".ignore_ids")) // val = b.Get([]byte(id)) // if val == nil { // if err = b.Put([]byte(id), []byte("1")); err != nil { // return // } // b = tx.Bucket([]byte(key + ".ids")) // if err = b.Put([]byte(id), command); err != nil { // return // } // } else { // switch string(val) { // default: // err = errors.New("invalid ignore") // case "1", "2": // continue // ignore // } // } // case "fset": // if line, key = token(line); key == "" { // err = errors.New("FSET is missing key") // return // } // if keyIgnoreM[key] { // continue // ignore // } // if line, id = token(line); id == "" { // err = errors.New("FSET is missing id") // return // } // if line, field = token(line); field == "" { // err = errors.New("FSET is missing field") // return // } // if keyBucketM.Get(&treeKeyBoolT{key}) == nil { // if _, err = tx.CreateBucket([]byte(key + ".ids")); err != nil { // return // } // if _, err = tx.CreateBucket([]byte(key + ".ignore_ids")); err != nil { // return // } // keyBucketM.ReplaceOrInsert(&treeKeyBoolT{key}) // } // b = tx.Bucket([]byte(key + ".ignore_ids")) // val = b.Get([]byte(id)) // if val == nil { // b = tx.Bucket([]byte(key + ":" + id + ":0")) // if b == nil { // if b, err = tx.CreateBucket([]byte(key + ":" + id + ":0")); err != nil { // return // } // } // if b.Get([]byte(field)) == nil { // if err = b.Put([]byte(field), command); err != nil { // return // } // } // } else { // switch string(val) { // default: // err = errors.New("invalid ignore") // case "1": // b = tx.Bucket([]byte(key + ":" + id + ":1")) // if b == nil { // if b, err = tx.CreateBucket([]byte(key + ":" + id + ":1")); err != nil { // return // } // } // if b.Get([]byte(field)) == nil { // if err = b.Put([]byte(field), command); err != nil { // return // } // } // case "2": // continue // ignore // } // } // } // } // if err = tx.Commit(); err != nil { // return // } // tx, err = db.Begin(false) // if err != nil { // return // } // keyBucketM.Ascend(func(item btree.Item) bool { // key := item.(*treeKeyBoolT).key // b := tx.Bucket([]byte(key + ".ids")) // if b != nil { // err = b.ForEach(func(id, command []byte) error { // // parse the SET command // _, fields, values, etype, eargs, err := c.parseSetArgs(string(command[4:])) // if err != nil { // return err // } // // store the fields in a map // var fieldm = map[string]float64{} // for i, field := range fields { // fieldm[field] = values[i] // } // // append old FSET values. these are FSETs that existed prior to the last SET. // f1 := tx.Bucket([]byte(key + ":" + string(id) + ":1")) // if f1 != nil { // err = f1.ForEach(func(field, command []byte) error { // d, err := c.parseFSetArgs(string(command[5:])) // if err != nil { // return err // } // if _, ok := fieldm[d.field]; !ok { // fieldm[d.field] = d.value // } // return nil // }) // if err != nil { // return err // } // } // // append new FSET values. these are FSETs that were added after the last SET. // f0 := tx.Bucket([]byte(key + ":" + string(id) + ":0")) // if f0 != nil { // f0.ForEach(func(field, command []byte) error { // d, err := c.parseFSetArgs(string(command[5:])) // if err != nil { // return err // } // fieldm[d.field] = d.value // return nil // }) // } // // rebuild the SET command // ncommand := "set " + key + " " + string(id) // for field, value := range fieldm { // if value != 0 { // ncommand += " field " + field + " " + strconv.FormatFloat(value, 'f', -1, 64) // } // } // ncommand += " " + strings.ToUpper(etype) + " " + eargs // _, err = writeCommand(nf, []byte(ncommand)) // if err != nil { // return err // } // return nil // }) // if err != nil { // return false // } // } // return true // }) // if err == nil { // // add all of the hooks // for _, line := range hooks { // _, err = writeCommand(nf, []byte(line)) // if err != nil { // return // } // } // } }