diff --git a/controller/checksum.go b/controller/checksum.go index 685c7c87..e9c047dd 100644 --- a/controller/checksum.go +++ b/controller/checksum.go @@ -2,19 +2,17 @@ package controller import ( "crypto/md5" - "encoding/binary" "errors" "fmt" "io" "os" "time" + "github.com/tidwall/resp" "github.com/tidwall/tile38/controller/log" "github.com/tidwall/tile38/core" ) -const backwardsBufferSize = 50000 - // checksum performs a simple md5 checksum on the aof file func (c *Controller) checksum(pos, size int64) (sum string, err error) { if pos+size > int64(c.aofsz) { @@ -63,6 +61,10 @@ func connAOFMD5(conn *Conn, pos, size int64) (sum string, err error) { return "", err } if v.Error() != nil { + errmsg := v.Error().Error() + if errmsg == "ERR EOF" || errmsg == "EOF" { + return "", io.EOF + } return "", v.Error() } sum = v.String() @@ -90,6 +92,50 @@ func (c *Controller) matchChecksums(conn *Conn, pos, size int64) (match bool, er return csum == sum, nil } +// getEndOfLastValuePositionInFile is a very slow operation because it reads the file +// backwards on byte at a time. Eek. It seek+read, seek+read, etc. +func getEndOfLastValuePositionInFile(fname string, startPos int64) (int64, error) { + pos := startPos + f, err := os.Open(fname) + if err != nil { + return 0, err + } + defer f.Close() + readByte := func() (byte, error) { + if pos <= 0 { + return 0, io.EOF + } + pos-- + if _, err := f.Seek(pos, 0); err != nil { + return 0, err + } + b := make([]byte, 1) + if n, err := f.Read(b); err != nil { + return 0, err + } else if n != 1 { + return 0, errors.New("invalid read") + } + return b[0], nil + } + for { + c, err := readByte() + if err != nil { + return 0, err + } + if c == '*' { + if _, err := f.Seek(pos, 0); err != nil { + return 0, err + } + rd := resp.NewReader(f) + _, telnet, n, err := rd.ReadMultiBulk() + if err != nil || telnet { + continue // keep reading backwards + } + return pos + int64(n), nil + } + } +} + // followCheckSome is not a full checksum. It just "checks some" data. // We will do some various checksums on the leader until we find the correct position to start at. func (c *Controller) followCheckSome(addr string, followc uint64) (pos int64, err error) { @@ -118,6 +164,7 @@ func (c *Controller) followCheckSome(addr string, followc uint64) (pos int64, er if err != nil { return 0, err } + if match { min += checksumsz // bump up the min for { @@ -152,63 +199,10 @@ func (c *Controller) followCheckSome(addr string, followc uint64) (pos int64, er // we want to truncate at a command location // search for nearest command - f, err := os.Open(c.f.Name()) + pos, err = getEndOfLastValuePositionInFile(c.f.Name(), fullpos) if err != nil { return 0, err } - defer f.Close() - if _, err := f.Seek(pos, 0); err != nil { - return 0, err - } - // need to read backwards looking for null byte - const bufsz = backwardsBufferSize - buf := make([]byte, bufsz) -outer: - for { - if pos < int64(len(buf)) { - pos = 0 - break - } - if _, err := f.Seek(pos-bufsz, 0); err != nil { - return 0, err - } - if _, err := io.ReadFull(f, buf); err != nil { - return 0, err - } - for i := len(buf) - 1; i >= 0; i-- { - if buf[i] == 0 { - tpos := pos - bufsz + int64(i) - 4 - if tpos < 0 { - pos = 0 - break outer // at beginning of file - } - if _, err := f.Seek(tpos, 0); err != nil { - return 0, err - } - szb := make([]byte, 4) - if _, err := io.ReadFull(f, szb); err != nil { - return 0, err - } - sz2 := int64(binary.LittleEndian.Uint32(szb)) - tpos = tpos - sz2 - 4 - if tpos < 0 { - continue // keep scanning - } - if _, err := f.Seek(tpos, 0); err != nil { - return 0, err - } - if _, err := io.ReadFull(f, szb); err != nil { - return 0, err - } - sz1 := int64(binary.LittleEndian.Uint32(szb)) - if sz1 == sz2 { - pos = pos - bufsz + int64(i) + 1 - break outer // we found our match - } - } - } - pos -= bufsz - } if pos == fullpos { if core.ShowDebugMessages { log.Debug("follow: aof fully intact") @@ -217,7 +211,6 @@ outer: } log.Warnf("truncating aof to %d", pos) // any errror below are fatal. - f.Close() c.f.Close() if err := os.Truncate(fname, pos); err != nil { log.Fatalf("could not truncate aof, possible data loss. %s", err.Error())