diff --git a/doc/commands.md b/doc/commands.md index 0809131..4f90a18 100644 --- a/doc/commands.md +++ b/doc/commands.md @@ -119,7 +119,7 @@ Table of Contents - [BPERSIST key](#bpersist-key) - [BXSCAN key [MATCH match] [COUNT count]](#bxscan-key-match-match-count-count) - [Replication](#replication) - - [SLAVEOF host port](#slaveof-host-port) + - [SLAVEOF host port [restart]](#slaveof-host-port-restart) - [FULLSYNC](#fullsync) - [SYNC logid](#sync-logid) - [Server](#server) @@ -2396,13 +2396,13 @@ See [XSCAN](#xscan-key-match-match-count-count) for more information. ## Replication -### SLAVEOF host port +### SLAVEOF host port [restart] Changes the replication settings of a slave on the fly. If the server is already acting as slave, SLAVEOF NO ONE will turn off the replication. SLAVEOF host port will make the server a slave of another server listening at the specified host and port. -If a server is already a slave of a master, SLAVEOF host port will stop the replication against the old and start the synchronization against the new one, discarding the old dataset. +If a server is already a slave of a master, SLAVEOF host port will stop the replication against the old and start the synchronization against the new one, if restart is set, it will discard the old dataset, otherwise it will sync with LastLogID + 1. ### FULLSYNC diff --git a/server/app.go b/server/app.go index 62f8514..dbf12e5 100644 --- a/server/app.go +++ b/server/app.go @@ -135,7 +135,7 @@ func (app *App) Close() { func (app *App) Run() { if len(app.cfg.SlaveOf) > 0 { - app.slaveof(app.cfg.SlaveOf) + app.slaveof(app.cfg.SlaveOf, false) } go app.httpServe() diff --git a/server/cmd_replication.go b/server/cmd_replication.go index ba091aa..aa6ede4 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -13,24 +13,32 @@ import ( func slaveofCommand(c *client) error { args := c.args - if len(args) != 2 { + if len(args) != 2 || len(args) != 3 { return ErrCmdParams } masterAddr := "" + restart := false if strings.ToLower(hack.String(args[0])) == "no" && strings.ToLower(hack.String(args[1])) == "one" { //stop replication, use master = "" + if len(args) != 2 { + return ErrCmdParams + } } else { if _, err := strconv.ParseInt(hack.String(args[1]), 10, 16); err != nil { return err } masterAddr = fmt.Sprintf("%s:%s", args[0], args[1]) + + if len(args) == 3 && strings.ToLower(hack.String(args[2])) == "restart" { + restart = true + } } - if err := c.app.slaveof(masterAddr); err != nil { + if err := c.app.slaveof(masterAddr, restart); err != nil { return err } diff --git a/server/cmd_replication_test.go b/server/cmd_replication_test.go index fee81fa..07db0c7 100644 --- a/server/cmd_replication_test.go +++ b/server/cmd_replication_test.go @@ -96,7 +96,7 @@ func TestReplication(t *testing.T) { t.Fatal(err) } - slave.slaveof("") + slave.slaveof("", false) db.Set([]byte("a2"), value) db.Set([]byte("b2"), value) @@ -112,7 +112,7 @@ func TestReplication(t *testing.T) { t.Fatal("must error") } - slave.slaveof(masterCfg.Addr) + slave.slaveof(masterCfg.Addr, false) time.Sleep(1 * time.Second) diff --git a/server/replication.go b/server/replication.go index d912016..2c409bf 100644 --- a/server/replication.go +++ b/server/replication.go @@ -84,7 +84,7 @@ func (m *master) stopReplication() error { return nil } -func (m *master) startReplication(masterAddr string) error { +func (m *master) startReplication(masterAddr string, restart bool) error { //stop last replcation, if avaliable m.Close() @@ -94,11 +94,11 @@ func (m *master) startReplication(masterAddr string) error { m.app.ldb.SetReadOnly(true) - go m.runReplication() + go m.runReplication(restart) return nil } -func (m *master) runReplication() { +func (m *master) runReplication(restart bool) { m.wg.Add(1) defer m.wg.Done() @@ -114,6 +114,16 @@ func (m *master) runReplication() { } } + if restart { + if err := m.fullSync(); err != nil { + if m.conn != nil { + //if conn == nil, other close the replication, not error + log.Error("restart fullsync error %s", err.Error()) + } + return + } + } + for { if err := m.sync(); err != nil { if m.conn != nil { @@ -227,7 +237,7 @@ func (m *master) sync() error { } -func (app *App) slaveof(masterAddr string) error { +func (app *App) slaveof(masterAddr string, restart bool) error { app.m.Lock() defer app.m.Unlock() @@ -242,7 +252,7 @@ func (app *App) slaveof(masterAddr string) error { app.ldb.SetReadOnly(false) } else { - return app.m.startReplication(masterAddr) + return app.m.startReplication(masterAddr, restart) } return nil @@ -308,15 +318,19 @@ func (app *App) publishNewLog(l *rpl.Log) { } done := make(chan struct{}, 1) - go func() { + go func(total int) { + n := 0 for i := 0; i < len(ss); i++ { id := <-ack.ch if id > logId { - break + n++ + if n >= total { + break + } } } done <- struct{}{} - }() + }((len(ss) + 1) / 2) select { case <-done: