// Source: tile38/internal/server/checksum.go

package server

import (
	"crypto/md5"
	"errors"
	"fmt"
	"io"
	"os"
	"time"

	"github.com/tidwall/resp"
	"github.com/tidwall/tile38/core"
	"github.com/tidwall/tile38/internal/log"
)

// checksum performs a simple md5 checksum on the aof file
func (c *Server) checksum(pos, size int64) (sum string, err error) {
2016-03-19 17:16:19 +03:00
if pos+size > int64(c.aofsz) {
return "", io.EOF
}
2016-03-05 02:08:16 +03:00
var f *os.File
2017-09-30 21:06:10 +03:00
f, err = os.Open(c.aof.Name())
2016-03-05 02:08:16 +03:00
if err != nil {
return
}
defer f.Close()
2016-04-01 02:26:36 +03:00
sumr := md5.New()
2016-03-05 02:08:16 +03:00
err = func() error {
if size == 0 {
2016-03-19 17:16:19 +03:00
n, err := f.Seek(int64(c.aofsz), 0)
2016-03-05 02:08:16 +03:00
if err != nil {
return err
}
if pos >= n {
return io.EOF
}
return nil
}
_, err = f.Seek(pos, 0)
if err != nil {
return err
}
2016-04-01 02:26:36 +03:00
_, err = io.CopyN(sumr, f, size)
2016-03-05 02:08:16 +03:00
if err != nil {
return err
}
return nil
}()
if err != nil {
if err == io.ErrUnexpectedEOF {
err = io.EOF
}
return "", err
}
2016-04-01 02:26:36 +03:00
return fmt.Sprintf("%x", sumr.Sum(nil)), nil
2016-03-05 02:08:16 +03:00
}
func connAOFMD5(conn *RESPConn, pos, size int64) (sum string, err error) {
2016-04-01 02:26:36 +03:00
v, err := conn.Do("aofmd5", pos, size)
2016-03-05 02:08:16 +03:00
if err != nil {
return "", err
}
2016-04-01 02:26:36 +03:00
if v.Error() != nil {
2016-04-01 03:42:22 +03:00
errmsg := v.Error().Error()
if errmsg == "ERR EOF" || errmsg == "EOF" {
return "", io.EOF
}
2016-04-01 02:26:36 +03:00
return "", v.Error()
2016-03-05 02:08:16 +03:00
}
2016-04-01 02:26:36 +03:00
sum = v.String()
if len(sum) != 32 {
2016-03-05 02:08:16 +03:00
return "", errors.New("checksum not ok")
}
2016-04-01 02:26:36 +03:00
return sum, nil
2016-03-05 02:08:16 +03:00
}
func (c *Server) matchChecksums(conn *RESPConn, pos, size int64) (match bool, err error) {
2016-03-05 02:08:16 +03:00
sum, err := c.checksum(pos, size)
if err != nil {
if err == io.EOF {
return false, nil
}
return false, err
}
csum, err := connAOFMD5(conn, pos, size)
if err != nil {
if err == io.EOF {
return false, nil
}
return false, err
}
return csum == sum, nil
}
2016-04-01 03:42:22 +03:00
// getEndOfLastValuePositionInFile is a very slow operation because it reads the file
// backwards on byte at a time. Eek. It seek+read, seek+read, etc.
func getEndOfLastValuePositionInFile(fname string, startPos int64) (int64, error) {
pos := startPos
f, err := os.Open(fname)
if err != nil {
return 0, err
}
defer f.Close()
readByte := func() (byte, error) {
if pos <= 0 {
return 0, io.EOF
}
pos--
if _, err := f.Seek(pos, 0); err != nil {
return 0, err
}
b := make([]byte, 1)
if n, err := f.Read(b); err != nil {
return 0, err
} else if n != 1 {
return 0, errors.New("invalid read")
}
return b[0], nil
}
for {
c, err := readByte()
if err != nil {
return 0, err
}
if c == '*' {
if _, err := f.Seek(pos, 0); err != nil {
return 0, err
}
rd := resp.NewReader(f)
_, telnet, n, err := rd.ReadMultiBulk()
if err != nil || telnet {
continue // keep reading backwards
}
return pos + int64(n), nil
}
}
}
2016-03-05 02:08:16 +03:00
// followCheckSome is not a full checksum. It just "checks some" data.
// We will do some various checksums on the leader until we find the correct position to start at.
func (c *Server) followCheckSome(addr string, followc int) (pos int64, err error) {
2016-03-06 17:55:00 +03:00
if core.ShowDebugMessages {
2016-03-05 02:08:16 +03:00
log.Debug("follow:", addr, ":check some")
}
c.mu.Lock()
defer c.mu.Unlock()
2017-09-30 17:34:08 +03:00
if c.followc.get() != followc {
2016-03-05 02:08:16 +03:00
return 0, errNoLongerFollowing
}
if c.aofsz < checksumsz {
return 0, nil
}
2016-04-01 02:26:36 +03:00
conn, err := DialTimeout(addr, time.Second*2)
2016-03-05 02:08:16 +03:00
if err != nil {
return 0, err
}
defer conn.Close()
2016-04-01 02:26:36 +03:00
2016-03-05 02:08:16 +03:00
min := int64(0)
max := int64(c.aofsz) - checksumsz
limit := int64(c.aofsz)
match, err := c.matchChecksums(conn, min, checksumsz)
if err != nil {
return 0, err
}
2016-04-01 03:42:22 +03:00
2016-03-05 02:08:16 +03:00
if match {
min += checksumsz // bump up the min
for {
if max < min || max+checksumsz > limit {
pos = min
break
} else {
match, err = c.matchChecksums(conn, max, checksumsz)
if err != nil {
return 0, err
}
if match {
min = max + checksumsz
} else {
limit = max
}
max = (limit-min)/2 - checksumsz/2 + min // multiply
}
}
}
fullpos := pos
2017-09-30 21:06:10 +03:00
fname := c.aof.Name()
2016-03-05 02:08:16 +03:00
if pos == 0 {
2017-09-30 21:06:10 +03:00
c.aof.Close()
c.aof, err = os.Create(fname)
2016-03-05 02:08:16 +03:00
if err != nil {
log.Fatalf("could not recreate aof, possible data loss. %s", err.Error())
return 0, err
}
return 0, nil
}
// we want to truncate at a command location
// search for nearest command
2017-09-30 21:06:10 +03:00
pos, err = getEndOfLastValuePositionInFile(c.aof.Name(), fullpos)
2016-03-05 02:08:16 +03:00
if err != nil {
return 0, err
}
if pos == fullpos {
2016-03-06 17:55:00 +03:00
if core.ShowDebugMessages {
2016-03-05 02:08:16 +03:00
log.Debug("follow: aof fully intact")
}
return pos, nil
}
log.Warnf("truncating aof to %d", pos)
// any errror below are fatal.
2017-09-30 21:06:10 +03:00
c.aof.Close()
2016-03-05 02:08:16 +03:00
if err := os.Truncate(fname, pos); err != nil {
log.Fatalf("could not truncate aof, possible data loss. %s", err.Error())
return 0, err
}
2017-09-30 21:06:10 +03:00
c.aof, err = os.OpenFile(fname, os.O_CREATE|os.O_RDWR, 0600)
2016-03-05 02:08:16 +03:00
if err != nil {
log.Fatalf("could not create aof, possible data loss. %s", err.Error())
return 0, err
}
// reset the entire system.
log.Infof("reloading aof commands")
c.reset()
if err := c.loadAOF(); err != nil {
log.Fatalf("could not reload aof, possible data loss. %s", err.Error())
return 0, err
}
if int64(c.aofsz) != pos {
log.Fatalf("aof size mismatch during reload, possible data loss.")
return 0, errors.New("?")
}
return pos, nil
}