forked from mirror/ledisdb
replication use snappy
This commit is contained in:
parent
6adfb1be98
commit
64a4fee577
|
@ -51,7 +51,7 @@ ledisdb是一个用go实现的类似redis的高性能nosql数据库,底层基
|
|||
## 嵌入库
|
||||
|
||||
import "github.com/siddontang/ledisdb/ledis"
|
||||
l, _ := ledis.OpenWithConfig(cfg)
|
||||
l, _ := ledis.Open(cfg)
|
||||
db, _ := l.Select(0)
|
||||
|
||||
db.Set(key, value)
|
||||
|
|
|
@ -3,3 +3,4 @@
|
|||
. ./dev.sh
|
||||
|
||||
go get -u github.com/siddontang/go-log/log
|
||||
go get -u githbu.com/siddontang/go-snappy/snappy
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bufio"
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"github.com/siddontang/go-snappy/snappy"
|
||||
"github.com/siddontang/ledisdb/leveldb"
|
||||
"io"
|
||||
"os"
|
||||
|
@ -12,7 +13,8 @@ import (
|
|||
//dump format
|
||||
// fileIndex(bigendian int64)|filePos(bigendian int64)
|
||||
// |keylen(bigendian int32)|key|valuelen(bigendian int32)|value......
|
||||
|
||||
//
|
||||
//key and value are both compressed for fast transfer dump on network using snappy
|
||||
type MasterInfo struct {
|
||||
LogFileIndex int64
|
||||
LogPos int64
|
||||
|
@ -76,12 +78,18 @@ func (l *Ledis) Dump(w io.Writer) error {
|
|||
it := sp.NewIterator()
|
||||
it.SeekToFirst()
|
||||
|
||||
compressBuf := make([]byte, 4096)
|
||||
|
||||
var key []byte
|
||||
var value []byte
|
||||
for ; it.Valid(); it.Next() {
|
||||
key = it.Key()
|
||||
value = it.Value()
|
||||
|
||||
if key, err = snappy.Encode(compressBuf, key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = binary.Write(wb, binary.BigEndian, uint16(len(key))); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -90,6 +98,10 @@ func (l *Ledis) Dump(w io.Writer) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if value, err = snappy.Encode(compressBuf, value); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = binary.Write(wb, binary.BigEndian, uint32(len(value))); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -103,6 +115,8 @@ func (l *Ledis) Dump(w io.Writer) error {
|
|||
return err
|
||||
}
|
||||
|
||||
compressBuf = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -134,6 +148,12 @@ func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) {
|
|||
|
||||
var keyBuf bytes.Buffer
|
||||
var valueBuf bytes.Buffer
|
||||
|
||||
deKeyBuf := make([]byte, 4096)
|
||||
deValueBuf := make([]byte, 4096)
|
||||
|
||||
var key, value []byte
|
||||
|
||||
for {
|
||||
if err = binary.Read(rb, binary.BigEndian, &keyLen); err != nil && err != io.EOF {
|
||||
return nil, err
|
||||
|
@ -145,6 +165,10 @@ func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if key, err = snappy.Decode(deKeyBuf, keyBuf.Bytes()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = binary.Read(rb, binary.BigEndian, &valueLen); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -153,17 +177,24 @@ func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if err = l.ldb.Put(keyBuf.Bytes(), valueBuf.Bytes()); err != nil {
|
||||
if value, err = snappy.Decode(deValueBuf, valueBuf.Bytes()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = l.ldb.Put(key, value); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if l.binlog != nil {
|
||||
err = l.binlog.Log(encodeBinLogPut(keyBuf.Bytes(), valueBuf.Bytes()))
|
||||
err = l.binlog.Log(encodeBinLogPut(key, value))
|
||||
}
|
||||
|
||||
keyBuf.Reset()
|
||||
valueBuf.Reset()
|
||||
}
|
||||
|
||||
deKeyBuf = nil
|
||||
deValueBuf = nil
|
||||
|
||||
return info, nil
|
||||
}
|
||||
|
|
|
@ -33,6 +33,8 @@ type client struct {
|
|||
|
||||
syncBuf bytes.Buffer
|
||||
|
||||
compressBuf []byte
|
||||
|
||||
logBuf bytes.Buffer
|
||||
}
|
||||
|
||||
|
@ -50,6 +52,8 @@ func newClient(c net.Conn, app *App) {
|
|||
|
||||
co.reqC = make(chan error, 1)
|
||||
|
||||
co.compressBuf = make([]byte, 256)
|
||||
|
||||
go co.run()
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ package server
|
|||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"github.com/siddontang/go-snappy/snappy"
|
||||
"github.com/siddontang/ledisdb/ledis"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
@ -103,6 +104,14 @@ func syncCommand(c *client) error {
|
|||
binary.BigEndian.PutUint64(buf[0:], uint64(m.LogFileIndex))
|
||||
binary.BigEndian.PutUint64(buf[8:], uint64(m.LogPos))
|
||||
|
||||
if len(c.compressBuf) < snappy.MaxEncodedLen(len(buf)) {
|
||||
c.compressBuf = make([]byte, snappy.MaxEncodedLen(len(buf)))
|
||||
}
|
||||
|
||||
if buf, err = snappy.Encode(c.compressBuf, buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.writeBulk(buf)
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"github.com/siddontang/go-log/log"
|
||||
"github.com/siddontang/go-snappy/snappy"
|
||||
"github.com/siddontang/ledisdb/ledis"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
|
@ -42,6 +43,8 @@ type master struct {
|
|||
wg sync.WaitGroup
|
||||
|
||||
syncBuf bytes.Buffer
|
||||
|
||||
compressBuf []byte
|
||||
}
|
||||
|
||||
func newMaster(app *App) *master {
|
||||
|
@ -53,6 +56,8 @@ func newMaster(app *App) *master {
|
|||
|
||||
m.quit = make(chan struct{}, 1)
|
||||
|
||||
m.compressBuf = make([]byte, 256)
|
||||
|
||||
//if load error, we will start a fullsync later
|
||||
m.loadInfo()
|
||||
|
||||
|
@ -87,8 +92,6 @@ func (m *master) loadInfo() error {
|
|||
return err
|
||||
}
|
||||
|
||||
println(m.addr, m.logFileIndex, m.logPos)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -293,16 +296,21 @@ func (m *master) sync() error {
|
|||
return err
|
||||
}
|
||||
|
||||
err = binary.Read(&m.syncBuf, binary.BigEndian, &m.logFileIndex)
|
||||
var buf []byte
|
||||
buf, err = snappy.Decode(m.compressBuf, m.syncBuf.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
} else if len(buf) > len(m.compressBuf) {
|
||||
m.compressBuf = buf
|
||||
}
|
||||
|
||||
err = binary.Read(&m.syncBuf, binary.BigEndian, &m.logPos)
|
||||
if err != nil {
|
||||
return err
|
||||
if len(buf) < 16 {
|
||||
return fmt.Errorf("invalid sync data len %d", len(buf))
|
||||
}
|
||||
|
||||
m.logFileIndex = int64(binary.BigEndian.Uint64(buf[0:8]))
|
||||
m.logPos = int64(binary.BigEndian.Uint64(buf[8:16]))
|
||||
|
||||
if m.logFileIndex == 0 {
|
||||
//master now not support binlog, stop replication
|
||||
m.stopReplication()
|
||||
|
@ -312,7 +320,7 @@ func (m *master) sync() error {
|
|||
return m.fullSync()
|
||||
}
|
||||
|
||||
err = m.app.ldb.ReplicateFromReader(&m.syncBuf)
|
||||
err = m.app.ldb.ReplicateFromData(buf[16:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue