refactor, remove bin log, add consensus replication

This commit is contained in:
siddontang 2014-09-23 17:28:09 +08:00
parent 59e974c258
commit 1a1250d949
22 changed files with 364 additions and 279 deletions

View File

@ -2,7 +2,6 @@ package main
import (
"flag"
"fmt"
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/ledis"
)

View File

@ -30,9 +30,10 @@ type LMDBConfig struct {
}
type ReplicationConfig struct {
Use bool `toml:"use"`
Path string `toml:"path"`
ExpiredLogDays int `toml:"expired_log_days"`
Sync bool `toml:"sync"`
WaitSyncTime int `toml:"wait_sync_time"`
}
type Config struct {
@ -54,6 +55,7 @@ type Config struct {
AccessLog string `toml:"access_log"`
UseReplication bool `toml:"use_replication"`
Replication ReplicationConfig `toml:"replication"`
}
@ -95,6 +97,8 @@ func NewConfigDefault() *Config {
cfg.LMDB.MapSize = 20 * 1024 * 1024
cfg.LMDB.NoSync = true
cfg.Replication.WaitSyncTime = 1
return cfg
}

View File

@ -30,6 +30,9 @@ db_name = "leveldb"
# If not set, use data_dir/"db_name"_data
db_path = ""
# enable replication or not
use_replication = true
[leveldb]
compression = false
block_size = 32768
@ -42,9 +45,6 @@ map_size = 524288000
nosync = true
[replication]
# enable replication or not
use = true
# Path to store replication information(write ahead log, commit log, etc.)
# if not set, use data_dir/rpl
path = ""
@ -52,4 +52,10 @@ path = ""
# Expire write ahead logs after the given days
expired_log_days = 7
# If sync is true, the new log must be sent to some slaves, and then commit.
# It may affect performance.
sync = true
# If sync is true, wait at last wait_sync_time seconds to check whether slave sync this log
wait_sync_time = 1

View File

@ -1,34 +1,13 @@
package config
import (
"reflect"
"testing"
)
func TestConfig(t *testing.T) {
dstCfg := new(Config)
dstCfg.Addr = "127.0.0.1:6380"
dstCfg.HttpAddr = "127.0.0.1:11181"
dstCfg.DataDir = "/tmp/ledis_server"
dstCfg.DBName = "leveldb"
dstCfg.LevelDB.Compression = false
dstCfg.LevelDB.BlockSize = 32768
dstCfg.LevelDB.WriteBufferSize = 67108864
dstCfg.LevelDB.CacheSize = 524288000
dstCfg.LevelDB.MaxOpenFiles = 1024
dstCfg.LMDB.MapSize = 524288000
dstCfg.LMDB.NoSync = true
dstCfg.Replication.Use = true
dstCfg.Replication.ExpiredLogDays = 7
cfg, err := NewConfigWithFile("./config.toml")
_, err := NewConfigWithFile("./config.toml")
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(dstCfg, cfg) {
t.Fatal("parse toml error")
}
}

View File

@ -2474,7 +2474,7 @@ ERR invalid db index 16
### FLUSHALL
Delete all the keys of all the existing databases, not just the currently selected one. This command never fails.
Delete all the keys of all the existing databases and replication logs, not just the currently selected one. This command never fails.
Very dangerous to use!!!

View File

@ -33,6 +33,8 @@ func (b *batch) Commit() error {
return err
}
b.l.propagate(l)
if err = b.WriteBatch.Commit(); err != nil {
log.Fatal("commit error %s", err.Error())
return err

View File

@ -86,7 +86,9 @@ const (
var (
ErrScoreMiss = errors.New("zset score miss")
ErrWriteInROnly = errors.New("write in readonly mode")
ErrWriteInROnly = errors.New("write not support in readonly mode")
ErrRplInRDWR = errors.New("replication not support in read write mode")
ErrRplNotSupport = errors.New("replication not support")
)
const (

View File

@ -4,7 +4,6 @@ import (
"bufio"
"bytes"
"encoding/binary"
"github.com/siddontang/go-log/log"
"github.com/siddontang/go-snappy/snappy"
"github.com/siddontang/ledisdb/store"
"io"
@ -126,34 +125,13 @@ func (l *Ledis) LoadDumpFile(path string) (*DumpHead, error) {
return l.LoadDump(f)
}
func (l *Ledis) clearAllWhenLoad() error {
it := l.ldb.NewIterator()
defer it.Close()
w := l.ldb.NewWriteBatch()
defer w.Rollback()
n := 0
for ; it.Valid(); it.Next() {
n++
if n == 10000 {
w.Commit()
n = 0
}
w.Delete(it.RawKey())
}
return w.Commit()
}
// clear all data and load dump file to db
func (l *Ledis) LoadDump(r io.Reader) (*DumpHead, error) {
l.wLock.Lock()
defer l.wLock.Unlock()
var err error
if err = l.clearAllWhenLoad(); err != nil {
log.Fatal("clear all error when loaddump, err :%s", err.Error())
if err = l.flushAll(); err != nil {
return nil, err
}

View File

@ -19,10 +19,12 @@ type Ledis struct {
quit chan struct{}
wg sync.WaitGroup
//for replication
r *rpl.Replication
rc chan struct{}
rbatch store.WriteBatch
rwg sync.WaitGroup
rhs []NewLogEventHandler
wLock sync.RWMutex //allow one write at same time
commitLock sync.Mutex //allow one write commit at same time
@ -53,15 +55,19 @@ func Open2(cfg *config.Config, flags int) (*Ledis, error) {
l.ldb = ldb
if cfg.Replication.Use {
if cfg.UseReplication {
if l.r, err = rpl.NewReplication(cfg); err != nil {
return nil, err
}
l.rc = make(chan struct{}, 1)
l.rc = make(chan struct{}, 8)
l.rbatch = l.ldb.NewWriteBatch()
go l.onReplication()
//first we must try wait all replication ok
//maybe some logs are not committed
l.WaitReplication()
} else {
l.r = nil
}
@ -95,10 +101,43 @@ func (l *Ledis) Select(index int) (*DB, error) {
return l.dbs[index], nil
}
// Flush All will clear all data and replication logs
func (l *Ledis) FlushAll() error {
for index, db := range l.dbs {
if _, err := db.FlushAll(); err != nil {
log.Error("flush db %d error %s", index, err.Error())
l.wLock.Lock()
defer l.wLock.Unlock()
return l.flushAll()
}
func (l *Ledis) flushAll() error {
it := l.ldb.NewIterator()
defer it.Close()
w := l.ldb.NewWriteBatch()
defer w.Rollback()
n := 0
for ; it.Valid(); it.Next() {
n++
if n == 10000 {
if err := w.Commit(); err != nil {
log.Fatal("flush all commit error: %s", err.Error())
return err
}
n = 0
}
w.Delete(it.RawKey())
}
if err := w.Commit(); err != nil {
log.Fatal("flush all commit error: %s", err.Error())
return err
}
if l.r != nil {
if err := l.r.Clear(); err != nil {
log.Fatal("flush all replication clear error: %s", err.Error())
return err
}
}

View File

@ -3,7 +3,6 @@ package ledis
import (
"bytes"
"errors"
"fmt"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/rpl"
"io"
@ -18,6 +17,10 @@ var (
ErrLogMissed = errors.New("log is pured in server")
)
func (l *Ledis) ReplicationUsed() bool {
return l.r != nil
}
func (l *Ledis) handleReplication() {
l.commitLock.Lock()
defer l.commitLock.Unlock()
@ -25,7 +28,7 @@ func (l *Ledis) handleReplication() {
l.rwg.Add(1)
rl := &rpl.Log{}
for {
if err := l.r.NextCommitLog(rl); err != nil {
if err := l.r.NextNeedCommitLog(rl); err != nil {
if err != rpl.ErrNoBehindLog {
log.Error("get next commit log err, %s", err.Error)
} else {
@ -47,9 +50,7 @@ func (l *Ledis) handleReplication() {
}
func (l *Ledis) onReplication() {
if l.r == nil {
return
}
AsyncNotify(l.rc)
for {
select {
@ -62,11 +63,19 @@ func (l *Ledis) onReplication() {
}
func (l *Ledis) WaitReplication() error {
if !l.ReplicationUsed() {
return ErrRplNotSupport
}
AsyncNotify(l.rc)
l.rwg.Wait()
b, err := l.r.CommitIDBehind()
if err != nil {
return err
} else if b {
l.rc <- struct{}{}
AsyncNotify(l.rc)
l.rwg.Wait()
}
@ -74,8 +83,10 @@ func (l *Ledis) WaitReplication() error {
}
func (l *Ledis) StoreLogsFromReader(rb io.Reader) error {
if l.r == nil {
return fmt.Errorf("replication not enable")
if !l.ReplicationUsed() {
return ErrRplNotSupport
} else if !l.readOnly {
return ErrRplInRDWR
}
log := &rpl.Log{}
@ -95,11 +106,7 @@ func (l *Ledis) StoreLogsFromReader(rb io.Reader) error {
}
select {
case l.rc <- struct{}{}:
default:
break
}
AsyncNotify(l.rc)
return nil
}
@ -111,9 +118,10 @@ func (l *Ledis) StoreLogsFromData(data []byte) error {
}
func (l *Ledis) ReadLogsTo(startLogID uint64, w io.Writer) (n int, nextLogID uint64, err error) {
if l.r == nil {
if !l.ReplicationUsed() {
// no replication log
nextLogID = 0
err = ErrRplNotSupport
return
}
@ -134,6 +142,8 @@ func (l *Ledis) ReadLogsTo(startLogID uint64, w io.Writer) (n int, nextLogID uin
return
}
nextLogID = startLogID
log := &rpl.Log{}
for i := startLogID; i <= lastID; i++ {
if err = l.r.GetLog(i, log); err != nil {
@ -161,14 +171,48 @@ func (l *Ledis) ReadLogsToTimeout(startLogID uint64, w io.Writer, timeout int) (
n, nextLogID, err = l.ReadLogsTo(startLogID, w)
if err != nil {
return
} else if n == 0 || nextLogID == 0 {
} else if n != 0 {
return
}
//no events read
select {
//case <-l.binlog.Wait():
case <-l.r.WaitLog():
case <-time.After(time.Duration(timeout) * time.Second):
}
return l.ReadLogsTo(startLogID, w)
}
func (l *Ledis) NextSyncLogID() (uint64, error) {
if !l.ReplicationUsed() {
return 0, ErrRplNotSupport
}
s, err := l.r.Stat()
if err != nil {
return 0, err
}
if s.LastID > s.CommitID {
return s.LastID + 1, nil
} else {
return s.CommitID + 1, nil
}
}
func (l *Ledis) propagate(rl *rpl.Log) {
for _, h := range l.rhs {
h(rl)
}
}
type NewLogEventHandler func(rl *rpl.Log)
func (l *Ledis) AddNewLogEventHandler(h NewLogEventHandler) error {
if !l.ReplicationUsed() {
return ErrRplNotSupport
}
l.rhs = append(l.rhs, h)
return nil
}

View File

@ -33,7 +33,7 @@ func TestReplication(t *testing.T) {
cfgM := new(config.Config)
cfgM.DataDir = "/tmp/test_repl/master"
cfgM.Replication.Use = true
cfgM.UseReplication = true
os.RemoveAll(cfgM.DataDir)
@ -44,11 +44,11 @@ func TestReplication(t *testing.T) {
cfgS := new(config.Config)
cfgS.DataDir = "/tmp/test_repl/slave"
cfgS.Replication.Use = true
cfgS.UseReplication = true
os.RemoveAll(cfgS.DataDir)
slave, err = Open(cfgS)
slave, err = Open2(cfgS, ROnlyMode)
if err != nil {
t.Fatal(err)
}

View File

@ -183,7 +183,6 @@ func (db *DB) GetSet(key []byte, value []byte) ([]byte, error) {
}
t.Put(key, value)
//todo, binlog
err = t.Commit()

View File

@ -74,6 +74,16 @@ func StrInt64(v []byte, err error) (int64, error) {
}
}
func StrUint64(v []byte, err error) (uint64, error) {
if err != nil {
return 0, err
} else if v == nil {
return 0, nil
} else {
return strconv.ParseUint(String(v), 10, 64)
}
}
func StrInt32(v []byte, err error) (int32, error) {
if err != nil {
return 0, err
@ -123,3 +133,10 @@ func MaxInt32(a int32, b int32) int32 {
return b
}
}
func AsyncNotify(ch chan struct{}) {
select {
case ch <- struct{}{}:
default:
}
}

View File

@ -2,7 +2,6 @@ package rpl
import (
"encoding/binary"
"fmt"
"github.com/siddontang/go-log/log"
"github.com/siddontang/ledisdb/config"
"os"
@ -11,6 +10,12 @@ import (
"time"
)
type Stat struct {
FirstID uint64
LastID uint64
CommitID uint64
}
type Replication struct {
m sync.Mutex
@ -24,13 +29,11 @@ type Replication struct {
quit chan struct{}
wg sync.WaitGroup
nc chan struct{}
}
func NewReplication(cfg *config.Config) (*Replication, error) {
if !cfg.Replication.Use {
return nil, fmt.Errorf("replication not enalbed")
}
if len(cfg.Replication.Path) == 0 {
cfg.Replication.Path = path.Join(cfg.DataDir, "rpl")
}
@ -40,6 +43,7 @@ func NewReplication(cfg *config.Config) (*Replication, error) {
r := new(Replication)
r.quit = make(chan struct{})
r.nc = make(chan struct{})
r.cfg = cfg
@ -105,9 +109,16 @@ func (r *Replication) Log(data []byte) (*Log, error) {
return nil, err
}
close(r.nc)
r.nc = make(chan struct{})
return l, nil
}
func (r *Replication) WaitLog() <-chan struct{} {
return r.nc
}
func (r *Replication) StoreLog(log *Log) error {
return r.StoreLogs([]*Log{log})
}
@ -133,22 +144,6 @@ func (r *Replication) LastLogID() (uint64, error) {
return id, err
}
func (r *Replication) NextSyncID() (uint64, error) {
r.m.Lock()
defer r.m.Unlock()
lastId, err := r.s.LastID()
if err != nil {
return 0, err
}
if lastId > r.commitID {
return lastId + 1, nil
} else {
return r.commitID + 1, nil
}
}
func (r *Replication) LastCommitID() (uint64, error) {
r.m.Lock()
id := r.commitID
@ -160,6 +155,29 @@ func (r *Replication) UpdateCommitID(id uint64) error {
r.m.Lock()
defer r.m.Unlock()
return r.updateCommitID(id)
}
func (r *Replication) Stat() (*Stat, error) {
r.m.Lock()
defer r.m.Unlock()
s := &Stat{}
var err error
if s.FirstID, err = r.s.FirstID(); err != nil {
return nil, err
}
if s.LastID, err = r.s.LastID(); err != nil {
return nil, err
}
s.CommitID = r.commitID
return s, nil
}
func (r *Replication) updateCommitID(id uint64) error {
if _, err := r.commitLog.Seek(0, os.SEEK_SET); err != nil {
return err
}
@ -189,7 +207,7 @@ func (r *Replication) GetLog(id uint64, log *Log) error {
return r.s.GetLog(id, log)
}
func (r *Replication) NextCommitLog(log *Log) error {
func (r *Replication) NextNeedCommitLog(log *Log) error {
r.m.Lock()
defer r.m.Unlock()
@ -206,6 +224,21 @@ func (r *Replication) NextCommitLog(log *Log) error {
}
func (r *Replication) Clear() error {
return r.ClearWithCommitID(0)
}
func (r *Replication) ClearWithCommitID(id uint64) error {
r.m.Lock()
defer r.m.Unlock()
if err := r.s.Clear(); err != nil {
return err
}
return r.updateCommitID(id)
}
func (r *Replication) onPurgeExpired() {
r.wg.Add(1)
defer r.wg.Done()

View File

@ -15,7 +15,6 @@ func TestReplication(t *testing.T) {
defer os.RemoveAll(dir)
c := new(config.Config)
c.Replication.Use = true
c.Replication.Path = dir
r, err := NewReplication(c)

View File

@ -7,6 +7,7 @@ import (
"net/http"
"path"
"strings"
"sync"
)
type App struct {
@ -29,6 +30,10 @@ type App struct {
info *info
s *script
// handle slaves
slock sync.Mutex
slaves map[*client]struct{}
}
func netType(s string) string {
@ -53,6 +58,8 @@ func NewApp(cfg *config.Config) (*App, error) {
app.cfg = cfg
app.slaves = make(map[*client]struct{})
var err error
if app.info, err = newInfo(app); err != nil {
@ -89,6 +96,8 @@ func NewApp(cfg *config.Config) (*App, error) {
app.openScript()
app.ldb.AddNewLogEventHandler(app.publishNewLog)
return app, nil
}

View File

@ -12,9 +12,6 @@ var scriptUnsupportedCmds = map[string]struct{}{
"slaveof": struct{}{},
"fullsync": struct{}{},
"sync": struct{}{},
"begin": struct{}{},
"commit": struct{}{},
"rollback": struct{}{},
"flushall": struct{}{},
"flushdb": struct{}{},
}
@ -32,6 +29,11 @@ type responseWriter interface {
flush()
}
type syncAck struct {
id uint64
ch chan uint64
}
type client struct {
app *App
ldb *ledis.Ledis
@ -47,6 +49,10 @@ type client struct {
syncBuf bytes.Buffer
compressBuf []byte
lastSyncLogID uint64
ack *syncAck
reqErr chan error
buf bytes.Buffer

View File

@ -53,10 +53,7 @@ func (c *respClient) run() {
c.conn.Close()
}
if c.tx != nil {
c.tx.Rollback()
c.tx = nil
}
c.app.removeSlave(c.client)
}()
for {

View File

@ -1,7 +1,6 @@
package server
import (
"encoding/binary"
"fmt"
"github.com/siddontang/go-snappy/snappy"
"github.com/siddontang/ledisdb/ledis"
@ -66,44 +65,36 @@ func fullsyncCommand(c *client) error {
return nil
}
var reserveInfoSpace = make([]byte, 16)
func syncCommand(c *client) error {
args := c.args
if len(args) != 1 {
return ErrCmdParams
}
var logIndex int64
var logPos int64
var logId uint64
var err error
logIndex, err = ledis.Str(args[0], nil)
if err != nil {
if logId, err = ledis.StrUint64(args[0], nil); err != nil {
return ErrCmdParams
}
logPos, err = ledis.StrInt64(args[1], nil)
if err != nil {
return ErrCmdParams
c.lastSyncLogID = logId - 1
if c.ack != nil && logId > c.ack.id {
select {
case c.ack.ch <- logId:
default:
}
c.ack = nil
}
c.syncBuf.Reset()
//reserve space to write binlog anchor
if _, err := c.syncBuf.Write(reserveInfoSpace); err != nil {
return err
}
m := &ledis.BinLogAnchor{logIndex, logPos}
if _, err := c.app.ldb.ReadEventsToTimeout(m, &c.syncBuf, 5); err != nil {
if _, _, err := c.app.ldb.ReadLogsToTimeout(logId, &c.syncBuf, 30); err != nil {
return err
} else {
buf := c.syncBuf.Bytes()
binary.BigEndian.PutUint64(buf[0:], uint64(m.LogFileIndex))
binary.BigEndian.PutUint64(buf[8:], uint64(m.LogPos))
if len(c.compressBuf) < snappy.MaxEncodedLen(len(buf)) {
c.compressBuf = make([]byte, snappy.MaxEncodedLen(len(buf)))
}
@ -115,6 +106,8 @@ func syncCommand(c *client) error {
c.resp.writeBulk(buf)
}
c.app.addSlave(c)
return nil
}

View File

@ -17,7 +17,7 @@ func checkDataEqual(master *App, slave *App) error {
skeys, _ := sdb.Scan(nil, 100, true, "")
if len(mkeys) != len(skeys) {
return fmt.Errorf("keys number not equal")
return fmt.Errorf("keys number not equal %d != %d", len(mkeys), len(skeys))
} else if !reflect.DeepEqual(mkeys, skeys) {
return fmt.Errorf("keys not equal")
} else {
@ -40,7 +40,9 @@ func TestReplication(t *testing.T) {
masterCfg := new(config.Config)
masterCfg.DataDir = fmt.Sprintf("%s/master", data_dir)
masterCfg.Addr = "127.0.0.1:11182"
masterCfg.UseBinLog = true
masterCfg.UseReplication = true
masterCfg.Replication.Sync = true
masterCfg.Replication.WaitSyncTime = 5
var master *App
var slave *App
@ -55,6 +57,7 @@ func TestReplication(t *testing.T) {
slaveCfg.DataDir = fmt.Sprintf("%s/slave", data_dir)
slaveCfg.Addr = "127.0.0.1:11183"
slaveCfg.SlaveOf = masterCfg.Addr
slaveCfg.UseReplication = true
slave, err = NewApp(slaveCfg)
if err != nil {
@ -64,6 +67,9 @@ func TestReplication(t *testing.T) {
go master.Run()
time.Sleep(1 * time.Second)
go slave.Run()
db, _ := master.ldb.Select(0)
value := make([]byte, 10)
@ -73,10 +79,7 @@ func TestReplication(t *testing.T) {
db.Set([]byte("c"), value)
db.Set([]byte("d"), value)
go slave.Run()
time.Sleep(1 * time.Second)
if err = checkDataEqual(master, slave); err != nil {
t.Fatal(err)
}
@ -86,7 +89,9 @@ func TestReplication(t *testing.T) {
db.Set([]byte("c1"), value)
db.Set([]byte("d1"), value)
time.Sleep(1 * time.Second)
//time.Sleep(1 * time.Second)
slave.ldb.WaitReplication()
if err = checkDataEqual(master, slave); err != nil {
t.Fatal(err)
}
@ -108,6 +113,7 @@ func TestReplication(t *testing.T) {
}
slave.slaveof(masterCfg.Addr)
time.Sleep(1 * time.Second)
if err = checkDataEqual(master, slave); err != nil {

View File

@ -24,7 +24,8 @@
// ledis-cli -p 6381
// ledis 127.0.0.1:6381 > slaveof 127.0.0.1 6380
//
// After you send slaveof command, the slave will start to sync master's binlog and replicate from binlog.
// After you send slaveof command, the slave will start to sync master's write ahead log and replicate from it.
// You must notice that use_replication must be set true if you want to use it.
//
// HTTP Interface
//

View File

@ -3,14 +3,12 @@ package server
import (
"bufio"
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"github.com/siddontang/go-log/log"
"github.com/siddontang/go-snappy/snappy"
"github.com/siddontang/ledisdb/ledis"
"io/ioutil"
"github.com/siddontang/ledisdb/rpl"
"net"
"os"
"path"
@ -23,52 +21,6 @@ var (
errConnectMaster = errors.New("connect master error")
)
type MasterInfo struct {
Addr string `json:"addr"`
LogFileIndex int64 `json:"log_file_index"`
LogPos int64 `json:"log_pos"`
}
func (m *MasterInfo) Save(filePath string) error {
data, err := json.Marshal(m)
if err != nil {
return err
}
filePathBak := fmt.Sprintf("%s.bak", filePath)
var fd *os.File
fd, err = os.OpenFile(filePathBak, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
if _, err = fd.Write(data); err != nil {
fd.Close()
return err
}
fd.Close()
return os.Rename(filePathBak, filePath)
}
func (m *MasterInfo) Load(filePath string) error {
data, err := ioutil.ReadFile(filePath)
if err != nil {
if os.IsNotExist(err) {
return nil
} else {
return err
}
}
if err = json.Unmarshal(data, m); err != nil {
return err
}
return nil
}
type master struct {
sync.Mutex
@ -79,9 +31,7 @@ type master struct {
quit chan struct{}
infoName string
info *MasterInfo
addr string
wg sync.WaitGroup
@ -94,17 +44,10 @@ func newMaster(app *App) *master {
m := new(master)
m.app = app
m.infoName = path.Join(m.app.cfg.DataDir, "master.info")
m.quit = make(chan struct{}, 1)
m.compressBuf = make([]byte, 256)
m.info = new(MasterInfo)
//if load error, we will start a fullsync later
m.loadInfo()
return m
}
@ -122,16 +65,8 @@ func (m *master) Close() {
m.wg.Wait()
}
func (m *master) loadInfo() error {
return m.info.Load(m.infoName)
}
func (m *master) saveInfo() error {
return m.info.Save(m.infoName)
}
func (m *master) connect() error {
if len(m.info.Addr) == 0 {
if len(m.addr) == 0 {
return fmt.Errorf("no assign master addr")
}
@ -140,7 +75,7 @@ func (m *master) connect() error {
m.conn = nil
}
if conn, err := net.Dial("tcp", m.info.Addr); err != nil {
if conn, err := net.Dial("tcp", m.addr); err != nil {
return err
} else {
m.conn = conn
@ -150,19 +85,10 @@ func (m *master) connect() error {
return nil
}
func (m *master) resetInfo(addr string) {
m.info.Addr = addr
m.info.LogFileIndex = 0
m.info.LogPos = 0
}
func (m *master) stopReplication() error {
m.Close()
if err := m.saveInfo(); err != nil {
log.Error("save master info error %s", err.Error())
return err
}
m.app.ldb.SetReadOnly(false)
return nil
}
@ -171,16 +97,12 @@ func (m *master) startReplication(masterAddr string) error {
//stop last replcation, if avaliable
m.Close()
if masterAddr != m.info.Addr {
m.resetInfo(masterAddr)
if err := m.saveInfo(); err != nil {
log.Error("save master info error %s", err.Error())
return err
}
}
m.addr = masterAddr
m.quit = make(chan struct{}, 1)
m.app.ldb.SetReadOnly(true)
go m.runReplication()
return nil
}
@ -195,29 +117,12 @@ func (m *master) runReplication() {
return
default:
if err := m.connect(); err != nil {
log.Error("connect master %s error %s, try 2s later", m.info.Addr, err.Error())
log.Error("connect master %s error %s, try 2s later", m.addr, err.Error())
time.Sleep(2 * time.Second)
continue
}
}
if m.info.LogFileIndex == 0 {
//try a fullsync
if err := m.fullSync(); err != nil {
if m.conn != nil {
//if conn == nil, other close the replication, not error
log.Warn("full sync error %s", err.Error())
}
return
}
if m.info.LogFileIndex == 0 {
//master not support binlog, we cannot sync, so stop replication
m.stopReplication()
return
}
}
for {
if err := m.sync(); err != nil {
if m.conn != nil {
@ -241,10 +146,12 @@ func (m *master) runReplication() {
var (
fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync
syncCmdFormat = "*3\r\n$4\r\nsync\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n" //sync index pos
syncCmdFormat = "*2\r\n$4\r\nsync\r\n$%d\r\n%s\r\n" //sync logid
)
func (m *master) fullSync() error {
log.Info("begin full sync")
if _, err := m.conn.Write(fullSyncCmd); err != nil {
return err
}
@ -264,30 +171,25 @@ func (m *master) fullSync() error {
return err
}
if err = m.app.ldb.FlushAll(); err != nil {
return err
}
var head *ledis.BinLogAnchor
head, err = m.app.ldb.LoadDumpFile(dumpPath)
if err != nil {
if _, err = m.app.ldb.LoadDumpFile(dumpPath); err != nil {
log.Error("load dump file error %s", err.Error())
return err
}
m.info.LogFileIndex = head.LogFileIndex
m.info.LogPos = head.LogPos
return m.saveInfo()
return nil
}
func (m *master) sync() error {
logIndexStr := strconv.FormatInt(m.info.LogFileIndex, 10)
logPosStr := strconv.FormatInt(m.info.LogPos, 10)
var err error
var syncID uint64
if syncID, err = m.app.ldb.NextSyncLogID(); err != nil {
return err
}
cmd := ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr),
logIndexStr, len(logPosStr), logPosStr))
logIDStr := strconv.FormatUint(syncID, 10)
cmd := ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIDStr),
logIDStr))
if _, err := m.conn.Write(cmd); err != nil {
return err
@ -295,10 +197,17 @@ func (m *master) sync() error {
m.syncBuf.Reset()
err := ReadBulkTo(m.rb, &m.syncBuf)
if err != nil {
if err = ReadBulkTo(m.rb, &m.syncBuf); err != nil {
switch err.Error() {
case ledis.ErrLogMissed.Error():
return m.fullSync()
case ledis.ErrRplNotSupport.Error():
m.stopReplication()
return nil
default:
return err
}
}
var buf []byte
buf, err = snappy.Decode(m.compressBuf, m.syncBuf.Bytes())
@ -308,28 +217,15 @@ func (m *master) sync() error {
m.compressBuf = buf
}
if len(buf) < 16 {
return fmt.Errorf("invalid sync data len %d", len(buf))
}
m.info.LogFileIndex = int64(binary.BigEndian.Uint64(buf[0:8]))
m.info.LogPos = int64(binary.BigEndian.Uint64(buf[8:16]))
if m.info.LogFileIndex == 0 {
//master now not support binlog, stop replication
m.stopReplication()
if len(buf) == 0 {
return nil
} else if m.info.LogFileIndex == -1 {
//-1 means than binlog index and pos are lost, we must start a full sync instead
return m.fullSync()
}
err = m.app.ldb.ReplicateFromData(buf[16:])
if err != nil {
if err = m.app.ldb.StoreLogsFromData(buf); err != nil {
return err
}
return m.saveInfo()
return nil
}
@ -337,6 +233,10 @@ func (app *App) slaveof(masterAddr string) error {
app.m.Lock()
defer app.m.Unlock()
if !app.ldb.ReplicationUsed() {
return fmt.Errorf("slaveof must enable replication")
}
if len(masterAddr) == 0 {
return app.m.stopReplication()
} else {
@ -345,3 +245,75 @@ func (app *App) slaveof(masterAddr string) error {
return nil
}
func (app *App) addSlave(c *client) {
app.slock.Lock()
defer app.slock.Unlock()
app.slaves[c] = struct{}{}
}
func (app *App) removeSlave(c *client) {
app.slock.Lock()
defer app.slock.Unlock()
delete(app.slaves, c)
if c.ack != nil {
select {
case c.ack.ch <- c.lastSyncLogID:
default:
}
}
}
func (app *App) publishNewLog(l *rpl.Log) {
if !app.cfg.Replication.Sync {
//no sync replication, we will do async
return
}
ss := make([]*client, 0, 4)
app.slock.Lock()
logId := l.ID
for s, _ := range app.slaves {
if s.lastSyncLogID >= logId {
//slave has already this log
ss = []*client{}
break
} else {
ss = append(ss, s)
}
}
app.slock.Unlock()
if len(ss) == 0 {
return
}
ack := &syncAck{
logId, make(chan uint64, len(ss)),
}
for _, s := range ss {
s.ack = ack
}
done := make(chan struct{}, 1)
go func() {
for i := 0; i < len(ss); i++ {
id := <-ack.ch
if id > logId {
break
}
}
done <- struct{}{}
}()
select {
case <-done:
case <-time.After(time.Duration(app.cfg.Replication.WaitSyncTime) * time.Second):
}
}