package ledis

import (
	"bufio"
	"bytes"
	"errors"
	"io"
	"os"
	"time"

	"github.com/siddontang/go-log/log"
	"github.com/siddontang/ledisdb/store/driver"
)

const (
	// maxReplBatchNum is the batch-count threshold checked by ReadEventsTo at
	// each batch boundary.
	maxReplBatchNum = 100
	// maxReplLogSize is the byte threshold (1 MiB) checked by ReadEventsTo at
	// each batch boundary.
	maxReplLogSize = 1 * 1024 * 1024
)

var (
	// ErrSkipEvent can be returned by a ReadEventFromReader callback to drop
	// the current event and continue with the next one.
	ErrSkipEvent = errors.New("skip to next event")
)

var (
	errInvalidBinLogEvent = errors.New("invalid binglog event")
	errInvalidBinLogFile  = errors.New("invalid binlog file")
)

// replBatch accumulates the replicated events of one batch together with the
// write batch that applies them to the store.
type replBatch struct {
	wb     driver.IWriteBatch
	events [][]byte
	l      *Ledis

	// lastHead is the head of the first event of the batch being collected,
	// or nil when no batch is open.
	lastHead *BinLogHead
}

// Commit applies the pending write batch and, when a binlog is configured,
// appends the collected raw events to it. It holds the owning Ledis'
// commitLock for the whole operation. On any failure the batch is rolled back
// and the error returned; on success the batch is reset for reuse.
func (b *replBatch) Commit() error {
	b.l.commitLock.Lock()
	defer b.l.commitLock.Unlock()

	err := b.wb.Commit()
	if err != nil {
		b.Rollback()
		return err
	}

	if b.l.binlog != nil {
		if err = b.l.binlog.Log(b.events...); err != nil {
			b.Rollback()
			return err
		}
	}

	b.events = [][]byte{}
	b.lastHead = nil

	return nil
}

// Rollback discards the pending write batch and collected events and closes
// the current batch. It always returns nil.
func (b *replBatch) Rollback() error {
	b.wb.Rollback()
	b.events = [][]byte{}
	b.lastHead = nil
	return nil
}

// replicateEvent records the raw event in the batch and stages its effect in
// the batch's write batch, dispatching on the type byte at event[0].
// It returns errInvalidBinLogEvent for an empty event or an unknown type.
// Note that the event is appended to b.events before it is decoded.
func (l *Ledis) replicateEvent(b *replBatch, event []byte) error {
	if len(event) == 0 {
		return errInvalidBinLogEvent
	}

	b.events = append(b.events, event)

	logType := uint8(event[0])
	switch logType {
	case BinLogTypePut:
		return l.replicatePutEvent(b, event)
	case BinLogTypeDeletion:
		return l.replicateDeleteEvent(b, event)
	default:
		return errInvalidBinLogEvent
	}
}

// replicatePutEvent decodes a put event and stages the key/value write.
func (l *Ledis) replicatePutEvent(b *replBatch, event []byte) error {
	key, value, err := decodeBinLogPut(event)
	if err != nil {
		return err
	}

	b.wb.Put(key, value)

	return nil
}

// replicateDeleteEvent decodes a deletion event and stages the key removal.
func (l *Ledis) replicateDeleteEvent(b *replBatch, event []byte) error {
	key, err := decodeBinLogDelete(event)
	if err != nil {
		return err
	}

	b.wb.Delete(key)

	return nil
}

// ReadEventFromReader reads (head, payload) records from rb until io.EOF is
// hit while reading a head, invoking f for each record.
//
// If f returns ErrSkipEvent the record is dropped and reading continues; any
// other non-nil error from f, from reading a head, or from reading a payload
// stops the loop and is returned. A clean EOF at a head boundary yields nil.
//
// Every record gets its own BinLogHead, so f may retain the head pointer
// across calls (ReplicateFromReader does so to detect batch boundaries).
func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error {
	var err error

	for {
		// Allocate per record: reusing one head would let the next Read
		// overwrite a head that the callback has kept.
		head := &BinLogHead{}

		if err = head.Read(rb); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}

		var dataBuf bytes.Buffer
		if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil {
			return err
		}

		err = f(head, dataBuf.Bytes())
		if err != nil && err != ErrSkipEvent {
			return err
		}
	}
}

// ReplicateFromReader applies every event read from rb to the store.
//
// Events are grouped by BinLogHead.InSameBatch: when an event's head is not in
// the same batch as the head that opened the current batch, the current batch
// is committed before the new event is staged. The trailing batch is committed
// once the reader is exhausted.
//
// A failed intermediate commit or an event that cannot be applied is logged
// and replication carries on; a read error rolls back the open batch and is
// returned.
func (l *Ledis) ReplicateFromReader(rb io.Reader) error {
	b := new(replBatch)

	b.wb = l.ldb.NewWriteBatch()
	b.l = l

	f := func(head *BinLogHead, event []byte) error {
		switch {
		case b.lastHead == nil:
			b.lastHead = head
		case !b.lastHead.InSameBatch(head):
			if err := b.Commit(); err != nil {
				// Commit already rolled the failed batch back. The event in
				// hand opens the NEXT batch, so it must still be applied
				// rather than skipped.
				log.Fatal("replication error %s, skip to next", err.Error())
			}
			b.lastHead = head
		}

		if err := l.replicateEvent(b, event); err != nil {
			log.Fatal("replication error %s, skip to next", err.Error())
			return ErrSkipEvent
		}
		return nil
	}

	if err := ReadEventFromReader(rb, f); err != nil {
		b.Rollback()
		return err
	}

	return b.Commit()
}

// ReplicateFromData replicates the events encoded in data.
func (l *Ledis) ReplicateFromData(data []byte) error {
	return l.ReplicateFromReader(bytes.NewReader(data))
}

// ReplicateFromBinLog replicates the events stored in the binlog file at
// filePath. It returns the open error or the replication error, if any.
func (l *Ledis) ReplicateFromBinLog(filePath string) error {
	f, err := os.Open(filePath)
	if err != nil {
		return err
	}
	// Deferred so the file is released even if replication panics.
	defer f.Close()

	rb := bufio.NewReaderSize(f, 4096)

	return l.ReplicateFromReader(rb)
}

// ReadEventsToTimeout tries to read events into w starting at info. If the
// first attempt succeeds but leaves the anchor unchanged (no events read), it
// waits for the binlog's new-event signal or for timeout seconds, whichever
// comes first, and then tries once more.
//
// When binlog is not enabled the anchor is reset to zero and (0, nil) is
// returned.
func (l *Ledis) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) {
	lastIndex := info.LogFileIndex
	lastPos := info.LogPos

	if l.binlog == nil {
		// binlog not supported
		info.LogFileIndex = 0
		info.LogPos = 0
		return 0, nil
	}

	n, err = l.ReadEventsTo(info, w)
	if err != nil || info.LogFileIndex != lastIndex || info.LogPos != lastPos {
		return n, err
	}

	// no events read
	select {
	case <-l.binlog.Wait():
	case <-time.After(time.Duration(timeout) * time.Second):
	}

	return l.ReadEventsTo(info, w)
}

// ReadEventsTo copies events from the binlog file named by info.LogFileIndex,
// starting at byte offset info.LogPos, to w. It returns the number of bytes
// written (heads plus payloads) and advances info.LogPos past every event that
// was fully written.
//
// Anchor updates in the non-streaming cases:
//   - binlog disabled: anchor reset to zero.
//   - file missing and index is the current binlog index: LogPos set to 0.
//   - file missing otherwise: LogFileIndex set to -1.
//   - end of file reached and a newer binlog file exists: anchor moved to the
//     start of the next file.
//
// Streaming stops at a batch boundary once more than maxReplBatchNum batches
// have been started or more than maxReplLogSize bytes have been written; the
// remaining events are picked up by the next call.
func (l *Ledis) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) {
	if l.binlog == nil {
		// binlog not supported
		info.LogFileIndex = 0
		info.LogPos = 0
		return 0, nil
	}

	index := info.LogFileIndex
	offset := info.LogPos

	filePath := l.binlog.FormatLogFilePath(index)

	var f *os.File
	f, err = os.Open(filePath)
	if err != nil {
		if !os.IsNotExist(err) {
			return 0, err
		}

		if index == l.binlog.LogFileIndex() {
			// no binlog at all
			info.LogPos = 0
		} else {
			// slave binlog info had lost
			info.LogFileIndex = -1
		}
		return 0, nil
	}
	defer f.Close()

	// A failed Stat must be reported: its FileInfo is nil in that case.
	var st os.FileInfo
	if st, err = f.Stat(); err != nil {
		return 0, err
	}
	if st.Size() == info.LogPos {
		// Already at the end of this file; nothing to send.
		return 0, nil
	}

	if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
		// may be invalid seek offset
		return 0, err
	}

	var lastHead *BinLogHead
	batchNum := 0

	for {
		// A fresh head per event keeps lastHead pointing at the head that
		// opened the current batch instead of aliasing the one being read.
		head := &BinLogHead{}

		if err = head.Read(f); err != nil {
			if err != io.EOF {
				return n, err
			}

			// we will try to use next binlog
			if index < l.binlog.LogFileIndex() {
				info.LogFileIndex += 1
				info.LogPos = 0
			}
			return n, nil
		}

		if lastHead == nil {
			lastHead = head
			batchNum++
		} else if !lastHead.InSameBatch(head) {
			lastHead = head
			batchNum++
			if batchNum > maxReplBatchNum || n > maxReplLogSize {
				// The head just read is not written and LogPos is not
				// advanced, so the next call re-reads it from the file.
				return n, nil
			}
		}

		if err = head.Write(w); err != nil {
			return n, err
		}

		if _, err = io.CopyN(w, f, int64(head.PayloadLen)); err != nil {
			return n, err
		}

		n += head.Len() + int(head.PayloadLen)
		info.LogPos = info.LogPos + int64(head.Len()) + int64(head.PayloadLen)
	}
}