diff --git a/ledis/replication.go b/ledis/replication.go index 02c3e4e..1fa1531 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "github.com/siddontang/go/log" + "github.com/siddontang/go/snappy" "github.com/siddontang/ledisdb/rpl" "io" "time" @@ -37,12 +38,23 @@ func (l *Ledis) handleReplication() { } } else { l.rbatch.Rollback() + + if rl.Compression == 1 { + //todo optimize + if rl.Data, err = snappy.Decode(nil, rl.Data); err != nil { + log.Error("decode log error %s", err.Error()) + return + } + } + decodeEventBatch(l.rbatch, rl.Data) if err := l.rbatch.Commit(); err != nil { log.Error("commit log error %s", err.Error()) + return } else if err = l.r.UpdateCommitID(rl.ID); err != nil { log.Error("update commit id error %s", err.Error()) + return } } diff --git a/ledis/replication_test.go b/ledis/replication_test.go index 6e277a9..c300ef8 100644 --- a/ledis/replication_test.go +++ b/ledis/replication_test.go @@ -34,6 +34,7 @@ func TestReplication(t *testing.T) { cfgM.DataDir = "/tmp/test_repl/master" cfgM.UseReplication = true + cfgM.Replication.Compression = true os.RemoveAll(cfgM.DataDir) diff --git a/rpl/log.go b/rpl/log.go index 775ea5d..261e852 100644 --- a/rpl/log.go +++ b/rpl/log.go @@ -4,27 +4,18 @@ import ( "bytes" "encoding/binary" "io" - "time" ) type Log struct { - ID uint64 - CreateTime uint32 + ID uint64 + CreateTime uint32 + Compression uint8 Data []byte } -func NewLog(id uint64, data []byte) *Log { - l := new(Log) - l.ID = id - l.CreateTime = uint32(time.Now().Unix()) - l.Data = data - - return l -} - func (l *Log) HeadSize() int { - return 16 + return 17 } func (l *Log) Size() int { @@ -58,6 +49,9 @@ func (l *Log) Encode(w io.Writer) error { binary.BigEndian.PutUint32(buf[pos:], l.CreateTime) pos += 4 + buf[pos] = l.Compression + pos++ + binary.BigEndian.PutUint32(buf[pos:], uint32(len(l.Data))) if n, err := w.Write(buf); err != nil { @@ -88,6 +82,9 @@ func (l *Log) Decode(r io.Reader) error { l.CreateTime = binary.BigEndian.Uint32(buf[pos:]) pos += 4 + l.Compression = uint8(buf[pos]) + pos++ + length := binary.BigEndian.Uint32(buf[pos:]) l.Data = l.Data[0:0] diff --git a/rpl/rpl.go b/rpl/rpl.go index f7324c8..3eaad9a 100644 --- a/rpl/rpl.go +++ b/rpl/rpl.go @@ -3,6 +3,7 @@ package rpl import ( "encoding/binary" "github.com/siddontang/go/log" + "github.com/siddontang/go/snappy" "github.com/siddontang/ledisdb/config" "os" "path" @@ -86,6 +87,14 @@ func (r *Replication) Close() error { } func (r *Replication) Log(data []byte) (*Log, error) { + if r.cfg.Replication.Compression { + //todo optimize + var err error + if data, err = snappy.Encode(nil, data); err != nil { + return nil, err + } + } + r.m.Lock() defer r.m.Unlock() @@ -103,6 +112,12 @@ func (r *Replication) Log(data []byte) (*Log, error) { l.ID = lastID + 1 l.CreateTime = uint32(time.Now().Unix()) + if r.cfg.Replication.Compression { + l.Compression = 1 + } else { + l.Compression = 0 + } + l.Data = data if err = r.s.StoreLog(l); err != nil { diff --git a/server/client.go b/server/client.go index 2ccea7a..ef9de76 100644 --- a/server/client.go +++ b/server/client.go @@ -60,8 +60,7 @@ type client struct { resp responseWriter - syncBuf bytes.Buffer - compressBuf []byte + syncBuf bytes.Buffer lastLogID uint64 @@ -83,7 +82,6 @@ func newClient(app *App) *client { c.ldb = app.ldb c.db, _ = app.ldb.Select(0) //use default db - c.compressBuf = []byte{} c.reqErr = make(chan error) return c diff --git a/server/cmd_replication.go b/server/cmd_replication.go index a261f42..ba091aa 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -3,8 +3,6 @@ package server import ( "fmt" "github.com/siddontang/go/hack" - "github.com/siddontang/go/snappy" - "github.com/siddontang/ledisdb/ledis" "io/ioutil" "os" @@ -94,14 +92,6 @@ func syncCommand(c *client) error { } else { buf := c.syncBuf.Bytes() - if len(c.compressBuf) < snappy.MaxEncodedLen(len(buf)) { - c.compressBuf = make([]byte, snappy.MaxEncodedLen(len(buf))) - } - - if buf, err = snappy.Encode(c.compressBuf, buf); err != nil { - return err - } - c.resp.writeBulk(buf) } diff --git a/server/replication.go b/server/replication.go index 3dd767e..d912016 100644 --- a/server/replication.go +++ b/server/replication.go @@ -7,8 +7,6 @@ import ( "fmt" "github.com/siddontang/go/hack" "github.com/siddontang/go/log" - - "github.com/siddontang/go/snappy" "github.com/siddontang/ledisdb/ledis" "github.com/siddontang/ledisdb/rpl" "net" @@ -38,8 +36,6 @@ type master struct { wg sync.WaitGroup syncBuf bytes.Buffer - - compressBuf []byte } func newMaster(app *App) *master { @@ -48,8 +44,6 @@ func newMaster(app *App) *master { m.quit = make(chan struct{}, 1) - m.compressBuf = make([]byte, 256) - return m } @@ -219,13 +213,7 @@ func (m *master) sync() error { } } - var buf []byte - buf, err = snappy.Decode(m.compressBuf, m.syncBuf.Bytes()) - if err != nil { - return err - } else if len(buf) > len(m.compressBuf) { - m.compressBuf = buf - } + buf := m.syncBuf.Bytes() if len(buf) == 0 { return nil