diff --git a/server/client_resp.go b/server/client_resp.go index 078a02c..93edef9 100644 --- a/server/client_resp.go +++ b/server/client_resp.go @@ -51,7 +51,10 @@ func (c *respClient) run() { log.Fatal("client run panic %s:%v", buf, e) } + handleQuit := true if c.conn != nil { + //if handle quit command before, conn is nil + handleQuit = false c.conn.Close() } @@ -60,7 +63,7 @@ func (c *respClient) run() { c.tx = nil } - c.app.removeSlave(c.client) + c.app.removeSlave(c.client, handleQuit) }() for { @@ -144,6 +147,7 @@ func (c *respClient) handleRequest(reqData [][]byte) { c.resp.writeStatus(OK) c.resp.flush() c.conn.Close() + c.conn = nil return } diff --git a/server/cmd_replication.go b/server/cmd_replication.go index 20c6982..aefd1ac 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -109,7 +109,16 @@ func syncCommand(c *client) error { c.lastLogID = logId - 1 - c.app.slaveAck(c) + stat, err := c.app.ldb.ReplicationStat() + if err != nil { + return err + } + + if c.lastLogID > stat.LastID { + return fmt.Errorf("invalid sync logid %d > %d + 1", logId, stat.LastID) + } else if c.lastLogID == stat.LastID { + c.app.slaveAck(c) + } c.syncBuf.Reset() diff --git a/server/replication.go b/server/replication.go index ea5f6cf..25e4a8d 100644 --- a/server/replication.go +++ b/server/replication.go @@ -50,10 +50,17 @@ func newMaster(app *App) *master { return m } +var ( + quitCmd = []byte("*1\r\n$4\r\nquit\r\n") +) + func (m *master) Close() { ledis.AsyncNotify(m.quit) if m.conn != nil { + //for replication, we send quit command to close gracefully + m.conn.Write(quitCmd) + m.conn.Close() m.conn = nil } @@ -321,7 +328,7 @@ func (app *App) addSlave(c *client) { app.slaves[addr] = c } -func (app *App) removeSlave(c *client) { +func (app *App) removeSlave(c *client, activeQuit bool) { addr := c.slaveListeningAddr app.slock.Lock() @@ -330,6 +337,9 @@ func (app *App) removeSlave(c *client) { if _, ok := app.slaves[addr]; ok { delete(app.slaves, addr) log.Info("remove slave %s", addr) + if activeQuit { + asyncNotifyUint64(app.slaveSyncAck, c.lastLogID) + } } } @@ -344,7 +354,6 @@ func (app *App) slaveAck(c *client) { return } - println("ack", c.lastLogID) asyncNotifyUint64(app.slaveSyncAck, c.lastLogID) } @@ -371,9 +380,11 @@ func (app *App) publishNewLog(l *rpl.Log) { n := 0 logId := l.ID for _, s := range app.slaves { - if s.lastLogID >= logId { + if s.lastLogID == logId { //slave has already owned this log n++ + } else if s.lastLogID > logId { + log.Error("invalid slave %s, lastlogid %d > %d", s.slaveListeningAddr, s.lastLogID, logId) } } @@ -387,14 +398,10 @@ func (app *App) publishNewLog(l *rpl.Log) { startTime := time.Now() done := make(chan struct{}, 1) go func(total int) { - n := 0 - for { + for i := 0; i < total; i++ { id := <-app.slaveSyncAck - if id >= logId { - n++ - if n >= total { - break - } + if id < logId { + log.Info("some slave may close with last logid %d < %d", id, logId) } } done <- struct{}{}