mirror of https://github.com/ledisdb/ledisdb.git
133 lines
2.4 KiB
Go
133 lines
2.4 KiB
Go
package ledis
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"encoding/binary"
|
|
"errors"
|
|
"github.com/siddontang/go-log/log"
|
|
"io"
|
|
"os"
|
|
)
|
|
|
|
var (
|
|
errInvalidBinLogEvent = errors.New("invalid binglog event")
|
|
)
|
|
|
|
func (l *Ledis) ReplicateEvent(event []byte) error {
|
|
if len(event) == 0 {
|
|
return errInvalidBinLogEvent
|
|
}
|
|
|
|
logType := uint8(event[0])
|
|
switch logType {
|
|
case BinLogTypePut:
|
|
return l.replicatePutEvent(event)
|
|
case BinLogTypeDeletion:
|
|
return l.replicateDeleteEvent(event)
|
|
case BinLogTypeCommand:
|
|
return l.replicateCommandEvent(event)
|
|
default:
|
|
return errInvalidBinLogEvent
|
|
}
|
|
}
|
|
|
|
func (l *Ledis) replicatePutEvent(event []byte) error {
|
|
key, value, err := decodeBinLogPut(event)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = l.ldb.Put(key, value); err != nil {
|
|
return err
|
|
}
|
|
|
|
if l.binlog != nil {
|
|
err = l.binlog.Log(event)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (l *Ledis) replicateDeleteEvent(event []byte) error {
|
|
key, err := decodeBinLogDelete(event)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = l.ldb.Delete(key); err != nil {
|
|
return err
|
|
}
|
|
|
|
if l.binlog != nil {
|
|
err = l.binlog.Log(event)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (l *Ledis) replicateCommandEvent(event []byte) error {
|
|
return errors.New("command event not supported now")
|
|
}
|
|
|
|
func (l *Ledis) RepliateFromBinLog(filePath string, offset int64) (int64, error) {
|
|
f, err := os.Open(filePath)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
defer f.Close()
|
|
|
|
st, _ := f.Stat()
|
|
totalSize := st.Size()
|
|
|
|
if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
rb := bufio.NewReaderSize(f, 4096)
|
|
|
|
var createTime uint32
|
|
var dataLen uint32
|
|
var dataBuf bytes.Buffer
|
|
|
|
for {
|
|
if offset+8 > totalSize {
|
|
//event may not sync completely
|
|
return f.Seek(offset, os.SEEK_SET)
|
|
}
|
|
|
|
if err = binary.Read(rb, binary.BigEndian, &createTime); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if err = binary.Read(rb, binary.BigEndian, &dataLen); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if offset+8+int64(dataLen) > totalSize {
|
|
//event may not sync completely
|
|
return f.Seek(offset, os.SEEK_SET)
|
|
} else {
|
|
if _, err = io.CopyN(&dataBuf, rb, int64(dataLen)); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
l.Lock()
|
|
err = l.ReplicateEvent(dataBuf.Bytes())
|
|
l.Unlock()
|
|
if err != nil {
|
|
log.Fatal("replication error %s, skip to next", err.Error())
|
|
}
|
|
|
|
dataBuf.Reset()
|
|
|
|
offset += (8 + int64(dataLen))
|
|
}
|
|
}
|
|
|
|
//can not go here???
|
|
log.Error("can not go here")
|
|
return offset, nil
|
|
}
|