ledisdb/ledis/replication.go

239 lines
4.1 KiB
Go
Raw Normal View History

2014-06-04 10:42:02 +04:00
package ledis
import (
"bufio"
"bytes"
"encoding/binary"
"errors"
2014-06-19 13:50:27 +04:00
"github.com/siddontang/ledisdb/log"
2014-06-04 10:42:02 +04:00
"io"
"os"
)
// Sentinel errors reported by the replication code.
var (
	// errInvalidBinLogEvent is returned when an event is empty or carries an
	// unrecognized type byte.
	errInvalidBinLogEvent = errors.New("invalid binlog event")

	// errInvalidBinLogFile reports an invalid binlog file.
	errInvalidBinLogFile = errors.New("invalid binlog file")
)
func (l *Ledis) ReplicateEvent(event []byte) error {
2014-06-04 10:42:02 +04:00
if len(event) == 0 {
return errInvalidBinLogEvent
}
logType := uint8(event[0])
switch logType {
case BinLogTypePut:
return l.replicatePutEvent(event)
case BinLogTypeDeletion:
return l.replicateDeleteEvent(event)
case BinLogTypeCommand:
return l.replicateCommandEvent(event)
default:
return errInvalidBinLogEvent
}
}
// replicatePutEvent decodes a put event, stores the key/value pair in the
// underlying database and, when a binlog is configured, appends the raw event
// to it. The first error encountered is returned.
func (l *Ledis) replicatePutEvent(event []byte) error {
	k, v, decodeErr := decodeBinLogPut(event)
	if decodeErr != nil {
		return decodeErr
	}

	if putErr := l.ldb.Put(k, v); putErr != nil {
		return putErr
	}

	// Nothing more to do when this instance keeps no binlog.
	if l.binlog == nil {
		return nil
	}
	return l.binlog.Log(event)
}
// replicateDeleteEvent decodes a deletion event, removes the key from the
// underlying database and, when a binlog is configured, appends the raw event
// to it. The first error encountered is returned.
func (l *Ledis) replicateDeleteEvent(event []byte) error {
	k, decodeErr := decodeBinLogDelete(event)
	if decodeErr != nil {
		return decodeErr
	}

	if delErr := l.ldb.Delete(k); delErr != nil {
		return delErr
	}

	// Nothing more to do when this instance keeps no binlog.
	if l.binlog == nil {
		return nil
	}
	return l.binlog.Log(event)
}
// replicateCommandEvent is a placeholder for command events: it never applies
// anything and always reports that such events are unsupported.
func (l *Ledis) replicateCommandEvent(event []byte) error {
	unsupported := errors.New("command event not supported now")
	return unsupported
}
2014-06-09 13:23:32 +04:00
// ReplicateFromReader reads binlog records from rb and applies each one via
// ReplicateEvent until the stream ends.
//
// Each record is laid out as:
//
//	4 bytes  big-endian create time
//	4 bytes  big-endian payload length
//	N bytes  payload (the event)
//
// A clean end of stream at a record boundary returns nil. If the stream ends
// in the middle of a record (after the create time has been read), the error
// is io.ErrUnexpectedEOF so callers can tell truncation apart from a normal
// end. Any other read error is returned as is.
//
// A failure to apply an individual event is logged and the loop moves on to
// the next record; it does not abort replication.
// This function does not lock l itself.
func (l *Ledis) ReplicateFromReader(rb io.Reader) error {
	var (
		createTime uint32
		dataLen    uint32
		dataBuf    bytes.Buffer
	)

	for {
		// binary.Read reports io.EOF only when no bytes at all were read,
		// which here means we stopped exactly on a record boundary.
		err := binary.Read(rb, binary.BigEndian, &createTime)
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		if err = binary.Read(rb, binary.BigEndian, &dataLen); err != nil {
			if err == io.EOF {
				// Header was cut off after the create time.
				err = io.ErrUnexpectedEOF
			}
			return err
		}

		// Reuse the buffer across records; Reset keeps its capacity.
		dataBuf.Reset()
		if _, err = io.CopyN(&dataBuf, rb, int64(dataLen)); err != nil {
			if err == io.EOF {
				// Payload shorter than the announced length.
				err = io.ErrUnexpectedEOF
			}
			return err
		}

		if err = l.ReplicateEvent(dataBuf.Bytes()); err != nil {
			// NOTE(review): relies on the project's log.Fatal only logging
			// (not exiting), as the message implies — confirm.
			log.Fatal("replication error %s, skip to next", err.Error())
		}
	}
}
// ReplicateFromData applies the binlog records contained in data while
// holding l's lock. The record format and error behavior are those of
// ReplicateFromReader.
func (l *Ledis) ReplicateFromData(data []byte) error {
	l.Lock()
	// Deferred so the lock is released even if replication panics.
	defer l.Unlock()

	return l.ReplicateFromReader(bytes.NewReader(data))
}
func (l *Ledis) ReplicateFromBinLog(filePath string) error {
f, err := os.Open(filePath)
2014-06-04 10:42:02 +04:00
if err != nil {
2014-06-09 13:23:32 +04:00
return err
2014-06-04 10:42:02 +04:00
}
2014-06-09 13:23:32 +04:00
rb := bufio.NewReaderSize(f, 4096)
2014-06-04 10:42:02 +04:00
2014-06-11 10:52:34 +04:00
l.Lock()
2014-06-09 13:23:32 +04:00
err = l.ReplicateFromReader(rb)
2014-06-11 10:52:34 +04:00
l.Unlock()
2014-06-04 10:42:02 +04:00
2014-06-09 13:23:32 +04:00
f.Close()
return err
}
2014-06-10 06:41:50 +04:00
// maxSyncEvents bounds a single ReadEventsTo call: once its per-call event
// counter exceeds this value, ReadEventsTo stops and returns.
const maxSyncEvents = 64
2014-06-09 13:23:32 +04:00
2014-06-10 06:41:50 +04:00
func (l *Ledis) ReadEventsTo(info *MasterInfo, w io.Writer) (n int, err error) {
n = 0
2014-06-09 13:23:32 +04:00
if l.binlog == nil {
//binlog not supported
info.LogFileIndex = 0
2014-06-10 06:41:50 +04:00
info.LogPos = 0
2014-06-09 13:23:32 +04:00
return
2014-06-04 10:42:02 +04:00
}
2014-06-10 06:41:50 +04:00
index := info.LogFileIndex
offset := info.LogPos
2014-06-04 10:42:02 +04:00
2014-06-09 13:23:32 +04:00
filePath := l.binlog.FormatLogFilePath(index)
var f *os.File
f, err = os.Open(filePath)
2014-06-10 06:41:50 +04:00
if os.IsNotExist(err) {
2014-06-09 13:23:32 +04:00
lastIndex := l.binlog.LogFileIndex()
2014-06-10 06:41:50 +04:00
2014-06-09 13:23:32 +04:00
if index == lastIndex {
//no binlog at all
2014-06-10 06:41:50 +04:00
info.LogPos = 0
} else {
//slave binlog info had lost
info.LogFileIndex = -1
2014-06-09 13:23:32 +04:00
}
2014-06-10 06:41:50 +04:00
}
2014-06-09 13:23:32 +04:00
2014-06-10 06:41:50 +04:00
if err != nil {
if os.IsNotExist(err) {
err = nil
}
return
2014-06-09 13:23:32 +04:00
}
defer f.Close()
2014-06-10 06:41:50 +04:00
var fileSize int64
st, _ := f.Stat()
fileSize = st.Size()
if fileSize == info.LogPos {
return
}
2014-06-09 13:23:32 +04:00
if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
//may be invliad seek offset
return
}
var lastCreateTime uint32 = 0
2014-06-04 10:42:02 +04:00
var createTime uint32
var dataLen uint32
2014-06-09 13:23:32 +04:00
2014-06-10 06:41:50 +04:00
var eventsNum int = 0
2014-06-04 10:42:02 +04:00
for {
2014-06-09 13:23:32 +04:00
if err = binary.Read(f, binary.BigEndian, &createTime); err != nil {
if err == io.EOF {
//we will try to use next binlog
2014-06-10 06:41:50 +04:00
if index < l.binlog.LogFileIndex() {
info.LogFileIndex += 1
info.LogPos = 0
}
err = nil
2014-06-09 13:23:32 +04:00
return
} else {
return
}
2014-06-04 10:42:02 +04:00
}
2014-06-10 06:41:50 +04:00
eventsNum++
2014-06-09 13:23:32 +04:00
if lastCreateTime == 0 {
lastCreateTime = createTime
} else if lastCreateTime != createTime {
return
2014-06-10 06:41:50 +04:00
} else if eventsNum > maxSyncEvents {
2014-06-09 13:23:32 +04:00
return
2014-06-04 10:42:02 +04:00
}
2014-06-09 13:23:32 +04:00
if err = binary.Read(f, binary.BigEndian, &dataLen); err != nil {
return
2014-06-04 10:42:02 +04:00
}
2014-06-09 13:23:32 +04:00
if err = binary.Write(w, binary.BigEndian, createTime); err != nil {
return
}
2014-06-04 10:42:02 +04:00
2014-06-09 13:23:32 +04:00
if err = binary.Write(w, binary.BigEndian, dataLen); err != nil {
return
}
2014-06-04 10:42:02 +04:00
2014-06-09 13:23:32 +04:00
if _, err = io.CopyN(w, f, int64(dataLen)); err != nil {
return
2014-06-04 10:42:02 +04:00
}
2014-06-09 13:23:32 +04:00
2014-06-10 06:41:50 +04:00
n += (8 + int(dataLen))
2014-06-09 13:23:32 +04:00
info.LogPos = info.LogPos + 8 + int64(dataLen)
2014-06-04 10:42:02 +04:00
}
2014-06-09 13:23:32 +04:00
return
2014-06-04 10:42:02 +04:00
}