diff --git a/cmd/ledis-binlog/main.go b/cmd/ledis-binlog/main.go index 7212b0e..3725920 100644 --- a/cmd/ledis-binlog/main.go +++ b/cmd/ledis-binlog/main.go @@ -63,12 +63,12 @@ func main() { } } -func printEvent(createTime uint32, event []byte) error { - if createTime < startTime || createTime > stopTime { +func printEvent(head *ledis.BinLogHead, event []byte) error { + if head.CreateTime < startTime || head.CreateTime > stopTime { return nil } - t := time.Unix(int64(createTime), 0) + t := time.Unix(int64(head.CreateTime), 0) fmt.Printf("%s ", t.Format(TimeFormat)) diff --git a/ledis/binlog.go b/ledis/binlog.go index 3bdf50a..6eb0c30 100644 --- a/ledis/binlog.go +++ b/ledis/binlog.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/siddontang/go-log/log" "github.com/siddontang/ledisdb/config" + "io" "io/ioutil" "os" "path" @@ -15,6 +16,65 @@ import ( "time" ) +type BinLogHead struct { + CreateTime uint32 + BatchId uint32 + PayloadLen uint32 +} + +func (h *BinLogHead) Len() int { + return 12 +} + +func (h *BinLogHead) Write(w io.Writer) error { + if err := binary.Write(w, binary.BigEndian, h.CreateTime); err != nil { + return err + } + + if err := binary.Write(w, binary.BigEndian, h.BatchId); err != nil { + return err + } + + if err := binary.Write(w, binary.BigEndian, h.PayloadLen); err != nil { + return err + } + + return nil +} + +func (h *BinLogHead) handleReadError(err error) error { + if err == io.EOF { + return io.ErrUnexpectedEOF + } else { + return err + } +} + +func (h *BinLogHead) Read(r io.Reader) error { + var err error + if err = binary.Read(r, binary.BigEndian, &h.CreateTime); err != nil { + return err + } + + if err = binary.Read(r, binary.BigEndian, &h.BatchId); err != nil { + return h.handleReadError(err) + } + + if err = binary.Read(r, binary.BigEndian, &h.PayloadLen); err != nil { + return h.handleReadError(err) + } + + return nil +} + +func (h *BinLogHead) InSameBatch(ho *BinLogHead) bool { + if h.CreateTime == ho.CreateTime && h.BatchId == ho.BatchId { + return true + } else { + return false + } +} + /* index file format: ledis-bin.00001 @@ -23,7 +83,9 @@ ledis-bin.00003 log file format -timestamp(bigendian uint32, seconds)|PayloadLen(bigendian uint32)|PayloadData +Log: Head|PayloadData + +Head: createTime|batchId|payloadData */ @@ -41,6 +103,8 @@ type BinLog struct { indexName string logNames []string lastLogIndex int64 + + batchId uint32 } func NewBinLog(cfg *config.Config) (*BinLog, error) { @@ -49,7 +113,7 @@ func NewBinLog(cfg *config.Config) (*BinLog, error) { l.cfg = &cfg.BinLog l.cfg.Adjust() - l.path = path.Join(cfg.DataDir, "bin_log") + l.path = path.Join(cfg.DataDir, "binlog") if err := os.MkdirAll(l.path, os.ModePerm); err != nil { return nil, err @@ -285,17 +349,17 @@ func (l *BinLog) Log(args ...[]byte) error { } } - //we treat log many args as a batch, so use same createTime - createTime := uint32(time.Now().Unix()) + head := &BinLogHead{} + + head.CreateTime = uint32(time.Now().Unix()) + head.BatchId = l.batchId + + l.batchId++ for _, data := range args { - payLoadLen := uint32(len(data)) + head.PayloadLen = uint32(len(data)) - if err := binary.Write(l.logWb, binary.BigEndian, createTime); err != nil { - return err - } - - if err := binary.Write(l.logWb, binary.BigEndian, payLoadLen); err != nil { + if err := head.Write(l.logWb); err != nil { return err } diff --git a/ledis/replication.go b/ledis/replication.go index 2b19cfe..2d8db48 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -3,7 +3,6 @@ package ledis import ( "bufio" "bytes" - "encoding/binary" "errors" "github.com/siddontang/go-log/log" "github.com/siddontang/ledisdb/store/driver" @@ -11,6 +10,11 @@ import ( "os" ) +const ( + maxReplBatchNum = 100 + maxReplLogSize = 1 * 1024 * 1024 +) + var ( ErrSkipEvent = errors.New("skip to next event") ) @@ -21,10 +25,11 @@ var ( ) type replBatch struct { - wb driver.IWriteBatch - events [][]byte - createTime uint32 - l *Ledis + wb driver.IWriteBatch + events [][]byte + l *Ledis + + lastHead *BinLogHead } func (b *replBatch) Commit() error { @@ -50,7 +55,7 @@ func (b *replBatch) Commit() error { func (b *replBatch) Rollback() error { b.wb.Rollback() b.events = [][]byte{} - b.createTime = 0 + b.lastHead = nil return nil } @@ -100,14 +105,13 @@ func (l *Ledis) replicateDeleteEvent(b *replBatch, event []byte) error { return nil } -func ReadEventFromReader(rb io.Reader, f func(createTime uint32, event []byte) error) error { - var createTime uint32 - var dataLen uint32 +func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error { + head := &BinLogHead{} var dataBuf bytes.Buffer var err error for { - if err = binary.Read(rb, binary.BigEndian, &createTime); err != nil { + if err = head.Read(rb); err != nil { if err == io.EOF { break } else { @@ -115,15 +119,11 @@ func ReadEventFromReader(rb io.Reader, f func(createTime uint32, event []byte) e } } - if err = binary.Read(rb, binary.BigEndian, &dataLen); err != nil { + if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil { return err } - if _, err = io.CopyN(&dataBuf, rb, int64(dataLen)); err != nil { - return err - } - - err = f(createTime, dataBuf.Bytes()) + err = f(head, dataBuf.Bytes()) if err != nil && err != ErrSkipEvent { return err } @@ -140,15 +140,15 @@ func (l *Ledis) ReplicateFromReader(rb io.Reader) error { b.wb = l.ldb.NewWriteBatch() b.l = l - f := func(createTime uint32, event []byte) error { - if b.createTime == 0 { - b.createTime = createTime - } else if b.createTime != createTime { + f := func(head *BinLogHead, event []byte) error { + if b.lastHead == nil { + b.lastHead = head + } else if !b.lastHead.InSameBatch(head) { if err := b.Commit(); err != nil { log.Fatal("replication error %s, skip to next", err.Error()) return ErrSkipEvent } - b.createTime = createTime + b.lastHead = head } err := l.replicateEvent(b, event) @@ -240,12 +240,14 @@ func (l *Ledis) ReadEventsTo(info *MasterInfo, w io.Writer) (n int, err error) { return } - var lastCreateTime uint32 = 0 - var createTime uint32 - var dataLen uint32 + var lastHead *BinLogHead = nil + + head := &BinLogHead{} + + batchNum := 0 for { - if err = binary.Read(f, binary.BigEndian, &createTime); err != nil { + if err = head.Read(f); err != nil { if err == io.EOF { //we will try to use next binlog if index < l.binlog.LogFileIndex() { @@ -257,32 +259,30 @@ func (l *Ledis) ReadEventsTo(info *MasterInfo, w io.Writer) (n int, err error) { } else { return } + } - if lastCreateTime == 0 { - lastCreateTime = createTime - } else if lastCreateTime != createTime { + if lastHead == nil { + lastHead = head + batchNum++ + } else if !lastHead.InSameBatch(head) { + lastHead = head + batchNum++ + if batchNum > maxReplBatchNum || n > maxReplLogSize { + return + } + } + + if err = head.Write(w); err != nil { return } - if err = binary.Read(f, binary.BigEndian, &dataLen); err != nil { + if _, err = io.CopyN(w, f, int64(head.PayloadLen)); err != nil { return } - if err = binary.Write(w, binary.BigEndian, createTime); err != nil { - return - } - - if err = binary.Write(w, binary.BigEndian, dataLen); err != nil { - return - } - - if _, err = io.CopyN(w, f, int64(dataLen)); err != nil { - return - } - - n += (8 + int(dataLen)) - info.LogPos = info.LogPos + 8 + int64(dataLen) + n += (head.Len() + int(head.PayloadLen)) + info.LogPos = info.LogPos + int64(head.Len()) + int64(head.PayloadLen) } return