ledisdb/ledis/replication.go

312 lines
5.4 KiB
Go
Raw Normal View History

2014-06-04 10:42:02 +04:00
package ledis
import (
"bufio"
"bytes"
"errors"
"github.com/siddontang/go-log/log"
2014-08-30 13:39:44 +04:00
"github.com/siddontang/ledisdb/store/driver"
2014-06-04 10:42:02 +04:00
"io"
"os"
"time"
2014-06-04 10:42:02 +04:00
)
2014-09-01 06:16:20 +04:00
// Limits that ReadEventsTo checks at batch boundaries to bound how much
// binlog data a single call copies to its writer.
const (
	// maxReplBatchNum is the batch count above which ReadEventsTo stops.
	maxReplBatchNum = 100

	// maxReplLogSize is the byte count (1 MiB) above which ReadEventsTo stops.
	maxReplLogSize = 1 << 20
)
2014-07-11 09:28:34 +04:00
// ErrSkipEvent can be returned by the callback given to ReadEventFromReader
// to drop the current event; reading then continues with the next event
// instead of aborting.
var ErrSkipEvent = errors.New("skip to next event")
2014-06-04 10:42:02 +04:00
// errInvalidBinLogEvent is returned when a binlog event is empty or carries
// an unknown type byte.
var errInvalidBinLogEvent = errors.New("invalid binglog event")

// errInvalidBinLogFile reports a malformed binlog file.
var errInvalidBinLogFile = errors.New("invalid binlog file")
2014-08-30 13:39:44 +04:00
type replBatch struct {
2014-09-01 06:16:20 +04:00
wb driver.IWriteBatch
events [][]byte
l *Ledis
lastHead *BinLogHead
2014-08-30 13:39:44 +04:00
}
func (b *replBatch) Commit() error {
b.l.commitLock.Lock()
defer b.l.commitLock.Unlock()
err := b.wb.Commit()
if err != nil {
b.Rollback()
return err
}
if b.l.binlog != nil {
if err = b.l.binlog.Log(b.events...); err != nil {
b.Rollback()
return err
}
}
2014-09-04 10:57:26 +04:00
b.events = [][]byte{}
b.lastHead = nil
2014-08-30 13:39:44 +04:00
return nil
}
func (b *replBatch) Rollback() error {
b.wb.Rollback()
b.events = [][]byte{}
2014-09-01 06:16:20 +04:00
b.lastHead = nil
2014-08-30 13:39:44 +04:00
return nil
}
func (l *Ledis) replicateEvent(b *replBatch, event []byte) error {
2014-06-04 10:42:02 +04:00
if len(event) == 0 {
return errInvalidBinLogEvent
}
2014-09-04 10:57:26 +04:00
b.events = append(b.events, event)
2014-06-04 10:42:02 +04:00
logType := uint8(event[0])
switch logType {
case BinLogTypePut:
2014-08-30 13:39:44 +04:00
return l.replicatePutEvent(b, event)
2014-06-04 10:42:02 +04:00
case BinLogTypeDeletion:
2014-08-30 13:39:44 +04:00
return l.replicateDeleteEvent(b, event)
2014-06-04 10:42:02 +04:00
default:
return errInvalidBinLogEvent
}
}
2014-08-30 13:39:44 +04:00
func (l *Ledis) replicatePutEvent(b *replBatch, event []byte) error {
2014-06-04 10:42:02 +04:00
key, value, err := decodeBinLogPut(event)
if err != nil {
return err
}
2014-08-30 13:39:44 +04:00
b.wb.Put(key, value)
2014-06-04 10:42:02 +04:00
2014-08-30 13:39:44 +04:00
return nil
2014-06-04 10:42:02 +04:00
}
2014-08-30 13:39:44 +04:00
func (l *Ledis) replicateDeleteEvent(b *replBatch, event []byte) error {
2014-06-04 10:42:02 +04:00
key, err := decodeBinLogDelete(event)
if err != nil {
return err
}
2014-08-30 13:39:44 +04:00
b.wb.Delete(key)
2014-06-04 10:42:02 +04:00
2014-08-30 13:39:44 +04:00
return nil
2014-06-04 10:42:02 +04:00
}
2014-09-01 06:16:20 +04:00
func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error {
head := &BinLogHead{}
2014-06-09 13:23:32 +04:00
var err error
for {
2014-09-01 06:16:20 +04:00
if err = head.Read(rb); err != nil {
2014-06-09 13:23:32 +04:00
if err == io.EOF {
break
} else {
return err
}
}
2014-09-04 10:57:26 +04:00
var dataBuf bytes.Buffer
2014-09-01 06:16:20 +04:00
if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil {
2014-06-09 13:23:32 +04:00
return err
}
2014-09-01 06:16:20 +04:00
err = f(head, dataBuf.Bytes())
2014-07-11 09:28:34 +04:00
if err != nil && err != ErrSkipEvent {
return err
2014-06-09 13:23:32 +04:00
}
}
return nil
}
2014-07-11 09:28:34 +04:00
func (l *Ledis) ReplicateFromReader(rb io.Reader) error {
2014-08-30 13:39:44 +04:00
b := new(replBatch)
b.wb = l.ldb.NewWriteBatch()
b.l = l
2014-09-01 06:16:20 +04:00
f := func(head *BinLogHead, event []byte) error {
if b.lastHead == nil {
b.lastHead = head
} else if !b.lastHead.InSameBatch(head) {
2014-08-30 13:39:44 +04:00
if err := b.Commit(); err != nil {
log.Fatal("replication error %s, skip to next", err.Error())
return ErrSkipEvent
}
2014-09-01 06:16:20 +04:00
b.lastHead = head
2014-08-30 13:39:44 +04:00
}
err := l.replicateEvent(b, event)
2014-07-11 09:28:34 +04:00
if err != nil {
log.Fatal("replication error %s, skip to next", err.Error())
return ErrSkipEvent
}
return nil
}
2014-08-30 13:39:44 +04:00
err := ReadEventFromReader(rb, f)
if err != nil {
b.Rollback()
return err
}
return b.Commit()
2014-07-11 09:28:34 +04:00
}
2014-06-09 13:23:32 +04:00
// ReplicateFromData replicates the binlog events encoded in data by feeding
// them to ReplicateFromReader through an in-memory reader.
func (l *Ledis) ReplicateFromData(data []byte) error {
	return l.ReplicateFromReader(bytes.NewReader(data))
}
func (l *Ledis) ReplicateFromBinLog(filePath string) error {
f, err := os.Open(filePath)
2014-06-04 10:42:02 +04:00
if err != nil {
2014-06-09 13:23:32 +04:00
return err
2014-06-04 10:42:02 +04:00
}
2014-06-09 13:23:32 +04:00
rb := bufio.NewReaderSize(f, 4096)
2014-06-04 10:42:02 +04:00
2014-06-09 13:23:32 +04:00
err = l.ReplicateFromReader(rb)
2014-06-04 10:42:02 +04:00
2014-06-09 13:23:32 +04:00
f.Close()
return err
}
// ReadEventsToTimeout tries to read events into w starting at the position
// in info. If that first read succeeds but leaves the position unchanged, it
// waits for the binlog's new-event signal or for timeout seconds, whichever
// comes first, and then reads once more.
//
// info is updated to the position reached. When no binlog is configured,
// info is reset to zero and (0, nil) is returned. n is the number of bytes
// reported by the read whose result is returned.
func (l *Ledis) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) {
	if l.binlog == nil {
		// binlog not supported
		info.LogFileIndex = 0
		info.LogPos = 0
		return 0, nil
	}

	lastIndex := info.LogFileIndex
	lastPos := info.LogPos

	n, err = l.ReadEventsTo(info, w)
	if err != nil || info.LogFileIndex != lastIndex || info.LogPos != lastPos {
		return n, err
	}

	// No events were read: wait for a new event or the timeout. An explicit
	// timer is used so it can be stopped when the signal arrives first,
	// instead of lingering until the timeout expires.
	timer := time.NewTimer(time.Duration(timeout) * time.Second)
	select {
	case <-l.binlog.Wait():
		timer.Stop()
	case <-timer.C:
	}

	return l.ReadEventsTo(info, w)
}
2014-09-04 18:43:56 +04:00
func (l *Ledis) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) {
2014-06-10 06:41:50 +04:00
n = 0
2014-06-09 13:23:32 +04:00
if l.binlog == nil {
//binlog not supported
info.LogFileIndex = 0
2014-06-10 06:41:50 +04:00
info.LogPos = 0
2014-06-09 13:23:32 +04:00
return
2014-06-04 10:42:02 +04:00
}
2014-06-10 06:41:50 +04:00
index := info.LogFileIndex
offset := info.LogPos
2014-06-04 10:42:02 +04:00
2014-06-09 13:23:32 +04:00
filePath := l.binlog.FormatLogFilePath(index)
var f *os.File
f, err = os.Open(filePath)
2014-06-10 06:41:50 +04:00
if os.IsNotExist(err) {
2014-06-09 13:23:32 +04:00
lastIndex := l.binlog.LogFileIndex()
2014-06-10 06:41:50 +04:00
2014-06-09 13:23:32 +04:00
if index == lastIndex {
//no binlog at all
2014-06-10 06:41:50 +04:00
info.LogPos = 0
} else {
//slave binlog info had lost
info.LogFileIndex = -1
2014-06-09 13:23:32 +04:00
}
2014-06-10 06:41:50 +04:00
}
2014-06-09 13:23:32 +04:00
2014-06-10 06:41:50 +04:00
if err != nil {
if os.IsNotExist(err) {
err = nil
}
return
2014-06-09 13:23:32 +04:00
}
defer f.Close()
2014-06-10 06:41:50 +04:00
var fileSize int64
st, _ := f.Stat()
fileSize = st.Size()
if fileSize == info.LogPos {
return
}
2014-06-09 13:23:32 +04:00
if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
//may be invliad seek offset
return
}
2014-09-01 06:16:20 +04:00
var lastHead *BinLogHead = nil
head := &BinLogHead{}
batchNum := 0
2014-06-09 13:23:32 +04:00
2014-06-04 10:42:02 +04:00
for {
2014-09-01 06:16:20 +04:00
if err = head.Read(f); err != nil {
2014-06-09 13:23:32 +04:00
if err == io.EOF {
//we will try to use next binlog
2014-06-10 06:41:50 +04:00
if index < l.binlog.LogFileIndex() {
info.LogFileIndex += 1
info.LogPos = 0
}
err = nil
2014-06-09 13:23:32 +04:00
return
} else {
return
}
2014-06-04 10:42:02 +04:00
}
2014-09-01 06:16:20 +04:00
if lastHead == nil {
lastHead = head
batchNum++
} else if !lastHead.InSameBatch(head) {
lastHead = head
batchNum++
if batchNum > maxReplBatchNum || n > maxReplLogSize {
return
}
2014-06-09 13:23:32 +04:00
}
2014-06-04 10:42:02 +04:00
2014-09-01 06:16:20 +04:00
if err = head.Write(w); err != nil {
2014-06-09 13:23:32 +04:00
return
}
2014-06-04 10:42:02 +04:00
2014-09-01 06:16:20 +04:00
if _, err = io.CopyN(w, f, int64(head.PayloadLen)); err != nil {
2014-06-09 13:23:32 +04:00
return
2014-06-04 10:42:02 +04:00
}
2014-06-09 13:23:32 +04:00
2014-09-01 06:16:20 +04:00
n += (head.Len() + int(head.PayloadLen))
info.LogPos = info.LogPos + int64(head.Len()) + int64(head.PayloadLen)
2014-06-04 10:42:02 +04:00
}
2014-06-09 13:23:32 +04:00
return
2014-06-04 10:42:02 +04:00
}