add replicate for relay log

This commit is contained in:
siddontang 2014-06-04 14:42:02 +08:00
parent a7eb1e1884
commit 7a0c5bcd17
5 changed files with 250 additions and 12 deletions

View File

@ -2,33 +2,61 @@ package ledis
import ( import (
"encoding/binary" "encoding/binary"
"errors"
)
var (
errBinLogDeleteType = errors.New("invalid bin log delete type")
errBinLogPutType = errors.New("invalid bin log put type")
errBinLogCommandType = errors.New("invalid bin log command type")
) )
func encodeBinLogDelete(key []byte) []byte { func encodeBinLogDelete(key []byte) []byte {
buf := make([]byte, 3+len(key)) buf := make([]byte, 1+len(key))
buf[0] = BinLogTypeDeletion buf[0] = BinLogTypeDeletion
pos := 1 copy(buf[1:], key)
binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
pos += 2
copy(buf[pos:], key)
return buf return buf
} }
func decodeBinLogDelete(sz []byte) ([]byte, error) {
if len(sz) < 1 || sz[0] != BinLogTypeDeletion {
return nil, errBinLogDeleteType
}
return sz[1:], nil
}
func encodeBinLogPut(key []byte, value []byte) []byte { func encodeBinLogPut(key []byte, value []byte) []byte {
buf := make([]byte, 7+len(key)+len(value)) buf := make([]byte, 3+len(key)+len(value))
buf[0] = BinLogTypePut buf[0] = BinLogTypePut
pos := 1 pos := 1
binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
pos += 2 pos += 2
copy(buf[pos:], key) copy(buf[pos:], key)
pos += len(key) pos += len(key)
binary.BigEndian.PutUint32(buf[pos:], uint32(len(value)))
pos += 4
copy(buf[pos:], value) copy(buf[pos:], value)
return buf return buf
} }
func encodeBinLogCommand(commandType uint8, args []byte) []byte { func decodeBinLogPut(sz []byte) ([]byte, []byte, error) {
if len(sz) < 3 || sz[0] != BinLogTypePut {
return nil, nil, errBinLogPutType
}
keyLen := int(binary.BigEndian.Uint16(sz[1:]))
if 3+keyLen > len(sz) {
return nil, nil, errBinLogPutType
}
return sz[3 : 3+keyLen], sz[3+keyLen:], nil
}
func encodeBinLogCommand(commandType uint8, args ...[]byte) []byte {
//to do //to do
return nil return nil
} }
func decodeBinLogCommand(sz []byte) (uint8, [][]byte, error) {
return 0, nil, errBinLogCommandType
}

View File

@ -15,8 +15,8 @@ import (
// |keylen(bigendian int32)|key|valuelen(bigendian int32)|value...... // |keylen(bigendian int32)|key|valuelen(bigendian int32)|value......
type DumpHead struct { type DumpHead struct {
LogFile string `json:"bin_log_file"` LogFile string `json:"log_file"`
LogPos int64 `json:"bin_log_pos"` LogPos int64 `json:"log_pos"`
} }
func (l *Ledis) DumpFile(path string) error { func (l *Ledis) DumpFile(path string) error {

132
ledis/replication.go Normal file
View File

@ -0,0 +1,132 @@
package ledis
import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"github.com/siddontang/go-log/log"
"io"
"os"
)
var (
errInvalidBinLogEvent = errors.New("invalid binglog event")
)
func (l *Ledis) replicateEvent(event []byte) error {
if len(event) == 0 {
return errInvalidBinLogEvent
}
logType := uint8(event[0])
switch logType {
case BinLogTypePut:
return l.replicatePutEvent(event)
case BinLogTypeDeletion:
return l.replicateDeleteEvent(event)
case BinLogTypeCommand:
return l.replicateCommandEvent(event)
default:
return errInvalidBinLogEvent
}
}
func (l *Ledis) replicatePutEvent(event []byte) error {
key, value, err := decodeBinLogPut(event)
if err != nil {
return err
}
if err = l.ldb.Put(key, value); err != nil {
return err
}
if l.binlog != nil {
err = l.binlog.Log(event)
}
return err
}
func (l *Ledis) replicateDeleteEvent(event []byte) error {
key, err := decodeBinLogDelete(event)
if err != nil {
return err
}
if err = l.ldb.Delete(key); err != nil {
return err
}
if l.binlog != nil {
err = l.binlog.Log(event)
}
return err
}
func (l *Ledis) replicateCommandEvent(event []byte) error {
return errors.New("command event not supported now")
}
func (l *Ledis) RepliateRelayLog(relayLog string, offset int64) (int64, error) {
f, err := os.Open(relayLog)
if err != nil {
return 0, err
}
defer f.Close()
st, _ := f.Stat()
totalSize := st.Size()
if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
return 0, err
}
rb := bufio.NewReaderSize(f, 4096)
var createTime uint32
var dataLen uint32
var dataBuf bytes.Buffer
for {
if offset+8 > totalSize {
//event may not sync completely
return f.Seek(offset, os.SEEK_SET)
}
if err = binary.Read(rb, binary.BigEndian, &createTime); err != nil {
return 0, err
}
if err = binary.Read(rb, binary.BigEndian, &dataLen); err != nil {
return 0, err
}
if offset+8+int64(dataLen) > totalSize {
//event may not sync completely
return f.Seek(offset, os.SEEK_SET)
} else {
if _, err = io.CopyN(&dataBuf, rb, int64(dataLen)); err != nil {
return 0, err
}
l.Lock()
err = l.replicateEvent(dataBuf.Bytes())
l.Unlock()
if err != nil {
log.Fatal("replication error %s, skip to next", err.Error())
}
dataBuf.Reset()
offset += (8 + int64(dataLen))
}
}
//can not go here???
log.Error("can not go here")
return offset, nil
}

78
ledis/replication_test.go Normal file
View File

@ -0,0 +1,78 @@
package ledis
import (
"bytes"
"github.com/siddontang/go-leveldb/leveldb"
"os"
"testing"
)
func TestReplication(t *testing.T) {
var master *Ledis
var slave *Ledis
var err error
os.RemoveAll("/tmp/repl")
os.MkdirAll("/tmp/repl", os.ModePerm)
master, err = Open([]byte(`
{
"data_db" : {
"path" : "/tmp/repl/master_db"
},
"binlog" : {
"path" : "/tmp/repl/master_binlog"
}
}
`))
if err != nil {
t.Fatal(err)
}
slave, err = Open([]byte(`
{
"data_db" : {
"path" : "/tmp/repl/slave_db"
},
"binlog" : {
"path" : "/tmp/repl/slave_binlog"
}
}
`))
if err != nil {
t.Fatal(err)
}
db, _ := master.Select(0)
db.Set([]byte("a"), []byte("1"))
db.Set([]byte("b"), []byte("2"))
db.Set([]byte("c"), []byte("3"))
relayLog := "/tmp/repl/master_binlog/ledis-bin.0000001"
var offset int64
offset, err = slave.RepliateRelayLog(relayLog, 0)
if err != nil {
t.Fatal(err)
} else {
if st, err := os.Stat(relayLog); err != nil {
t.Fatal(err)
} else if st.Size() != offset {
t.Fatal(st.Size(), offset)
}
}
it := master.ldb.Iterator(nil, nil, leveldb.RangeClose, 0, -1)
for ; it.Valid(); it.Next() {
key := it.Key()
value := it.Value()
if v, err := slave.ldb.Get(key); err != nil {
t.Fatal(err)
} else if !bytes.Equal(v, value) {
t.Fatal("replication error", len(v), len(value))
}
}
}

View File

@ -23,7 +23,7 @@ ledis-bin.00003
log file format log file format
timestamp(bigendian uint32, seconds)|PayloadLen(bigendian uint32)|PayloadData|LogId timestamp(bigendian uint32, seconds)|PayloadLen(bigendian uint32)|PayloadData
*/ */