refactor replication

replication log format is not compatibility
This commit is contained in:
siddontang 2014-11-10 12:40:19 +08:00
parent 4a266b30b2
commit d058c20094
7 changed files with 43 additions and 174 deletions

View File

@ -15,8 +15,6 @@ type batch struct {
sync.Locker
tx *Tx
eb *eventBatch
}
func (b *batch) Commit() error {
@ -25,10 +23,12 @@ func (b *batch) Commit() error {
}
if b.tx == nil {
return b.l.handleCommit(b.eb, b.WriteBatch)
return b.l.handleCommit(b.WriteBatch, b.WriteBatch)
} else {
if b.l.r != nil {
b.tx.eb.Write(b.eb.Bytes())
if err := b.tx.data.Append(b.WriteBatch.BatchData()); err != nil {
return err
}
}
return b.WriteBatch.Commit()
}
@ -39,25 +39,15 @@ func (b *batch) Lock() {
}
func (b *batch) Unlock() {
b.eb.Reset()
b.WriteBatch.Rollback()
b.Locker.Unlock()
}
func (b *batch) Put(key []byte, value []byte) {
if b.l.r != nil {
b.eb.Put(key, value)
}
b.WriteBatch.Put(key, value)
}
func (b *batch) Delete(key []byte) {
if b.l.r != nil {
b.eb.Delete(key)
}
b.WriteBatch.Delete(key)
}
@ -96,7 +86,6 @@ func (l *Ledis) newBatch(wb *store.WriteBatch, locker sync.Locker, tx *Tx) *batc
b.Locker = locker
b.tx = tx
b.eb = new(eventBatch)
return b
}
@ -105,14 +94,18 @@ type commiter interface {
Commit() error
}
func (l *Ledis) handleCommit(eb *eventBatch, c commiter) error {
type commitDataGetter interface {
Data() []byte
}
func (l *Ledis) handleCommit(g commitDataGetter, c commiter) error {
l.commitLock.Lock()
defer l.commitLock.Unlock()
var err error
if l.r != nil {
var rl *rpl.Log
if rl, err = l.r.Log(eb.Bytes()); err != nil {
if rl, err = l.r.Log(g.Data()); err != nil {
log.Fatal("write wal error %s", err.Error())
return err
}

View File

@ -1,101 +1,13 @@
package ledis
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"github.com/siddontang/go/hack"
"io"
"strconv"
)
const (
kTypeDeleteEvent uint8 = 0
kTypePutEvent uint8 = 1
)
var (
errInvalidPutEvent = errors.New("invalid put event")
errInvalidDeleteEvent = errors.New("invalid delete event")
errInvalidEvent = errors.New("invalid event")
)
type eventBatch struct {
bytes.Buffer
}
func (b *eventBatch) Put(key []byte, value []byte) {
l := uint32(len(key) + len(value) + 1 + 2)
binary.Write(b, binary.BigEndian, l)
b.WriteByte(kTypePutEvent)
keyLen := uint16(len(key))
binary.Write(b, binary.BigEndian, keyLen)
b.Write(key)
b.Write(value)
}
func (b *eventBatch) Delete(key []byte) {
l := uint32(len(key) + 1)
binary.Write(b, binary.BigEndian, l)
b.WriteByte(kTypeDeleteEvent)
b.Write(key)
}
type eventWriter interface {
Put(key []byte, value []byte)
Delete(key []byte)
}
func decodeEventBatch(w eventWriter, data []byte) error {
for {
if len(data) == 0 {
return nil
}
if len(data) < 4 {
return io.ErrUnexpectedEOF
}
l := binary.BigEndian.Uint32(data)
data = data[4:]
if uint32(len(data)) < l {
return io.ErrUnexpectedEOF
}
if err := decodeEvent(w, data[0:l]); err != nil {
return err
}
data = data[l:]
}
}
func decodeEvent(w eventWriter, b []byte) error {
if len(b) == 0 {
return errInvalidEvent
}
switch b[0] {
case kTypePutEvent:
if len(b[1:]) < 2 {
return errInvalidPutEvent
}
keyLen := binary.BigEndian.Uint16(b[1:3])
b = b[3:]
if len(b) < int(keyLen) {
return errInvalidPutEvent
}
w.Put(b[0:keyLen], b[keyLen:])
case kTypeDeleteEvent:
w.Delete(b[1:])
default:
return errInvalidEvent
}
return nil
}
var errInvalidEvent = errors.New("invalid event")
func formatEventKey(buf []byte, k []byte) ([]byte, error) {
if len(k) < 2 {

View File

@ -1,56 +0,0 @@
package ledis
import (
"reflect"
"testing"
)
type testEvent struct {
Key []byte
Value []byte
}
type testEventWriter struct {
evs []testEvent
}
func (w *testEventWriter) Put(key []byte, value []byte) {
e := testEvent{key, value}
w.evs = append(w.evs, e)
}
func (w *testEventWriter) Delete(key []byte) {
e := testEvent{key, nil}
w.evs = append(w.evs, e)
}
func TestEvent(t *testing.T) {
k1 := []byte("k1")
v1 := []byte("v1")
k2 := []byte("k2")
k3 := []byte("k3")
v3 := []byte("v3")
b := new(eventBatch)
b.Put(k1, v1)
b.Delete(k2)
b.Put(k3, v3)
buf := b.Bytes()
w := &testEventWriter{}
ev2 := &testEventWriter{
evs: []testEvent{
testEvent{k1, v1},
testEvent{k2, nil},
testEvent{k3, v3}},
}
if err := decodeEventBatch(w, buf); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(w, ev2) {
t.Fatal("not equal")
}
}

View File

@ -6,6 +6,7 @@ import (
"github.com/siddontang/go/log"
"github.com/siddontang/go/snappy"
"github.com/siddontang/ledisdb/rpl"
"github.com/siddontang/ledisdb/store"
"io"
"time"
)
@ -49,7 +50,12 @@ func (l *Ledis) handleReplication() error {
}
}
decodeEventBatch(l.rbatch, rl.Data)
if bd, err := store.NewBatchData(rl.Data); err != nil {
log.Error("decode batch log error %s", err.Error())
return err
} else if err = bd.Replay(l.rbatch); err != nil {
log.Error("replay batch log error %s", err.Error())
}
l.commitLock.Lock()
if err = l.rbatch.Commit(); err != nil {

View File

@ -16,7 +16,7 @@ type Tx struct {
tx *store.Tx
eb *eventBatch
data *store.BatchData
}
func (db *DB) IsTransaction() bool {
@ -32,7 +32,7 @@ func (db *DB) Begin() (*Tx, error) {
tx := new(Tx)
tx.eb = new(eventBatch)
tx.data = &store.BatchData{}
tx.DB = new(DB)
tx.DB.l = db.l
@ -71,7 +71,8 @@ func (tx *Tx) Commit() error {
return ErrTxDone
}
err := tx.l.handleCommit(tx.eb, tx.tx)
err := tx.l.handleCommit(tx.data, tx.tx)
tx.data.Reset()
tx.tx = nil
@ -88,7 +89,7 @@ func (tx *Tx) Rollback() error {
}
err := tx.tx.Rollback()
tx.eb.Reset()
tx.data.Reset()
tx.tx = nil
tx.l.wLock.Unlock()

View File

@ -352,10 +352,7 @@ func testBatchData(db *DB, t *testing.T) {
w.Put([]byte("b"), nil)
w.Delete([]byte("c"))
d, err := w.Data()
if err != nil {
t.Fatal(err)
}
d := w.BatchData()
if kvs, err := d.Items(); err != nil {
t.Fatal(err)

View File

@ -53,9 +53,21 @@ func (wb *WriteBatch) Rollback() error {
return wb.wb.Rollback()
}
func (wb *WriteBatch) Data() (*BatchData, error) {
// the data will be undefined after commit or rollback
func (wb *WriteBatch) BatchData() *BatchData {
data := wb.wb.Data()
return NewBatchData(data)
d, err := NewBatchData(data)
if err != nil {
//can not enter this
panic(err)
}
return d
}
func (wb *WriteBatch) Data() []byte {
b := wb.BatchData()
return b.Data()
}
const BatchDataHeadLen = 12
@ -84,8 +96,8 @@ func (d *BatchData) Append(do *BatchData) error {
n := d.Len() + do.Len()
binary.LittleEndian.PutUint32(d1[8:], uint32(n))
d1 = append(d1, d2[BatchDataHeadLen:]...)
binary.LittleEndian.PutUint32(d1[8:], uint32(n))
return d.Load(d1)
}
@ -94,6 +106,10 @@ func (d *BatchData) Data() []byte {
return d.Dump()
}
func (d *BatchData) Reset() {
d.Batch.Reset()
}
type BatchDataReplay interface {
Put(key, value []byte)
Delete(key []byte)