mirror of https://github.com/ledisdb/ledisdb.git
add compression for log
This commit is contained in:
parent
b2a8b70e54
commit
f49a3bbece
|
@ -4,6 +4,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/siddontang/go/log"
|
"github.com/siddontang/go/log"
|
||||||
|
"github.com/siddontang/go/snappy"
|
||||||
"github.com/siddontang/ledisdb/rpl"
|
"github.com/siddontang/ledisdb/rpl"
|
||||||
"io"
|
"io"
|
||||||
"time"
|
"time"
|
||||||
|
@ -37,12 +38,23 @@ func (l *Ledis) handleReplication() {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
l.rbatch.Rollback()
|
l.rbatch.Rollback()
|
||||||
|
|
||||||
|
if rl.Compression == 1 {
|
||||||
|
//todo optimize
|
||||||
|
if rl.Data, err = snappy.Decode(nil, rl.Data); err != nil {
|
||||||
|
log.Error("decode log error %s", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
decodeEventBatch(l.rbatch, rl.Data)
|
decodeEventBatch(l.rbatch, rl.Data)
|
||||||
|
|
||||||
if err := l.rbatch.Commit(); err != nil {
|
if err := l.rbatch.Commit(); err != nil {
|
||||||
log.Error("commit log error %s", err.Error())
|
log.Error("commit log error %s", err.Error())
|
||||||
|
return
|
||||||
} else if err = l.r.UpdateCommitID(rl.ID); err != nil {
|
} else if err = l.r.UpdateCommitID(rl.ID); err != nil {
|
||||||
log.Error("update commit id error %s", err.Error())
|
log.Error("update commit id error %s", err.Error())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,6 +34,7 @@ func TestReplication(t *testing.T) {
|
||||||
cfgM.DataDir = "/tmp/test_repl/master"
|
cfgM.DataDir = "/tmp/test_repl/master"
|
||||||
|
|
||||||
cfgM.UseReplication = true
|
cfgM.UseReplication = true
|
||||||
|
cfgM.Replication.Compression = true
|
||||||
|
|
||||||
os.RemoveAll(cfgM.DataDir)
|
os.RemoveAll(cfgM.DataDir)
|
||||||
|
|
||||||
|
|
19
rpl/log.go
19
rpl/log.go
|
@ -4,27 +4,18 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"io"
|
"io"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Log struct {
|
type Log struct {
|
||||||
ID uint64
|
ID uint64
|
||||||
CreateTime uint32
|
CreateTime uint32
|
||||||
|
Compression uint8
|
||||||
|
|
||||||
Data []byte
|
Data []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLog(id uint64, data []byte) *Log {
|
|
||||||
l := new(Log)
|
|
||||||
l.ID = id
|
|
||||||
l.CreateTime = uint32(time.Now().Unix())
|
|
||||||
l.Data = data
|
|
||||||
|
|
||||||
return l
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *Log) HeadSize() int {
|
func (l *Log) HeadSize() int {
|
||||||
return 16
|
return 17
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Log) Size() int {
|
func (l *Log) Size() int {
|
||||||
|
@ -58,6 +49,9 @@ func (l *Log) Encode(w io.Writer) error {
|
||||||
binary.BigEndian.PutUint32(buf[pos:], l.CreateTime)
|
binary.BigEndian.PutUint32(buf[pos:], l.CreateTime)
|
||||||
pos += 4
|
pos += 4
|
||||||
|
|
||||||
|
buf[pos] = l.Compression
|
||||||
|
pos++
|
||||||
|
|
||||||
binary.BigEndian.PutUint32(buf[pos:], uint32(len(l.Data)))
|
binary.BigEndian.PutUint32(buf[pos:], uint32(len(l.Data)))
|
||||||
|
|
||||||
if n, err := w.Write(buf); err != nil {
|
if n, err := w.Write(buf); err != nil {
|
||||||
|
@ -88,6 +82,9 @@ func (l *Log) Decode(r io.Reader) error {
|
||||||
l.CreateTime = binary.BigEndian.Uint32(buf[pos:])
|
l.CreateTime = binary.BigEndian.Uint32(buf[pos:])
|
||||||
pos += 4
|
pos += 4
|
||||||
|
|
||||||
|
l.Compression = uint8(buf[pos])
|
||||||
|
pos++
|
||||||
|
|
||||||
length := binary.BigEndian.Uint32(buf[pos:])
|
length := binary.BigEndian.Uint32(buf[pos:])
|
||||||
|
|
||||||
l.Data = l.Data[0:0]
|
l.Data = l.Data[0:0]
|
||||||
|
|
15
rpl/rpl.go
15
rpl/rpl.go
|
@ -3,6 +3,7 @@ package rpl
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"github.com/siddontang/go/log"
|
"github.com/siddontang/go/log"
|
||||||
|
"github.com/siddontang/go/snappy"
|
||||||
"github.com/siddontang/ledisdb/config"
|
"github.com/siddontang/ledisdb/config"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
@ -86,6 +87,14 @@ func (r *Replication) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Replication) Log(data []byte) (*Log, error) {
|
func (r *Replication) Log(data []byte) (*Log, error) {
|
||||||
|
if r.cfg.Replication.Compression {
|
||||||
|
//todo optimize
|
||||||
|
var err error
|
||||||
|
if data, err = snappy.Encode(nil, data); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
r.m.Lock()
|
r.m.Lock()
|
||||||
defer r.m.Unlock()
|
defer r.m.Unlock()
|
||||||
|
|
||||||
|
@ -103,6 +112,12 @@ func (r *Replication) Log(data []byte) (*Log, error) {
|
||||||
l.ID = lastID + 1
|
l.ID = lastID + 1
|
||||||
l.CreateTime = uint32(time.Now().Unix())
|
l.CreateTime = uint32(time.Now().Unix())
|
||||||
|
|
||||||
|
if r.cfg.Replication.Compression {
|
||||||
|
l.Compression = 1
|
||||||
|
} else {
|
||||||
|
l.Compression = 0
|
||||||
|
}
|
||||||
|
|
||||||
l.Data = data
|
l.Data = data
|
||||||
|
|
||||||
if err = r.s.StoreLog(l); err != nil {
|
if err = r.s.StoreLog(l); err != nil {
|
||||||
|
|
|
@ -61,7 +61,6 @@ type client struct {
|
||||||
resp responseWriter
|
resp responseWriter
|
||||||
|
|
||||||
syncBuf bytes.Buffer
|
syncBuf bytes.Buffer
|
||||||
compressBuf []byte
|
|
||||||
|
|
||||||
lastLogID uint64
|
lastLogID uint64
|
||||||
|
|
||||||
|
@ -83,7 +82,6 @@ func newClient(app *App) *client {
|
||||||
c.ldb = app.ldb
|
c.ldb = app.ldb
|
||||||
c.db, _ = app.ldb.Select(0) //use default db
|
c.db, _ = app.ldb.Select(0) //use default db
|
||||||
|
|
||||||
c.compressBuf = []byte{}
|
|
||||||
c.reqErr = make(chan error)
|
c.reqErr = make(chan error)
|
||||||
|
|
||||||
return c
|
return c
|
||||||
|
|
|
@ -3,8 +3,6 @@ package server
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/siddontang/go/hack"
|
"github.com/siddontang/go/hack"
|
||||||
"github.com/siddontang/go/snappy"
|
|
||||||
|
|
||||||
"github.com/siddontang/ledisdb/ledis"
|
"github.com/siddontang/ledisdb/ledis"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
@ -94,14 +92,6 @@ func syncCommand(c *client) error {
|
||||||
} else {
|
} else {
|
||||||
buf := c.syncBuf.Bytes()
|
buf := c.syncBuf.Bytes()
|
||||||
|
|
||||||
if len(c.compressBuf) < snappy.MaxEncodedLen(len(buf)) {
|
|
||||||
c.compressBuf = make([]byte, snappy.MaxEncodedLen(len(buf)))
|
|
||||||
}
|
|
||||||
|
|
||||||
if buf, err = snappy.Encode(c.compressBuf, buf); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
c.resp.writeBulk(buf)
|
c.resp.writeBulk(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,8 +7,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/siddontang/go/hack"
|
"github.com/siddontang/go/hack"
|
||||||
"github.com/siddontang/go/log"
|
"github.com/siddontang/go/log"
|
||||||
|
|
||||||
"github.com/siddontang/go/snappy"
|
|
||||||
"github.com/siddontang/ledisdb/ledis"
|
"github.com/siddontang/ledisdb/ledis"
|
||||||
"github.com/siddontang/ledisdb/rpl"
|
"github.com/siddontang/ledisdb/rpl"
|
||||||
"net"
|
"net"
|
||||||
|
@ -38,8 +36,6 @@ type master struct {
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
|
||||||
syncBuf bytes.Buffer
|
syncBuf bytes.Buffer
|
||||||
|
|
||||||
compressBuf []byte
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMaster(app *App) *master {
|
func newMaster(app *App) *master {
|
||||||
|
@ -48,8 +44,6 @@ func newMaster(app *App) *master {
|
||||||
|
|
||||||
m.quit = make(chan struct{}, 1)
|
m.quit = make(chan struct{}, 1)
|
||||||
|
|
||||||
m.compressBuf = make([]byte, 256)
|
|
||||||
|
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -219,13 +213,7 @@ func (m *master) sync() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var buf []byte
|
buf := m.syncBuf.Bytes()
|
||||||
buf, err = snappy.Decode(m.compressBuf, m.syncBuf.Bytes())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
} else if len(buf) > len(m.compressBuf) {
|
|
||||||
m.compressBuf = buf
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(buf) == 0 {
|
if len(buf) == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in New Issue