mirror of https://github.com/ledisdb/ledisdb.git
add replication test
This commit is contained in:
parent
ef22d7000d
commit
ebd15e5738
|
@ -265,7 +265,7 @@ func (l *BinLog) FormatLogFileName(index int64) string {
|
|||
}
|
||||
|
||||
func (l *BinLog) FormatLogFilePath(index int64) string {
|
||||
return path.Join(l.cfg.Path, fmt.Sprintf("ledis-bin.%07d", index))
|
||||
return path.Join(l.cfg.Path, l.FormatLogFileName(index))
|
||||
}
|
||||
|
||||
func (l *BinLog) LogPath() string {
|
||||
|
|
|
@ -143,3 +143,8 @@ func (l *Ledis) FlushAll() error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
//very dangerous to use
|
||||
func (l *Ledis) DataDB() *leveldb.DB {
|
||||
return l.ldb
|
||||
}
|
||||
|
|
|
@ -130,44 +130,53 @@ func (l *Ledis) ReplicateFromBinLog(filePath string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
const maxSyncEvents = 16
|
||||
|
||||
//events data format
|
||||
// nextfileIndex(bigendian int64)|nextfilePos(bigendian int64)|binlogevents
|
||||
func (l *Ledis) ReadEventsTo(index int64, offset int64, w io.Writer) (info *MasterInfo, err error) {
|
||||
info = new(MasterInfo)
|
||||
const maxSyncEvents = 64
|
||||
|
||||
func (l *Ledis) ReadEventsTo(info *MasterInfo, w io.Writer) (n int, err error) {
|
||||
n = 0
|
||||
if l.binlog == nil {
|
||||
//binlog not supported
|
||||
info.LogFileIndex = 0
|
||||
info.LogPos = 0
|
||||
return
|
||||
}
|
||||
|
||||
info.LogFileIndex = index
|
||||
info.LogPos = offset
|
||||
index := info.LogFileIndex
|
||||
offset := info.LogPos
|
||||
|
||||
filePath := l.binlog.FormatLogFilePath(index)
|
||||
|
||||
var f *os.File
|
||||
f, err = os.Open(filePath)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return
|
||||
} else if os.IsNotExist(err) {
|
||||
l.Lock()
|
||||
if os.IsNotExist(err) {
|
||||
lastIndex := l.binlog.LogFileIndex()
|
||||
|
||||
if index == lastIndex {
|
||||
//no binlog at all
|
||||
l.Unlock()
|
||||
return
|
||||
info.LogPos = 0
|
||||
} else {
|
||||
//slave binlog info had lost
|
||||
info.LogFileIndex = -1
|
||||
}
|
||||
l.Unlock()
|
||||
}
|
||||
|
||||
//slave binlog info had lost
|
||||
info.LogFileIndex = -1
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
err = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
defer f.Close()
|
||||
|
||||
var fileSize int64
|
||||
st, _ := f.Stat()
|
||||
fileSize = st.Size()
|
||||
|
||||
if fileSize == info.LogPos {
|
||||
return
|
||||
}
|
||||
|
||||
if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
|
||||
//may be invliad seek offset
|
||||
return
|
||||
|
@ -177,27 +186,29 @@ func (l *Ledis) ReadEventsTo(index int64, offset int64, w io.Writer) (info *Mast
|
|||
var createTime uint32
|
||||
var dataLen uint32
|
||||
|
||||
var n int = 0
|
||||
var eventsNum int = 0
|
||||
|
||||
for {
|
||||
if err = binary.Read(f, binary.BigEndian, &createTime); err != nil {
|
||||
if err == io.EOF {
|
||||
//we will try to use next binlog
|
||||
info.LogFileIndex = index + 1
|
||||
info.LogPos = 0
|
||||
|
||||
if index < l.binlog.LogFileIndex() {
|
||||
info.LogFileIndex += 1
|
||||
info.LogPos = 0
|
||||
}
|
||||
err = nil
|
||||
return
|
||||
} else {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
n++
|
||||
eventsNum++
|
||||
if lastCreateTime == 0 {
|
||||
lastCreateTime = createTime
|
||||
} else if lastCreateTime != createTime {
|
||||
return
|
||||
} else if n > maxSyncEvents {
|
||||
} else if eventsNum > maxSyncEvents {
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -217,6 +228,7 @@ func (l *Ledis) ReadEventsTo(index int64, offset int64, w io.Writer) (info *Mast
|
|||
return
|
||||
}
|
||||
|
||||
n += (8 + int(dataLen))
|
||||
info.LogPos = info.LogPos + 8 + int64(dataLen)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,23 +2,44 @@ package ledis
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/siddontang/go-leveldb/leveldb"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func checkLedisEqual(master *Ledis, slave *Ledis) error {
|
||||
it := master.ldb.Iterator(nil, nil, leveldb.RangeClose, 0, -1)
|
||||
for ; it.Valid(); it.Next() {
|
||||
key := it.Key()
|
||||
value := it.Value()
|
||||
|
||||
if v, err := slave.ldb.Get(key); err != nil {
|
||||
return err
|
||||
} else if !bytes.Equal(v, value) {
|
||||
return fmt.Errorf("replication error %d != %d", len(v), len(value))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestReplication(t *testing.T) {
|
||||
var master *Ledis
|
||||
var slave *Ledis
|
||||
var err error
|
||||
|
||||
os.RemoveAll("/tmp/repl_repl")
|
||||
os.RemoveAll("/tmp/test_repl")
|
||||
|
||||
master, err = Open([]byte(`
|
||||
{
|
||||
"data_dir" : "/tmp/test_repl/master",
|
||||
"use_bin_log" : true
|
||||
}
|
||||
"use_bin_log" : true,
|
||||
"bin_log" : {
|
||||
"max_file_size" : 50
|
||||
}
|
||||
}
|
||||
`))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -34,26 +55,63 @@ func TestReplication(t *testing.T) {
|
|||
}
|
||||
|
||||
db, _ := master.Select(0)
|
||||
db.Set([]byte("a"), []byte("1"))
|
||||
db.Set([]byte("b"), []byte("2"))
|
||||
db.Set([]byte("c"), []byte("3"))
|
||||
db.Set([]byte("a"), []byte("value"))
|
||||
db.Set([]byte("b"), []byte("value"))
|
||||
db.Set([]byte("c"), []byte("value"))
|
||||
|
||||
binLogName := "/tmp/test_repl/master/bin_log/ledis-bin.0000001"
|
||||
db.HSet([]byte("a"), []byte("1"), []byte("value"))
|
||||
db.HSet([]byte("b"), []byte("2"), []byte("value"))
|
||||
db.HSet([]byte("c"), []byte("3"), []byte("value"))
|
||||
|
||||
err = slave.ReplicateFromBinLog(binLogName)
|
||||
if err != nil {
|
||||
for _, name := range master.binlog.LogNames() {
|
||||
p := path.Join(master.binlog.cfg.Path, name)
|
||||
|
||||
err = slave.ReplicateFromBinLog(p)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
if err = checkLedisEqual(master, slave); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
it := master.ldb.Iterator(nil, nil, leveldb.RangeClose, 0, -1)
|
||||
for ; it.Valid(); it.Next() {
|
||||
key := it.Key()
|
||||
value := it.Value()
|
||||
slave.FlushAll()
|
||||
|
||||
if v, err := slave.ldb.Get(key); err != nil {
|
||||
db.Set([]byte("a1"), []byte("1"))
|
||||
db.Set([]byte("b1"), []byte("2"))
|
||||
db.Set([]byte("c1"), []byte("3"))
|
||||
|
||||
db.HSet([]byte("a1"), []byte("1"), []byte("value"))
|
||||
db.HSet([]byte("b1"), []byte("2"), []byte("value"))
|
||||
db.HSet([]byte("c1"), []byte("3"), []byte("value"))
|
||||
|
||||
info := new(MasterInfo)
|
||||
info.LogFileIndex = 1
|
||||
info.LogPos = 0
|
||||
var buf bytes.Buffer
|
||||
var n int
|
||||
|
||||
for {
|
||||
buf.Reset()
|
||||
n, err = master.ReadEventsTo(info, &buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !bytes.Equal(v, value) {
|
||||
t.Fatal("replication error", len(v), len(value))
|
||||
} else if info.LogFileIndex == -1 {
|
||||
t.Fatal("invalid log file index -1")
|
||||
} else if info.LogFileIndex == 0 {
|
||||
t.Fatal("invalid log file index 0")
|
||||
} else {
|
||||
if err = slave.ReplicateFromReader(&buf); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if n == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err = checkLedisEqual(master, slave); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -93,7 +93,9 @@ func syncCommand(c *client) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if m, err := c.app.ldb.ReadEventsTo(logIndex, logPos, &c.syncBuf); err != nil {
|
||||
m := &ledis.MasterInfo{logIndex, logPos}
|
||||
|
||||
if _, err := c.app.ldb.ReadEventsTo(m, &c.syncBuf); err != nil {
|
||||
return err
|
||||
} else {
|
||||
buf := c.syncBuf.Bytes()
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/siddontang/go-leveldb/leveldb"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func checkDataEqual(master *App, slave *App) error {
|
||||
it := master.ldb.DataDB().Iterator(nil, nil, leveldb.RangeClose, 0, -1)
|
||||
for ; it.Valid(); it.Next() {
|
||||
key := it.Key()
|
||||
value := it.Value()
|
||||
|
||||
if v, err := slave.ldb.DataDB().Get(key); err != nil {
|
||||
return err
|
||||
} else if !bytes.Equal(v, value) {
|
||||
return fmt.Errorf("replication error %d != %d", len(v), len(value))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestReplication(t *testing.T) {
|
||||
data_dir := "/tmp/test_replication"
|
||||
os.RemoveAll(data_dir)
|
||||
|
||||
masterCfg := new(Config)
|
||||
masterCfg.DataDir = fmt.Sprintf("%s/master", data_dir)
|
||||
masterCfg.Addr = "127.0.0.1:11182"
|
||||
masterCfg.DB.UseBinLog = true
|
||||
|
||||
var master *App
|
||||
var slave *App
|
||||
var err error
|
||||
master, err = NewApp(masterCfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
slaveCfg := new(Config)
|
||||
slaveCfg.DataDir = fmt.Sprintf("%s/slave", data_dir)
|
||||
slaveCfg.Addr = "127.0.0.1:11183"
|
||||
slaveCfg.SlaveOf = masterCfg.Addr
|
||||
|
||||
slave, err = NewApp(slaveCfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
go master.Run()
|
||||
|
||||
db, _ := master.ldb.Select(0)
|
||||
|
||||
value := make([]byte, 10)
|
||||
|
||||
db.Set([]byte("a"), value)
|
||||
db.Set([]byte("b"), value)
|
||||
db.HSet([]byte("a"), []byte("1"), value)
|
||||
db.HSet([]byte("b"), []byte("2"), value)
|
||||
|
||||
go slave.Run()
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
if err = checkDataEqual(master, slave); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db.Set([]byte("a1"), value)
|
||||
db.Set([]byte("b1"), value)
|
||||
db.HSet([]byte("a1"), []byte("1"), value)
|
||||
db.HSet([]byte("b1"), []byte("2"), value)
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
if err = checkDataEqual(master, slave); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
slave.slaveof("")
|
||||
|
||||
db.Set([]byte("a2"), value)
|
||||
db.Set([]byte("b2"), value)
|
||||
db.HSet([]byte("a2"), []byte("1"), value)
|
||||
db.HSet([]byte("b2"), []byte("2"), value)
|
||||
|
||||
db.Set([]byte("a3"), value)
|
||||
db.Set([]byte("b3"), value)
|
||||
db.HSet([]byte("a3"), []byte("1"), value)
|
||||
db.HSet([]byte("b3"), []byte("2"), value)
|
||||
|
||||
if err = checkDataEqual(master, slave); err == nil {
|
||||
t.Fatal("must error")
|
||||
}
|
||||
|
||||
slave.slaveof(masterCfg.Addr)
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
if err = checkDataEqual(master, slave); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
}
|
|
@ -51,7 +51,7 @@ func newMaster(app *App) *master {
|
|||
m.infoName = path.Join(m.app.cfg.DataDir, "master.info")
|
||||
m.infoNameBak = fmt.Sprintf("%s.bak", m.infoName)
|
||||
|
||||
m.quit = make(chan struct{})
|
||||
m.quit = make(chan struct{}, 1)
|
||||
|
||||
//if load error, we will start a fullsync later
|
||||
m.loadInfo()
|
||||
|
@ -60,7 +60,10 @@ func newMaster(app *App) *master {
|
|||
}
|
||||
|
||||
func (m *master) Close() {
|
||||
close(m.quit)
|
||||
select {
|
||||
case m.quit <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
if m.c != nil {
|
||||
m.c.Close()
|
||||
|
@ -157,7 +160,7 @@ func (m *master) startReplication(masterAddr string) error {
|
|||
}
|
||||
}
|
||||
|
||||
m.quit = make(chan struct{})
|
||||
m.quit = make(chan struct{}, 1)
|
||||
|
||||
go m.runReplication()
|
||||
return nil
|
||||
|
@ -193,18 +196,26 @@ func (m *master) runReplication() {
|
|||
}
|
||||
}
|
||||
|
||||
t := time.NewTicker(1 * time.Second)
|
||||
|
||||
//then we will try sync every 1 seconds
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
for {
|
||||
lastIndex := m.logFileIndex
|
||||
lastPos := m.logPos
|
||||
if err := m.sync(); err != nil {
|
||||
log.Warn("sync error %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if m.logFileIndex == lastIndex && m.logPos == lastPos {
|
||||
//sync no data, wait 1s and retry
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-m.quit:
|
||||
return
|
||||
case <-time.After(1 * time.Second):
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -213,8 +224,8 @@ func (m *master) runReplication() {
|
|||
}
|
||||
|
||||
var (
|
||||
fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync
|
||||
syncCmdFormat = "*3\r\n$4\r\nsync\r\n$%d\r\n%s\r\n%d\r\n%s\r\n" //sync file pos
|
||||
fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync
|
||||
syncCmdFormat = "*3\r\n$4\r\nsync\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n" //sync index pos
|
||||
)
|
||||
|
||||
func (m *master) fullSync() error {
|
||||
|
@ -259,8 +270,9 @@ func (m *master) sync() error {
|
|||
logIndexStr := strconv.FormatInt(m.logFileIndex, 10)
|
||||
logPosStr := strconv.FormatInt(m.logPos, 10)
|
||||
|
||||
if _, err := m.c.Write(ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr),
|
||||
logIndexStr, len(logPosStr), logPosStr))); err != nil {
|
||||
cmd := ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr),
|
||||
logIndexStr, len(logPosStr), logPosStr))
|
||||
if _, err := m.c.Write(cmd); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ func readLine(rb *bufio.Reader) ([]byte, error) {
|
|||
func readBulkTo(rb *bufio.Reader, w io.Writer) error {
|
||||
l, err := readLine(rb)
|
||||
if len(l) == 0 {
|
||||
return errArrayFormat
|
||||
return errBulkFormat
|
||||
} else if l[0] == '$' {
|
||||
var n int
|
||||
//handle resp string
|
||||
|
@ -50,7 +50,7 @@ func readBulkTo(rb *bufio.Reader, w io.Writer) error {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
return errArrayFormat
|
||||
return errBulkFormat
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue