diff --git a/controller/aof.go b/controller/aof.go index 27c0f53e..089fde60 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -2,7 +2,6 @@ package controller import ( "bufio" - "encoding/binary" "errors" "fmt" "io" @@ -19,19 +18,6 @@ import ( "github.com/tidwall/tile38/controller/server" ) -const backwardsBufferSize = 50000 - -var errCorruptedAOF = errors.New("corrupted aof file") - -type AOFReader struct { - r io.Reader // reader - rerr error // read error - chunk []byte // chunk buffer - buf []byte // main buffer - l int // length of valid data in buffer - p int // pointer -} - type errAOFHook struct { err error } @@ -40,62 +26,6 @@ func (err errAOFHook) Error() string { return fmt.Sprintf("hook: %v", err.err) } -func (rd *AOFReader) ReadCommand() ([]byte, error) { - if rd.l >= 4 { - sz1 := int(binary.LittleEndian.Uint32(rd.buf[rd.p:])) - if rd.l >= sz1+9 { - // we have enough data for a record - sz2 := int(binary.LittleEndian.Uint32(rd.buf[rd.p+4+sz1:])) - if sz2 != sz1 || rd.buf[rd.p+4+sz1+4] != 0 { - return nil, errCorruptedAOF - } - buf := rd.buf[rd.p+4 : rd.p+4+sz1] - rd.p += sz1 + 9 - rd.l -= sz1 + 9 - return buf, nil - } - } - // need more data - if rd.rerr != nil { - if rd.rerr == io.EOF { - rd.rerr = nil // we want to return EOF, but we want to be able to try again - if rd.l != 0 { - return nil, io.ErrUnexpectedEOF - } - return nil, io.EOF - } - return nil, rd.rerr - } - if rd.p != 0 { - // move p to the beginning - copy(rd.buf, rd.buf[rd.p:rd.p+rd.l]) - rd.p = 0 - } - var n int - n, rd.rerr = rd.r.Read(rd.chunk) - if n > 0 { - cbuf := rd.chunk[:n] - if len(rd.buf)-rd.l < n { - if len(rd.buf) == 0 { - rd.buf = make([]byte, len(cbuf)) - copy(rd.buf, cbuf) - } else { - copy(rd.buf[rd.l:], cbuf[:len(rd.buf)-rd.l]) - rd.buf = append(rd.buf, cbuf[len(rd.buf)-rd.l:]...) - } - } else { - copy(rd.buf[rd.l:], cbuf) - } - rd.l += n - } - return rd.ReadCommand() -} - -func NewAOFReader(r io.Reader) *AOFReader { - rd := &AOFReader{r: r, chunk: make([]byte, 0xFFFF)} - return rd -} - func (c *Controller) loadAOF() error { start := time.Now() var count int diff --git a/controller/aofmigrate.go b/controller/aofmigrate.go new file mode 100644 index 00000000..10d3b60c --- /dev/null +++ b/controller/aofmigrate.go @@ -0,0 +1,119 @@ +package controller + +import ( + "encoding/binary" + "errors" + "io" + "os" + "path" +) + +var errCorruptedAOF = errors.New("corrupted aof file") + +type LegacyAOFReader struct { + r io.Reader // reader + rerr error // read error + chunk []byte // chunk buffer + buf []byte // main buffer + l int // length of valid data in buffer + p int // pointer +} + +func (rd *LegacyAOFReader) ReadCommand() ([]byte, error) { + if rd.l >= 4 { + sz1 := int(binary.LittleEndian.Uint32(rd.buf[rd.p:])) + if rd.l >= sz1+9 { + // we have enough data for a record + sz2 := int(binary.LittleEndian.Uint32(rd.buf[rd.p+4+sz1:])) + if sz2 != sz1 || rd.buf[rd.p+4+sz1+4] != 0 { + return nil, errCorruptedAOF + } + buf := rd.buf[rd.p+4 : rd.p+4+sz1] + rd.p += sz1 + 9 + rd.l -= sz1 + 9 + return buf, nil + } + } + // need more data + if rd.rerr != nil { + if rd.rerr == io.EOF { + rd.rerr = nil // we want to return EOF, but we want to be able to try again + if rd.l != 0 { + return nil, io.ErrUnexpectedEOF + } + return nil, io.EOF + } + return nil, rd.rerr + } + if rd.p != 0 { + // move p to the beginning + copy(rd.buf, rd.buf[rd.p:rd.p+rd.l]) + rd.p = 0 + } + var n int + n, rd.rerr = rd.r.Read(rd.chunk) + if n > 0 { + cbuf := rd.chunk[:n] + if len(rd.buf)-rd.l < n { + if len(rd.buf) == 0 { + rd.buf = make([]byte, len(cbuf)) + copy(rd.buf, cbuf) + } else { + copy(rd.buf[rd.l:], cbuf[:len(rd.buf)-rd.l]) + rd.buf = append(rd.buf, cbuf[len(rd.buf)-rd.l:]...) + } + } else { + copy(rd.buf[rd.l:], cbuf) + } + rd.l += n + } + return rd.ReadCommand() +} + +func NewLegacyAOFReader(r io.Reader) *LegacyAOFReader { + rd := &LegacyAOFReader{r: r, chunk: make([]byte, 0xFFFF)} + return rd +} + +func (c *Controller) migrateAOF() error { + _, err := os.Stat(path.Join(c.dir, "appendonly.aof")) + if err == nil { + return nil + } + if !os.IsNotExist(err) { + return err + } + _, err = os.Stat(path.Join(c.dir, "aof")) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + + newf, err := os.Create(path.Join(c.dir, "migrate.aof")) + if err != nil { + return err + } + defer newf.Close() + + oldf, err := os.Open(path.Join(c.dir, "aof")) + if err != nil { + return err + } + defer oldf.Close() + + rd := NewLegacyAOFReader(newf) + for { + cmd, err := rd.ReadCommand() + if err != nil { + if err == io.EOF { + break + } + return err + } + println(string(cmd)) + } + return errors.New("unsupported") + return nil +} diff --git a/controller/aofshrink.go b/controller/aofshrink.go index e5a2bedc..1a1ababb 100644 --- a/controller/aofshrink.go +++ b/controller/aofshrink.go @@ -102,11 +102,11 @@ func (c *Controller) aofshrink() { // swap files f.Close() c.f.Close() - err = os.Rename(path.Join(c.dir, "shrink"), path.Join(c.dir, "aof")) + err = os.Rename(path.Join(c.dir, "shrink"), path.Join(c.dir, "appendonly.aof")) if err != nil { log.Fatal("shink rename fatal operation") } - c.f, err = os.OpenFile(path.Join(c.dir, "aof"), os.O_CREATE|os.O_RDWR, 0600) + c.f, err = os.OpenFile(path.Join(c.dir, "appendonly.aof"), os.O_CREATE|os.O_RDWR, 0600) if err != nil { log.Fatal("shink openfile fatal operation") } diff --git a/controller/checksum.go b/controller/checksum.go index b2df1c9d..cc695add 100644 --- a/controller/checksum.go +++ b/controller/checksum.go @@ -14,6 +14,8 @@ import ( "github.com/tidwall/tile38/core" ) +const backwardsBufferSize = 50000 + // checksum performs a simple md5 checksum on the aof file func (c *Controller) checksum(pos, size int64) (sum string, err error) { if pos+size > int64(c.aofsz) { diff --git a/controller/controller.go b/controller/controller.go index fd71c559..d9f9e487 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "os" + "path" "runtime" "strings" "sync" @@ -87,7 +88,10 @@ func ListenAndServe(host string, port int, dir string) error { if err := c.loadConfig(); err != nil { return err } - f, err := os.OpenFile(dir+"/aof", os.O_CREATE|os.O_RDWR, 0600) + if err := c.migrateAOF(); err != nil { + return err + } + f, err := os.OpenFile(path.Join(dir, "appendonly.aof"), os.O_CREATE|os.O_RDWR, 0600) if err != nil { return err } diff --git a/controller/follow.go b/controller/follow.go index 2a1440e9..ea437442 100644 --- a/controller/follow.go +++ b/controller/follow.go @@ -193,7 +193,7 @@ func (c *Controller) followStep(host string, port int, followc uint64) error { log.Info("caught up") } nullw := ioutil.Discard - rd := NewAOFReader(conn.Reader()) + rd := NewLegacyAOFReader(conn.Reader()) for { buf, err := rd.ReadCommand() if err != nil {