diff --git a/README_CN.md b/README_CN.md index 22849b4..e8e639d 100644 --- a/README_CN.md +++ b/README_CN.md @@ -51,7 +51,7 @@ ledisdb是一个用go实现的类似redis的高性能nosql数据库,底层基 ## 嵌入库 import "github.com/siddontang/ledisdb/ledis" - l, _ := ledis.OpenWithConfig(cfg) + l, _ := ledis.Open(cfg) db, _ := l.Select(0) db.Set(key, value) diff --git a/bootstrap.sh b/bootstrap.sh index e7332ce..afb5ec7 100644 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -3,3 +3,4 @@ . ./dev.sh go get -u github.com/siddontang/go-log/log +go get -u githbu.com/siddontang/go-snappy/snappy \ No newline at end of file diff --git a/ledis/dump.go b/ledis/dump.go index 16354c8..bc0427c 100644 --- a/ledis/dump.go +++ b/ledis/dump.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "encoding/binary" + "github.com/siddontang/go-snappy/snappy" "github.com/siddontang/ledisdb/leveldb" "io" "os" @@ -12,7 +13,8 @@ import ( //dump format // fileIndex(bigendian int64)|filePos(bigendian int64) // |keylen(bigendian int32)|key|valuelen(bigendian int32)|value...... - +// +//key and value are both compressed for fast transfer dump on network using snappy type MasterInfo struct { LogFileIndex int64 LogPos int64 @@ -76,12 +78,18 @@ func (l *Ledis) Dump(w io.Writer) error { it := sp.NewIterator() it.SeekToFirst() + compressBuf := make([]byte, 4096) + var key []byte var value []byte for ; it.Valid(); it.Next() { key = it.Key() value = it.Value() + if key, err = snappy.Encode(compressBuf, key); err != nil { + return err + } + if err = binary.Write(wb, binary.BigEndian, uint16(len(key))); err != nil { return err } @@ -90,6 +98,10 @@ func (l *Ledis) Dump(w io.Writer) error { return err } + if value, err = snappy.Encode(compressBuf, value); err != nil { + return err + } + if err = binary.Write(wb, binary.BigEndian, uint32(len(value))); err != nil { return err } @@ -103,6 +115,8 @@ func (l *Ledis) Dump(w io.Writer) error { return err } + compressBuf = nil + return nil } @@ -134,6 +148,12 @@ func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) { var keyBuf bytes.Buffer var valueBuf bytes.Buffer + + deKeyBuf := make([]byte, 4096) + deValueBuf := make([]byte, 4096) + + var key, value []byte + for { if err = binary.Read(rb, binary.BigEndian, &keyLen); err != nil && err != io.EOF { return nil, err @@ -145,6 +165,10 @@ func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) { return nil, err } + if key, err = snappy.Decode(deKeyBuf, keyBuf.Bytes()); err != nil { + return nil, err + } + if err = binary.Read(rb, binary.BigEndian, &valueLen); err != nil { return nil, err } @@ -153,17 +177,24 @@ func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) { return nil, err } - if err = l.ldb.Put(keyBuf.Bytes(), valueBuf.Bytes()); err != nil { + if value, err = snappy.Decode(deValueBuf, valueBuf.Bytes()); err != nil { + return nil, err + } + + if err = l.ldb.Put(key, value); err != nil { return nil, err } if l.binlog != nil { - err = l.binlog.Log(encodeBinLogPut(keyBuf.Bytes(), valueBuf.Bytes())) + err = l.binlog.Log(encodeBinLogPut(key, value)) } keyBuf.Reset() valueBuf.Reset() } + deKeyBuf = nil + deValueBuf = nil + return info, nil } diff --git a/server/client.go b/server/client.go index 92929bc..bb4fe0e 100644 --- a/server/client.go +++ b/server/client.go @@ -33,6 +33,8 @@ type client struct { syncBuf bytes.Buffer + compressBuf []byte + logBuf bytes.Buffer } @@ -50,6 +52,8 @@ func newClient(c net.Conn, app *App) { co.reqC = make(chan error, 1) + co.compressBuf = make([]byte, 256) + go co.run() } diff --git a/server/cmd_replication.go b/server/cmd_replication.go index 82803dd..c47be30 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -3,6 +3,7 @@ package server import ( "encoding/binary" "fmt" + "github.com/siddontang/go-snappy/snappy" "github.com/siddontang/ledisdb/ledis" "io/ioutil" "os" @@ -103,6 +104,14 @@ func syncCommand(c *client) error { binary.BigEndian.PutUint64(buf[0:], uint64(m.LogFileIndex)) binary.BigEndian.PutUint64(buf[8:], uint64(m.LogPos)) + if len(c.compressBuf) < snappy.MaxEncodedLen(len(buf)) { + c.compressBuf = make([]byte, snappy.MaxEncodedLen(len(buf))) + } + + if buf, err = snappy.Encode(c.compressBuf, buf); err != nil { + return err + } + c.writeBulk(buf) } diff --git a/server/replication.go b/server/replication.go index 383a244..6c30968 100644 --- a/server/replication.go +++ b/server/replication.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "github.com/siddontang/go-log/log" + "github.com/siddontang/go-snappy/snappy" "github.com/siddontang/ledisdb/ledis" "io/ioutil" "net" @@ -42,6 +43,8 @@ type master struct { wg sync.WaitGroup syncBuf bytes.Buffer + + compressBuf []byte } func newMaster(app *App) *master { @@ -53,6 +56,8 @@ func newMaster(app *App) *master { m.quit = make(chan struct{}, 1) + m.compressBuf = make([]byte, 256) + //if load error, we will start a fullsync later m.loadInfo() @@ -87,8 +92,6 @@ func (m *master) loadInfo() error { return err } - println(m.addr, m.logFileIndex, m.logPos) - return nil } @@ -293,16 +296,21 @@ func (m *master) sync() error { return err } - err = binary.Read(&m.syncBuf, binary.BigEndian, &m.logFileIndex) + var buf []byte + buf, err = snappy.Decode(m.compressBuf, m.syncBuf.Bytes()) if err != nil { return err + } else if len(buf) > len(m.compressBuf) { + m.compressBuf = buf } - err = binary.Read(&m.syncBuf, binary.BigEndian, &m.logPos) - if err != nil { - return err + if len(buf) < 16 { + return fmt.Errorf("invalid sync data len %d", len(buf)) } + m.logFileIndex = int64(binary.BigEndian.Uint64(buf[0:8])) + m.logPos = int64(binary.BigEndian.Uint64(buf[8:16])) + if m.logFileIndex == 0 { //master now not support binlog, stop replication m.stopReplication() @@ -312,7 +320,7 @@ func (m *master) sync() error { return m.fullSync() } - err = m.app.ldb.ReplicateFromReader(&m.syncBuf) + err = m.app.ldb.ReplicateFromData(buf[16:]) if err != nil { return err }