diff --git a/server/cmd_replication.go b/server/cmd_replication.go
index b910e51..7aa2e17 100644
--- a/server/cmd_replication.go
+++ b/server/cmd_replication.go
@@ -182,9 +182,98 @@ func replconfCommand(c *client) error {
 	return nil
 }
 
+// roleCommand handles the ROLE command, which takes no arguments. A master
+// replies with "master", its last replication log ID and one
+// [host, port, last log ID] entry per registered slave. A slave replies with
+// "slave", the host and port parsed from cfg.Addr, its replication state name
+// and its last replication log ID.
+func roleCommand(c *client) error {
+	if len(c.args) != 0 {
+		return ErrCmdParams
+	}
+
+	c.app.m.Lock()
+	isMaster := len(c.app.cfg.SlaveOf) == 0
+	c.app.m.Unlock()
+
+	ay := make([]interface{}, 0, 5)
+
+	var lastID int64
+
+	stat, _ := c.app.ldb.ReplicationStat()
+	if stat != nil {
+		lastID = int64(stat.LastID)
+	}
+
+	if isMaster {
+		ay = append(ay, []byte("master"))
+		ay = append(ay, lastID)
+
+		items := make([]interface{}, 0, 3)
+
+		c.app.slock.Lock()
+		for addr, slave := range c.app.slaves {
+			host, port, _ := splitHostPort(addr)
+
+			items = append(items, []interface{}{[]byte(host),
+				strconv.AppendUint(nil, uint64(port), 10),
+				strconv.AppendUint(nil, slave.lastLogID.Get(), 10)})
+		}
+		c.app.slock.Unlock()
+		ay = append(ay, items)
+	} else {
+		// NOTE(review): Redis ROLE reports the master's host and port here;
+		// cfg.Addr looks like this server's own address - confirm intent.
+		host, port, _ := splitHostPort(c.app.cfg.Addr)
+		ay = append(ay, []byte("slave"))
+		ay = append(ay, []byte(host))
+		ay = append(ay, int64(port))
+		ay = append(ay, []byte(replStatetring(c.app.m.state.Get())))
+		ay = append(ay, lastID)
+	}
+
+	c.resp.writeArray(ay)
+	return nil
+}
+
+// replStatetring returns the display name of replication state r, or
+// "unknown" when r is not one of the repl*State constants.
+func replStatetring(r int32) string {
+	switch r {
+	case replConnectState:
+		return "connect"
+	case replConnectingState:
+		return "connecting"
+	case replSyncState:
+		return "sync"
+	case replConnectedState:
+		return "connected"
+	default:
+		return "unknown"
+	}
+}
+
+// splitHostPort splits a "host:port" address and parses the port as an
+// unsigned 16-bit number so that the whole 0-65535 range is accepted; a
+// signed 16-bit parse would reject every port above 32767.
+func splitHostPort(str string) (string, uint16, error) {
+	host, port, err := net.SplitHostPort(str)
+	if err != nil {
+		return "", 0, err
+	}
+
+	p, err := strconv.ParseUint(port, 10, 16)
+	if err != nil {
+		return "", 0, err
+	}
+
+	return host, uint16(p), nil
+}
+
 func init() {
 	register("slaveof", slaveofCommand)
 	register("fullsync", fullsyncCommand)
 	register("sync", syncCommand)
 	register("replconf", replconfCommand)
+	register("role", roleCommand)
 }
diff --git a/server/cmd_replication_test.go b/server/cmd_replication_test.go
index 3c79836..6be89a8 100644
--- a/server/cmd_replication_test.go
+++ b/server/cmd_replication_test.go
@@ -2,6 +2,7 @@ package server
 
 import (
 	"fmt"
+	goledis "github.com/siddontang/ledisdb/client/go/ledis"
 	"github.com/siddontang/ledisdb/config"
 	"os"
 	"reflect"
@@ -120,6 +121,32 @@ func TestReplication(t *testing.T) {
 		t.Fatal(err)
 	}
 
+	mStat, _ := master.ldb.ReplicationStat()
+	sStat, _ := slave.ldb.ReplicationStat()
+
+	if err = checkTestRole(masterCfg.Addr, []interface{}{
+		[]byte("master"),
+		int64(mStat.LastID),
+		[]interface{}{
+			[]interface{}{
+				[]byte("127.0.0.1"),
+				[]byte("11183"),
+				[]byte(fmt.Sprintf("%d", sStat.LastID)),
+			}},
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	if err = checkTestRole(slaveCfg.Addr, []interface{}{
+		[]byte("slave"),
+		[]byte("127.0.0.1"),
+		int64(11183),
+		[]byte("connected"),
+		int64(sStat.LastID),
+	}); err != nil {
+		t.Fatal(err)
+	}
+
 	slave.tryReSlaveof()
 
 	time.Sleep(1 * time.Second)
@@ -129,5 +156,18 @@ func TestReplication(t *testing.T) {
 	if err = checkDataEqual(master, slave); err != nil {
 		t.Fatal(err)
 	}
-
+}
+
+// checkTestRole sends ROLE to the server at addr and returns an error unless
+// the reply is deeply equal to checkRoles.
+func checkTestRole(addr string, checkRoles []interface{}) error {
+	conn := goledis.NewConn(addr)
+	defer conn.Close()
+	roles, err := goledis.MultiBulk(conn.Do("ROLE"))
+	if err != nil {
+		return err
+	} else if !reflect.DeepEqual(roles, checkRoles) {
+		return fmt.Errorf("%v != %v", roles, checkRoles)
+	}
+	return nil
 }
diff --git a/server/replication.go b/server/replication.go
index 44f43d5..2633ee5 100644
--- a/server/replication.go
+++ b/server/replication.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"github.com/siddontang/go/log"
 	"github.com/siddontang/go/num"
+	"github.com/siddontang/go/sync2"
 	goledis "github.com/siddontang/ledisdb/client/go/ledis"
 	"github.com/siddontang/ledisdb/ledis"
 	"github.com/siddontang/ledisdb/rpl"
@@ -22,6 +23,17 @@ var (
 	errReplClosed = errors.New("replication is closed")
 )
 
+const (
+	// slave needs to connect to its master
+	replConnectState int32 = iota + 1
+	// slave-master connection is in progress
+	replConnectingState
+	// perform the synchronization
+	replSyncState
+	// slave is online
+	replConnectedState
+)
+
 type master struct {
 	sync.Mutex
 
@@ -36,6 +48,8 @@ type master struct {
 	wg sync.WaitGroup
 
 	syncBuf bytes.Buffer
+
+	state sync2.AtomicInt32
 }
 
 func newMaster(app *App) *master {
@@ -44,6 +58,8 @@ func newMaster(app *App) *master {
 
 	m.quit = make(chan struct{}, 1)
 
+	m.state.Set(replConnectState)
+
 	return m
 }
 
@@ -58,6 +74,8 @@ func (m *master) Close() {
 	case <-m.quit:
 	default:
 	}
+
+	m.state.Set(replConnectState)
 }
 
 func (m *master) resetConn() error {
@@ -103,8 +121,12 @@ func (m *master) startReplication(masterAddr string, restart bool) error {
 }
 
 func (m *master) runReplication(restart bool) {
-	defer m.wg.Done()
+	defer func() {
+		m.state.Set(replConnectState)
+		m.wg.Done()
+	}()
 
+	m.state.Set(replConnectingState)
 	if err := m.resetConn(); err != nil {
 		log.Errorf("reset conn error %s", err.Error())
 		return
@@ -122,12 +144,15 @@ func (m *master) runReplication(restart bool) {
 		}
 	}
 
+	m.state.Set(replConnectedState)
+
 	if err := m.replConf(); err != nil {
 		log.Errorf("replconf error %s", err.Error())
 		return
 	}
 
 	if restart {
+		m.state.Set(replSyncState)
 		if err := m.fullSync(); err != nil {
 			log.Errorf("restart fullsync error %s", err.Error())
 			return
@@ -139,6 +164,7 @@ func (m *master) runReplication(restart bool) {
 		case <-m.quit:
 			return
 		default:
+			m.state.Set(replConnectedState)
 			if err := m.sync(); err != nil {
 				log.Errorf("sync error %s", err.Error())
 				return
@@ -250,6 +276,7 @@ func (m *master) sync() error {
 		return nil
 	}
 
+	m.state.Set(replSyncState)
 	if err = m.app.ldb.StoreLogsFromData(buf); err != nil {
 		return err
 	}