diff --git a/ledis/binlog_util.go b/ledis/binlog_util.go index b0abd4e..bc1cd63 100644 --- a/ledis/binlog_util.go +++ b/ledis/binlog_util.go @@ -2,33 +2,61 @@ package ledis import ( "encoding/binary" + "errors" +) + +var ( + errBinLogDeleteType = errors.New("invalid bin log delete type") + errBinLogPutType = errors.New("invalid bin log put type") + errBinLogCommandType = errors.New("invalid bin log command type") ) func encodeBinLogDelete(key []byte) []byte { - buf := make([]byte, 3+len(key)) + buf := make([]byte, 1+len(key)) buf[0] = BinLogTypeDeletion - pos := 1 - binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) - pos += 2 - copy(buf[pos:], key) + copy(buf[1:], key) return buf } +func decodeBinLogDelete(sz []byte) ([]byte, error) { + if len(sz) < 1 || sz[0] != BinLogTypeDeletion { + return nil, errBinLogDeleteType + } + + return sz[1:], nil +} + func encodeBinLogPut(key []byte, value []byte) []byte { - buf := make([]byte, 7+len(key)+len(value)) + buf := make([]byte, 3+len(key)+len(value)) buf[0] = BinLogTypePut pos := 1 binary.BigEndian.PutUint16(buf[pos:], uint16(len(key))) pos += 2 copy(buf[pos:], key) pos += len(key) - binary.BigEndian.PutUint32(buf[pos:], uint32(len(value))) - pos += 4 copy(buf[pos:], value) + return buf } -func encodeBinLogCommand(commandType uint8, args []byte) []byte { +func decodeBinLogPut(sz []byte) ([]byte, []byte, error) { + if len(sz) < 3 || sz[0] != BinLogTypePut { + return nil, nil, errBinLogPutType + } + + keyLen := int(binary.BigEndian.Uint16(sz[1:])) + if 3+keyLen > len(sz) { + return nil, nil, errBinLogPutType + } + + return sz[3 : 3+keyLen], sz[3+keyLen:], nil +} + +func encodeBinLogCommand(commandType uint8, args ...[]byte) []byte { //to do return nil } + +func decodeBinLogCommand(sz []byte) (uint8, [][]byte, error) { + return 0, nil, errBinLogCommandType +} diff --git a/ledis/dump.go b/ledis/dump.go index e7ff8a0..2cb08af 100644 --- a/ledis/dump.go +++ b/ledis/dump.go @@ -15,8 +15,8 @@ import ( // |keylen(bigendian int32)|key|valuelen(bigendian int32)|value...... type DumpHead struct { - LogFile string `json:"bin_log_file"` - LogPos int64 `json:"bin_log_pos"` + LogFile string `json:"log_file"` + LogPos int64 `json:"log_pos"` } func (l *Ledis) DumpFile(path string) error { diff --git a/ledis/replication.go b/ledis/replication.go new file mode 100644 index 0000000..8de0bc9 --- /dev/null +++ b/ledis/replication.go @@ -0,0 +1,132 @@ +package ledis + +import ( + "bufio" + "bytes" + "encoding/binary" + "errors" + "github.com/siddontang/go-log/log" + "io" + "os" +) + +var ( + errInvalidBinLogEvent = errors.New("invalid binglog event") +) + +func (l *Ledis) replicateEvent(event []byte) error { + if len(event) == 0 { + return errInvalidBinLogEvent + } + + logType := uint8(event[0]) + switch logType { + case BinLogTypePut: + return l.replicatePutEvent(event) + case BinLogTypeDeletion: + return l.replicateDeleteEvent(event) + case BinLogTypeCommand: + return l.replicateCommandEvent(event) + default: + return errInvalidBinLogEvent + } +} + +func (l *Ledis) replicatePutEvent(event []byte) error { + key, value, err := decodeBinLogPut(event) + if err != nil { + return err + } + + if err = l.ldb.Put(key, value); err != nil { + return err + } + + if l.binlog != nil { + err = l.binlog.Log(event) + } + + return err +} + +func (l *Ledis) replicateDeleteEvent(event []byte) error { + key, err := decodeBinLogDelete(event) + if err != nil { + return err + } + + if err = l.ldb.Delete(key); err != nil { + return err + } + + if l.binlog != nil { + err = l.binlog.Log(event) + } + + return err +} + +func (l *Ledis) replicateCommandEvent(event []byte) error { + return errors.New("command event not supported now") +} + +func (l *Ledis) RepliateRelayLog(relayLog string, offset int64) (int64, error) { + f, err := os.Open(relayLog) + if err != nil { + return 0, err + } + + defer f.Close() + + st, _ := f.Stat() + totalSize := st.Size() + + if _, err = f.Seek(offset, os.SEEK_SET); err != nil { + return 0, err + } + + rb := bufio.NewReaderSize(f, 4096) + + var createTime uint32 + var dataLen uint32 + var dataBuf bytes.Buffer + + for { + if offset+8 > totalSize { + //event may not sync completely + return f.Seek(offset, os.SEEK_SET) + } + + if err = binary.Read(rb, binary.BigEndian, &createTime); err != nil { + return 0, err + } + + if err = binary.Read(rb, binary.BigEndian, &dataLen); err != nil { + return 0, err + } + + if offset+8+int64(dataLen) > totalSize { + //event may not sync completely + return f.Seek(offset, os.SEEK_SET) + } else { + if _, err = io.CopyN(&dataBuf, rb, int64(dataLen)); err != nil { + return 0, err + } + + l.Lock() + err = l.replicateEvent(dataBuf.Bytes()) + l.Unlock() + if err != nil { + log.Fatal("replication error %s, skip to next", err.Error()) + } + + dataBuf.Reset() + + offset += (8 + int64(dataLen)) + } + } + + //can not go here??? + log.Error("can not go here") + return offset, nil +} diff --git a/ledis/replication_test.go b/ledis/replication_test.go new file mode 100644 index 0000000..5f2243a --- /dev/null +++ b/ledis/replication_test.go @@ -0,0 +1,78 @@ +package ledis + +import ( + "bytes" + "github.com/siddontang/go-leveldb/leveldb" + "os" + "testing" +) + +func TestReplication(t *testing.T) { + var master *Ledis + var slave *Ledis + var err error + + os.RemoveAll("/tmp/repl") + os.MkdirAll("/tmp/repl", os.ModePerm) + + master, err = Open([]byte(` + { + "data_db" : { + "path" : "/tmp/repl/master_db" + }, + + "binlog" : { + "path" : "/tmp/repl/master_binlog" + } + } + `)) + if err != nil { + t.Fatal(err) + } + + slave, err = Open([]byte(` + { + "data_db" : { + "path" : "/tmp/repl/slave_db" + }, + + "binlog" : { + "path" : "/tmp/repl/slave_binlog" + } + } + `)) + if err != nil { + t.Fatal(err) + } + + db, _ := master.Select(0) + db.Set([]byte("a"), []byte("1")) + db.Set([]byte("b"), []byte("2")) + db.Set([]byte("c"), []byte("3")) + + relayLog := "/tmp/repl/master_binlog/ledis-bin.0000001" + + var offset int64 + offset, err = slave.RepliateRelayLog(relayLog, 0) + if err != nil { + t.Fatal(err) + } else { + if st, err := os.Stat(relayLog); err != nil { + t.Fatal(err) + } else if st.Size() != offset { + t.Fatal(st.Size(), offset) + } + } + + it := master.ldb.Iterator(nil, nil, leveldb.RangeClose, 0, -1) + for ; it.Valid(); it.Next() { + key := it.Key() + value := it.Value() + + if v, err := slave.ldb.Get(key); err != nil { + t.Fatal(err) + } else if !bytes.Equal(v, value) { + t.Fatal("replication error", len(v), len(value)) + } + } +} diff --git a/replication/binlog.go b/replication/binlog.go index 1500b27..349880c 100644 --- a/replication/binlog.go +++ b/replication/binlog.go @@ -23,7 +23,7 @@ ledis-bin.00003 log file format -timestamp(bigendian uint32, seconds)|PayloadLen(bigendian uint32)|PayloadData|LogId +timestamp(bigendian uint32, seconds)|PayloadLen(bigendian uint32)|PayloadData */