ledisdb/ledis/replication_test.go

139 lines
3.0 KiB
Go

package ledis
import (
"bytes"
"fmt"
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/store"
"os"
"path"
"testing"
)
// checkLedisEqual iterates over master's underlying store (nil bounds, no
// limit) and verifies that slave returns a byte-identical value for every key
// the iterator yields.
//
// It returns the first error reported by the slave lookup, or a
// "replication error" describing the two value lengths on the first mismatch,
// and nil when every visited pair matches. The comparison is one-directional:
// keys that exist only in slave are not detected.
func checkLedisEqual(master *Ledis, slave *Ledis) error {
	it := master.ldb.RangeLimitIterator(nil, nil, store.RangeClose, 0, -1)
	// Release the iterator on every return path, including the early error
	// returns inside the loop.
	defer it.Close()

	for ; it.Valid(); it.Next() {
		key := it.Key()
		value := it.Value()

		v, err := slave.ldb.Get(key)
		if err != nil {
			return err
		}
		if !bytes.Equal(v, value) {
			return fmt.Errorf("replication error %d != %d", len(v), len(value))
		}
	}

	return nil
}
// TestReplication checks that a slave instance ends up with the same data as
// a binlog-enabled master through two replication paths:
//
//  1. replaying each master binlog file with ReplicateFromBinLog, and
//  2. after flushing the slave, pulling events with ReadEventsTo and applying
//     them with ReplicateFromReader.
//
// After each path the stores are compared with checkLedisEqual.
func TestReplication(t *testing.T) {
	var master *Ledis
	var slave *Ledis
	var err error

	// Master: binlog enabled. The small MaxFileSize is presumably meant to
	// force the log to roll over across several files — confirm against the
	// binlog implementation.
	cfgM := new(config.Config)
	cfgM.DataDir = "/tmp/test_repl/master"
	cfgM.UseBinLog = true
	cfgM.BinLog.MaxFileNum = 10
	cfgM.BinLog.MaxFileSize = 50

	// Start from a clean directory so earlier runs cannot affect the result.
	os.RemoveAll(cfgM.DataDir)

	master, err = Open(cfgM)
	if err != nil {
		t.Fatal(err)
	}

	// Slave: default configuration, no binlog settings.
	cfgS := new(config.Config)
	cfgS.DataDir = "/tmp/test_repl/slave"

	os.RemoveAll(cfgS.DataDir)

	slave, err = Open(cfgS)
	if err != nil {
		t.Fatal(err)
	}

	// Fail with a clear message instead of dereferencing an unusable db.
	db, err := master.Select(0)
	if err != nil {
		t.Fatal(err)
	}

	// NOTE(review): the results of the individual writes below are not
	// checked; a failed write would only show up indirectly, if at all.
	db.Set([]byte("a"), []byte("value"))
	db.Set([]byte("b"), []byte("value"))
	db.Set([]byte("c"), []byte("value"))

	// Write the hash fields inside a transaction when Begin succeeds;
	// otherwise fall back to plain writes so the same data is produced.
	if tx, err := db.Begin(); err == nil {
		tx.HSet([]byte("a"), []byte("1"), []byte("value"))
		tx.HSet([]byte("b"), []byte("2"), []byte("value"))
		tx.HSet([]byte("c"), []byte("3"), []byte("value"))
		tx.Commit()
	} else {
		db.HSet([]byte("a"), []byte("1"), []byte("value"))
		db.HSet([]byte("b"), []byte("2"), []byte("value"))
		db.HSet([]byte("c"), []byte("3"), []byte("value"))
	}

	// Same reasoning as Select: surface the error rather than panic on m.
	m, err := db.Multi()
	if err != nil {
		t.Fatal(err)
	}
	m.Set([]byte("a1"), []byte("value"))
	m.Set([]byte("b1"), []byte("value"))
	m.Set([]byte("c1"), []byte("value"))
	m.Close()

	// Path 1: replay every binlog file the master reports.
	for _, name := range master.binlog.LogNames() {
		p := path.Join(master.binlog.LogPath(), name)

		err = slave.ReplicateFromBinLog(p)
		if err != nil {
			t.Fatal(err)
		}
	}

	if err = checkLedisEqual(master, slave); err != nil {
		t.Fatal(err)
	}

	// Path 2: wipe the slave, add more writes on the master, then stream.
	slave.FlushAll()

	db.Set([]byte("a1"), []byte("value"))
	db.Set([]byte("b1"), []byte("value"))
	db.Set([]byte("c1"), []byte("value"))

	db.HSet([]byte("a1"), []byte("1"), []byte("value"))
	db.HSet([]byte("b1"), []byte("2"), []byte("value"))
	db.HSet([]byte("c1"), []byte("3"), []byte("value"))

	// These writes are rolled back; the transaction is skipped entirely when
	// Begin fails.
	if tx, err := db.Begin(); err == nil {
		tx.HSet([]byte("a1"), []byte("1"), []byte("value1"))
		tx.HSet([]byte("b1"), []byte("2"), []byte("value1"))
		tx.HSet([]byte("c1"), []byte("3"), []byte("value1"))
		tx.Rollback()
	}

	// Start reading from log file index 1, position 0.
	info := new(BinLogAnchor)
	info.LogFileIndex = 1
	info.LogPos = 0

	var buf bytes.Buffer
	var n int

	for {
		buf.Reset()
		n, err = master.ReadEventsTo(info, &buf)

		// t.Fatal does not return, so each case ends the test.
		switch {
		case err != nil:
			t.Fatal(err)
		case info.LogFileIndex == -1:
			t.Fatal("invalid log file index -1")
		case info.LogFileIndex == 0:
			t.Fatal("invalid log file index 0")
		}

		if err = slave.ReplicateFromReader(&buf); err != nil {
			t.Fatal(err)
		}

		// A read that reports zero ends the loop, after the (possibly
		// empty) buffer has been handed to the slave.
		if n == 0 {
			break
		}
	}

	if err = checkLedisEqual(master, slave); err != nil {
		t.Fatal(err)
	}
}