From 93491791d599d77cdd34c12541e8a598d0bfe863 Mon Sep 17 00:00:00 2001 From: siddontang Date: Wed, 28 May 2014 14:20:45 +0800 Subject: [PATCH] add dump and load --- ledis/dump.go | 158 +++++++++++++++++++++++++++++++++++++++++++++ ledis/dump_test.go | 74 +++++++++++++++++++++ 2 files changed, 232 insertions(+) create mode 100644 ledis/dump.go create mode 100644 ledis/dump_test.go diff --git a/ledis/dump.go b/ledis/dump.go new file mode 100644 index 0000000..546ef31 --- /dev/null +++ b/ledis/dump.go @@ -0,0 +1,158 @@ +package ledis + +import ( + "bufio" + "bytes" + "encoding/binary" + "encoding/json" + "github.com/siddontang/go-leveldb/leveldb" + "io" + "os" +) + +//dump format +// head len(bigendian int32)|head(json format) +// |keylen(bigendian int32)|key|valuelen(bigendian int32)|value...... + +type DumpHead struct { + LogFile string `json:"bin_log_file"` + LogPos int64 `json:"bin_log_pos"` +} + +func (l *Ledis) DumpFile(path string) error { + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + return l.Dump(f) +} + +func (l *Ledis) Dump(w io.Writer) error { + var sp *leveldb.Snapshot + var logFileName string + var logPos int64 + if l.binlog == nil { + sp = l.ldb.NewSnapshot() + } else { + l.binlog.Lock() + sp = l.ldb.NewSnapshot() + logFileName = l.binlog.LogFileName() + logPos = l.binlog.LogFilePos() + l.binlog.Unlock() + } + + var head = DumpHead{ + LogFile: logFileName, + LogPos: logPos, + } + + data, err := json.Marshal(&head) + if err != nil { + return err + } + + wb := bufio.NewWriterSize(w, 4096) + if err = binary.Write(wb, binary.BigEndian, uint32(len(data))); err != nil { + return err + } + + if _, err = wb.Write(data); err != nil { + return err + } + + it := sp.Iterator(nil, nil, leveldb.RangeClose, 0, -1) + var key []byte + var value []byte + for ; it.Valid(); it.Next() { + key = it.Key() + value = it.Value() + + if err = binary.Write(wb, binary.BigEndian, uint16(len(key))); err != nil { + return err + } + + if _, err = wb.Write(key); err != nil { + return err + } + + if err = binary.Write(wb, binary.BigEndian, uint32(len(value))); err != nil { + return err + } + + if _, err = wb.Write(value); err != nil { + return err + } + } + + if err = wb.Flush(); err != nil { + return err + } + + return nil +} + +func (l *Ledis) LoadDumpFile(path string) (*DumpHead, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + + return l.LoadDump(f) +} + +func (l *Ledis) LoadDump(r io.Reader) (*DumpHead, error) { + rb := bufio.NewReaderSize(r, 4096) + + var headLen uint32 + err := binary.Read(rb, binary.BigEndian, &headLen) + if err != nil { + return nil, err + } + + buf := make([]byte, headLen) + if _, err = io.ReadFull(rb, buf); err != nil { + return nil, err + } + + var head DumpHead + if err = json.Unmarshal(buf, &head); err != nil { + return nil, err + } + + var keyLen uint16 + var valueLen uint32 + + var keyBuf bytes.Buffer + var valueBuf bytes.Buffer + for { + if err = binary.Read(rb, binary.BigEndian, &keyLen); err != nil && err != io.EOF { + return nil, err + } else if err == io.EOF { + break + } + + if _, err = io.CopyN(&keyBuf, rb, int64(keyLen)); err != nil { + return nil, err + } + + if err = binary.Read(rb, binary.BigEndian, &valueLen); err != nil { + return nil, err + } + + if _, err = io.CopyN(&valueBuf, rb, int64(valueLen)); err != nil { + return nil, err + } + + if err = l.ldb.Put(keyBuf.Bytes(), valueBuf.Bytes()); err != nil { + return nil, err + } + + keyBuf.Reset() + valueBuf.Reset() + } + + return &head, nil +} diff --git a/ledis/dump_test.go b/ledis/dump_test.go new file mode 100644 index 0000000..b27d8f0 --- /dev/null +++ b/ledis/dump_test.go @@ -0,0 +1,74 @@ +package ledis + +import ( + "bytes" + "github.com/siddontang/go-leveldb/leveldb" + "os" + "testing" +) + +func TestDump(t *testing.T) { + os.RemoveAll("/tmp/testdb_master") + os.RemoveAll("/tmp/testdb_slave") + os.Remove("/tmp/testdb.dump") + + var masterConfig = []byte(` + { + "data_db" : { + "path" : "/tmp/testdb_master", + "compression":true, + "block_size" : 32768, + "write_buffer_size" : 2097152, + "cache_size" : 20971520 + } + } + `) + + master, err := Open(masterConfig) + if err != nil { + t.Fatal(err) + } + + var slaveConfig = []byte(` + { + "data_db" : { + "path" : "/tmp/testdb_slave", + "compression":true, + "block_size" : 32768, + "write_buffer_size" : 2097152, + "cache_size" : 20971520 + } + } + `) + + var slave *Ledis + if slave, err = Open(slaveConfig); err != nil { + t.Fatal(err) + } + + db, _ := master.Select(0) + + db.Set([]byte("a"), []byte("1")) + db.Set([]byte("b"), []byte("2")) + db.Set([]byte("c"), []byte("3")) + + if err := master.DumpFile("/tmp/testdb.dump"); err != nil { + t.Fatal(err) + } + + if _, err := slave.LoadDumpFile("/tmp/testdb.dump"); err != nil { + t.Fatal(err) + } + + it := master.ldb.Iterator(nil, nil, leveldb.RangeClose, 0, -1) + for ; it.Valid(); it.Next() { + key := it.Key() + value := it.Value() + + if v, err := slave.ldb.Get(key); err != nil { + t.Fatal(err) + } else if !bytes.Equal(v, value) { + t.Fatal("load dump error") + } + } +}