refactor binlog

can not compatibility
This commit is contained in:
siddontang 2014-09-01 10:16:20 +08:00
parent 356c2eea17
commit cfa6d86ace
3 changed files with 120 additions and 56 deletions

View File

@ -63,12 +63,12 @@ func main() {
} }
} }
func printEvent(createTime uint32, event []byte) error { func printEvent(head *ledis.BinLogHead, event []byte) error {
if createTime < startTime || createTime > stopTime { if head.CreateTime < startTime || head.CreateTime > stopTime {
return nil return nil
} }
t := time.Unix(int64(createTime), 0) t := time.Unix(int64(head.CreateTime), 0)
fmt.Printf("%s ", t.Format(TimeFormat)) fmt.Printf("%s ", t.Format(TimeFormat))

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"github.com/siddontang/go-log/log" "github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/config" "github.com/siddontang/ledisdb/config"
"io"
"io/ioutil" "io/ioutil"
"os" "os"
"path" "path"
@ -15,6 +16,65 @@ import (
"time" "time"
) )
type BinLogHead struct {
CreateTime uint32
BatchId uint32
PayloadLen uint32
}
func (h *BinLogHead) Len() int {
return 12
}
func (h *BinLogHead) Write(w io.Writer) error {
if err := binary.Write(w, binary.BigEndian, h.CreateTime); err != nil {
return err
}
if err := binary.Write(w, binary.BigEndian, h.BatchId); err != nil {
return err
}
if err := binary.Write(w, binary.BigEndian, h.PayloadLen); err != nil {
return err
}
return nil
}
func (h *BinLogHead) handleReadError(err error) error {
if err == io.EOF {
return io.ErrUnexpectedEOF
} else {
return err
}
}
func (h *BinLogHead) Read(r io.Reader) error {
var err error
if err = binary.Read(r, binary.BigEndian, &h.CreateTime); err != nil {
return err
}
if err = binary.Read(r, binary.BigEndian, &h.BatchId); err != nil {
return h.handleReadError(err)
}
if err = binary.Read(r, binary.BigEndian, &h.PayloadLen); err != nil {
return h.handleReadError(err)
}
return nil
}
func (h *BinLogHead) InSameBatch(ho *BinLogHead) bool {
if h.CreateTime == ho.CreateTime && h.BatchId == ho.BatchId {
return true
} else {
return false
}
}
/* /*
index file format: index file format:
ledis-bin.00001 ledis-bin.00001
@ -23,7 +83,9 @@ ledis-bin.00003
log file format log file format
timestamp(bigendian uint32, seconds)|PayloadLen(bigendian uint32)|PayloadData Log: Head|PayloadData
Head: createTime|batchId|payloadData
*/ */
@ -41,6 +103,8 @@ type BinLog struct {
indexName string indexName string
logNames []string logNames []string
lastLogIndex int64 lastLogIndex int64
batchId uint32
} }
func NewBinLog(cfg *config.Config) (*BinLog, error) { func NewBinLog(cfg *config.Config) (*BinLog, error) {
@ -49,7 +113,7 @@ func NewBinLog(cfg *config.Config) (*BinLog, error) {
l.cfg = &cfg.BinLog l.cfg = &cfg.BinLog
l.cfg.Adjust() l.cfg.Adjust()
l.path = path.Join(cfg.DataDir, "bin_log") l.path = path.Join(cfg.DataDir, "binlog")
if err := os.MkdirAll(l.path, os.ModePerm); err != nil { if err := os.MkdirAll(l.path, os.ModePerm); err != nil {
return nil, err return nil, err
@ -285,17 +349,17 @@ func (l *BinLog) Log(args ...[]byte) error {
} }
} }
//we treat log many args as a batch, so use same createTime head := &BinLogHead{}
createTime := uint32(time.Now().Unix())
head.CreateTime = uint32(time.Now().Unix())
head.BatchId = l.batchId
l.batchId++
for _, data := range args { for _, data := range args {
payLoadLen := uint32(len(data)) head.PayloadLen = uint32(len(data))
if err := binary.Write(l.logWb, binary.BigEndian, createTime); err != nil { if err := head.Write(l.logWb); err != nil {
return err
}
if err := binary.Write(l.logWb, binary.BigEndian, payLoadLen); err != nil {
return err return err
} }

View File

@ -3,7 +3,6 @@ package ledis
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"encoding/binary"
"errors" "errors"
"github.com/siddontang/go-log/log" "github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/store/driver" "github.com/siddontang/ledisdb/store/driver"
@ -11,6 +10,11 @@ import (
"os" "os"
) )
const (
maxReplBatchNum = 100
maxReplLogSize = 1 * 1024 * 1024
)
var ( var (
ErrSkipEvent = errors.New("skip to next event") ErrSkipEvent = errors.New("skip to next event")
) )
@ -23,8 +27,9 @@ var (
type replBatch struct { type replBatch struct {
wb driver.IWriteBatch wb driver.IWriteBatch
events [][]byte events [][]byte
createTime uint32
l *Ledis l *Ledis
lastHead *BinLogHead
} }
func (b *replBatch) Commit() error { func (b *replBatch) Commit() error {
@ -50,7 +55,7 @@ func (b *replBatch) Commit() error {
func (b *replBatch) Rollback() error { func (b *replBatch) Rollback() error {
b.wb.Rollback() b.wb.Rollback()
b.events = [][]byte{} b.events = [][]byte{}
b.createTime = 0 b.lastHead = nil
return nil return nil
} }
@ -100,14 +105,13 @@ func (l *Ledis) replicateDeleteEvent(b *replBatch, event []byte) error {
return nil return nil
} }
func ReadEventFromReader(rb io.Reader, f func(createTime uint32, event []byte) error) error { func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error {
var createTime uint32 head := &BinLogHead{}
var dataLen uint32
var dataBuf bytes.Buffer var dataBuf bytes.Buffer
var err error var err error
for { for {
if err = binary.Read(rb, binary.BigEndian, &createTime); err != nil { if err = head.Read(rb); err != nil {
if err == io.EOF { if err == io.EOF {
break break
} else { } else {
@ -115,15 +119,11 @@ func ReadEventFromReader(rb io.Reader, f func(createTime uint32, event []byte) e
} }
} }
if err = binary.Read(rb, binary.BigEndian, &dataLen); err != nil { if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil {
return err return err
} }
if _, err = io.CopyN(&dataBuf, rb, int64(dataLen)); err != nil { err = f(head, dataBuf.Bytes())
return err
}
err = f(createTime, dataBuf.Bytes())
if err != nil && err != ErrSkipEvent { if err != nil && err != ErrSkipEvent {
return err return err
} }
@ -140,15 +140,15 @@ func (l *Ledis) ReplicateFromReader(rb io.Reader) error {
b.wb = l.ldb.NewWriteBatch() b.wb = l.ldb.NewWriteBatch()
b.l = l b.l = l
f := func(createTime uint32, event []byte) error { f := func(head *BinLogHead, event []byte) error {
if b.createTime == 0 { if b.lastHead == nil {
b.createTime = createTime b.lastHead = head
} else if b.createTime != createTime { } else if !b.lastHead.InSameBatch(head) {
if err := b.Commit(); err != nil { if err := b.Commit(); err != nil {
log.Fatal("replication error %s, skip to next", err.Error()) log.Fatal("replication error %s, skip to next", err.Error())
return ErrSkipEvent return ErrSkipEvent
} }
b.createTime = createTime b.lastHead = head
} }
err := l.replicateEvent(b, event) err := l.replicateEvent(b, event)
@ -240,12 +240,14 @@ func (l *Ledis) ReadEventsTo(info *MasterInfo, w io.Writer) (n int, err error) {
return return
} }
var lastCreateTime uint32 = 0 var lastHead *BinLogHead = nil
var createTime uint32
var dataLen uint32 head := &BinLogHead{}
batchNum := 0
for { for {
if err = binary.Read(f, binary.BigEndian, &createTime); err != nil { if err = head.Read(f); err != nil {
if err == io.EOF { if err == io.EOF {
//we will try to use next binlog //we will try to use next binlog
if index < l.binlog.LogFileIndex() { if index < l.binlog.LogFileIndex() {
@ -257,32 +259,30 @@ func (l *Ledis) ReadEventsTo(info *MasterInfo, w io.Writer) (n int, err error) {
} else { } else {
return return
} }
} }
if lastCreateTime == 0 { if lastHead == nil {
lastCreateTime = createTime lastHead = head
} else if lastCreateTime != createTime { batchNum++
} else if !lastHead.InSameBatch(head) {
lastHead = head
batchNum++
if batchNum > maxReplBatchNum || n > maxReplLogSize {
return
}
}
if err = head.Write(w); err != nil {
return return
} }
if err = binary.Read(f, binary.BigEndian, &dataLen); err != nil { if _, err = io.CopyN(w, f, int64(head.PayloadLen)); err != nil {
return return
} }
if err = binary.Write(w, binary.BigEndian, createTime); err != nil { n += (head.Len() + int(head.PayloadLen))
return info.LogPos = info.LogPos + int64(head.Len()) + int64(head.PayloadLen)
}
if err = binary.Write(w, binary.BigEndian, dataLen); err != nil {
return
}
if _, err = io.CopyN(w, f, int64(dataLen)); err != nil {
return
}
n += (8 + int(dataLen))
info.LogPos = info.LogPos + 8 + int64(dataLen)
} }
return return