read backward multibulk

This commit is contained in:
Josh Baker 2016-03-31 17:42:22 -07:00
parent 9ece493a8e
commit 4a15a20e27
1 changed files with 51 additions and 58 deletions

View File

@ -2,19 +2,17 @@ package controller
import (
"crypto/md5"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"time"
"github.com/tidwall/resp"
"github.com/tidwall/tile38/controller/log"
"github.com/tidwall/tile38/core"
)
const backwardsBufferSize = 50000
// checksum performs a simple md5 checksum on the aof file
func (c *Controller) checksum(pos, size int64) (sum string, err error) {
if pos+size > int64(c.aofsz) {
@ -63,6 +61,10 @@ func connAOFMD5(conn *Conn, pos, size int64) (sum string, err error) {
return "", err
}
if v.Error() != nil {
errmsg := v.Error().Error()
if errmsg == "ERR EOF" || errmsg == "EOF" {
return "", io.EOF
}
return "", v.Error()
}
sum = v.String()
@ -90,6 +92,50 @@ func (c *Controller) matchChecksums(conn *Conn, pos, size int64) (match bool, er
return csum == sum, nil
}
// getEndOfLastValuePositionInFile is a very slow operation because it reads the file
// backwards on byte at a time. Eek. It seek+read, seek+read, etc.
func getEndOfLastValuePositionInFile(fname string, startPos int64) (int64, error) {
pos := startPos
f, err := os.Open(fname)
if err != nil {
return 0, err
}
defer f.Close()
readByte := func() (byte, error) {
if pos <= 0 {
return 0, io.EOF
}
pos--
if _, err := f.Seek(pos, 0); err != nil {
return 0, err
}
b := make([]byte, 1)
if n, err := f.Read(b); err != nil {
return 0, err
} else if n != 1 {
return 0, errors.New("invalid read")
}
return b[0], nil
}
for {
c, err := readByte()
if err != nil {
return 0, err
}
if c == '*' {
if _, err := f.Seek(pos, 0); err != nil {
return 0, err
}
rd := resp.NewReader(f)
_, telnet, n, err := rd.ReadMultiBulk()
if err != nil || telnet {
continue // keep reading backwards
}
return pos + int64(n), nil
}
}
}
// followCheckSome is not a full checksum. It just "checks some" data.
// We will do some various checksums on the leader until we find the correct position to start at.
func (c *Controller) followCheckSome(addr string, followc uint64) (pos int64, err error) {
@ -118,6 +164,7 @@ func (c *Controller) followCheckSome(addr string, followc uint64) (pos int64, er
if err != nil {
return 0, err
}
if match {
min += checksumsz // bump up the min
for {
@ -152,63 +199,10 @@ func (c *Controller) followCheckSome(addr string, followc uint64) (pos int64, er
// we want to truncate at a command location
// search for nearest command
f, err := os.Open(c.f.Name())
pos, err = getEndOfLastValuePositionInFile(c.f.Name(), fullpos)
if err != nil {
return 0, err
}
defer f.Close()
if _, err := f.Seek(pos, 0); err != nil {
return 0, err
}
// need to read backwards looking for null byte
const bufsz = backwardsBufferSize
buf := make([]byte, bufsz)
outer:
for {
if pos < int64(len(buf)) {
pos = 0
break
}
if _, err := f.Seek(pos-bufsz, 0); err != nil {
return 0, err
}
if _, err := io.ReadFull(f, buf); err != nil {
return 0, err
}
for i := len(buf) - 1; i >= 0; i-- {
if buf[i] == 0 {
tpos := pos - bufsz + int64(i) - 4
if tpos < 0 {
pos = 0
break outer // at beginning of file
}
if _, err := f.Seek(tpos, 0); err != nil {
return 0, err
}
szb := make([]byte, 4)
if _, err := io.ReadFull(f, szb); err != nil {
return 0, err
}
sz2 := int64(binary.LittleEndian.Uint32(szb))
tpos = tpos - sz2 - 4
if tpos < 0 {
continue // keep scanning
}
if _, err := f.Seek(tpos, 0); err != nil {
return 0, err
}
if _, err := io.ReadFull(f, szb); err != nil {
return 0, err
}
sz1 := int64(binary.LittleEndian.Uint32(szb))
if sz1 == sz2 {
pos = pos - bufsz + int64(i) + 1
break outer // we found our match
}
}
}
pos -= bufsz
}
if pos == fullpos {
if core.ShowDebugMessages {
log.Debug("follow: aof fully intact")
@ -217,7 +211,6 @@ outer:
}
log.Warnf("truncating aof to %d", pos)
// any errror below are fatal.
f.Close()
c.f.Close()
if err := os.Truncate(fname, pos); err != nil {
log.Fatalf("could not truncate aof, possible data loss. %s", err.Error())