update slaveof command, wait (n + 1) /2 replies

This commit is contained in:
siddontang 2014-09-28 21:34:24 +08:00
parent 69e489dd1b
commit 2a3fca829e
5 changed files with 38 additions and 16 deletions

View File

@ -119,7 +119,7 @@ Table of Contents
- [BPERSIST key](#bpersist-key)
- [BXSCAN key [MATCH match] [COUNT count]](#bxscan-key-match-match-count-count)
- [Replication](#replication)
- [SLAVEOF host port](#slaveof-host-port)
- [SLAVEOF host port [restart]](#slaveof-host-port-restart)
- [FULLSYNC](#fullsync)
- [SYNC logid](#sync-logid)
- [Server](#server)
@ -2396,13 +2396,13 @@ See [XSCAN](#xscan-key-match-match-count-count) for more information.
## Replication
### SLAVEOF host port
### SLAVEOF host port [restart]
Changes the replication settings of a slave on the fly. If the server is already acting as slave, SLAVEOF NO ONE will turn off the replication.
SLAVEOF host port will make the server a slave of another server listening at the specified host and port.
If a server is already a slave of a master, SLAVEOF host port will stop the replication against the old and start the synchronization against the new one, discarding the old dataset.
If a server is already a slave of a master, SLAVEOF host port will stop the replication against the old and start the synchronization against the new one, if restart is set, it will discard the old dataset, otherwise it will sync with LastLogID + 1.
### FULLSYNC

View File

@ -135,7 +135,7 @@ func (app *App) Close() {
func (app *App) Run() {
if len(app.cfg.SlaveOf) > 0 {
app.slaveof(app.cfg.SlaveOf)
app.slaveof(app.cfg.SlaveOf, false)
}
go app.httpServe()

View File

@ -13,24 +13,32 @@ import (
func slaveofCommand(c *client) error {
args := c.args
if len(args) != 2 {
if len(args) != 2 || len(args) != 3 {
return ErrCmdParams
}
masterAddr := ""
restart := false
if strings.ToLower(hack.String(args[0])) == "no" &&
strings.ToLower(hack.String(args[1])) == "one" {
//stop replication, use master = ""
if len(args) != 2 {
return ErrCmdParams
}
} else {
if _, err := strconv.ParseInt(hack.String(args[1]), 10, 16); err != nil {
return err
}
masterAddr = fmt.Sprintf("%s:%s", args[0], args[1])
if len(args) == 3 && strings.ToLower(hack.String(args[2])) == "restart" {
restart = true
}
}
if err := c.app.slaveof(masterAddr); err != nil {
if err := c.app.slaveof(masterAddr, restart); err != nil {
return err
}

View File

@ -96,7 +96,7 @@ func TestReplication(t *testing.T) {
t.Fatal(err)
}
slave.slaveof("")
slave.slaveof("", false)
db.Set([]byte("a2"), value)
db.Set([]byte("b2"), value)
@ -112,7 +112,7 @@ func TestReplication(t *testing.T) {
t.Fatal("must error")
}
slave.slaveof(masterCfg.Addr)
slave.slaveof(masterCfg.Addr, false)
time.Sleep(1 * time.Second)

View File

@ -84,7 +84,7 @@ func (m *master) stopReplication() error {
return nil
}
func (m *master) startReplication(masterAddr string) error {
func (m *master) startReplication(masterAddr string, restart bool) error {
//stop last replcation, if avaliable
m.Close()
@ -94,11 +94,11 @@ func (m *master) startReplication(masterAddr string) error {
m.app.ldb.SetReadOnly(true)
go m.runReplication()
go m.runReplication(restart)
return nil
}
func (m *master) runReplication() {
func (m *master) runReplication(restart bool) {
m.wg.Add(1)
defer m.wg.Done()
@ -114,6 +114,16 @@ func (m *master) runReplication() {
}
}
if restart {
if err := m.fullSync(); err != nil {
if m.conn != nil {
//if conn == nil, other close the replication, not error
log.Error("restart fullsync error %s", err.Error())
}
return
}
}
for {
if err := m.sync(); err != nil {
if m.conn != nil {
@ -227,7 +237,7 @@ func (m *master) sync() error {
}
func (app *App) slaveof(masterAddr string) error {
func (app *App) slaveof(masterAddr string, restart bool) error {
app.m.Lock()
defer app.m.Unlock()
@ -242,7 +252,7 @@ func (app *App) slaveof(masterAddr string) error {
app.ldb.SetReadOnly(false)
} else {
return app.m.startReplication(masterAddr)
return app.m.startReplication(masterAddr, restart)
}
return nil
@ -308,15 +318,19 @@ func (app *App) publishNewLog(l *rpl.Log) {
}
done := make(chan struct{}, 1)
go func() {
go func(total int) {
n := 0
for i := 0; i < len(ss); i++ {
id := <-ack.ch
if id > logId {
break
n++
if n >= total {
break
}
}
}
done <- struct{}{}
}()
}((len(ss) + 1) / 2)
select {
case <-done: