update replication

This commit is contained in:
siddontang 2014-10-21 22:51:17 +08:00
parent c3824552b6
commit e92dd80442
3 changed files with 32 additions and 12 deletions

View File

@ -51,7 +51,10 @@ func (c *respClient) run() {
log.Fatal("client run panic %s:%v", buf, e) log.Fatal("client run panic %s:%v", buf, e)
} }
handleQuit := true
if c.conn != nil { if c.conn != nil {
//if handle quit command before, conn is nil
handleQuit = false
c.conn.Close() c.conn.Close()
} }
@ -60,7 +63,7 @@ func (c *respClient) run() {
c.tx = nil c.tx = nil
} }
c.app.removeSlave(c.client) c.app.removeSlave(c.client, handleQuit)
}() }()
for { for {
@ -144,6 +147,7 @@ func (c *respClient) handleRequest(reqData [][]byte) {
c.resp.writeStatus(OK) c.resp.writeStatus(OK)
c.resp.flush() c.resp.flush()
c.conn.Close() c.conn.Close()
c.conn = nil
return return
} }

View File

@ -109,7 +109,16 @@ func syncCommand(c *client) error {
c.lastLogID = logId - 1 c.lastLogID = logId - 1
stat, err := c.app.ldb.ReplicationStat()
if err != nil {
return err
}
if c.lastLogID > stat.LastID {
return fmt.Errorf("invalid sync logid %d > %d + 1", logId, stat.LastID)
} else if c.lastLogID == stat.LastID {
c.app.slaveAck(c) c.app.slaveAck(c)
}
c.syncBuf.Reset() c.syncBuf.Reset()

View File

@ -50,10 +50,17 @@ func newMaster(app *App) *master {
return m return m
} }
var (
quitCmd = []byte("*1\r\n$4\r\nquit\r\n")
)
func (m *master) Close() { func (m *master) Close() {
ledis.AsyncNotify(m.quit) ledis.AsyncNotify(m.quit)
if m.conn != nil { if m.conn != nil {
//for replication, we send quit command to close gracefully
m.conn.Write(quitCmd)
m.conn.Close() m.conn.Close()
m.conn = nil m.conn = nil
} }
@ -321,7 +328,7 @@ func (app *App) addSlave(c *client) {
app.slaves[addr] = c app.slaves[addr] = c
} }
func (app *App) removeSlave(c *client) { func (app *App) removeSlave(c *client, activeQuit bool) {
addr := c.slaveListeningAddr addr := c.slaveListeningAddr
app.slock.Lock() app.slock.Lock()
@ -330,6 +337,9 @@ func (app *App) removeSlave(c *client) {
if _, ok := app.slaves[addr]; ok { if _, ok := app.slaves[addr]; ok {
delete(app.slaves, addr) delete(app.slaves, addr)
log.Info("remove slave %s", addr) log.Info("remove slave %s", addr)
if activeQuit {
asyncNotifyUint64(app.slaveSyncAck, c.lastLogID)
}
} }
} }
@ -344,7 +354,6 @@ func (app *App) slaveAck(c *client) {
return return
} }
println("ack", c.lastLogID)
asyncNotifyUint64(app.slaveSyncAck, c.lastLogID) asyncNotifyUint64(app.slaveSyncAck, c.lastLogID)
} }
@ -371,9 +380,11 @@ func (app *App) publishNewLog(l *rpl.Log) {
n := 0 n := 0
logId := l.ID logId := l.ID
for _, s := range app.slaves { for _, s := range app.slaves {
if s.lastLogID >= logId { if s.lastLogID == logId {
//slave has already owned this log //slave has already owned this log
n++ n++
} else if s.lastLogID > logId {
log.Error("invalid slave %s, lastlogid %d > %d", s.slaveListeningAddr, s.lastLogID, logId)
} }
} }
@ -387,14 +398,10 @@ func (app *App) publishNewLog(l *rpl.Log) {
startTime := time.Now() startTime := time.Now()
done := make(chan struct{}, 1) done := make(chan struct{}, 1)
go func(total int) { go func(total int) {
n := 0 for i := 0; i < total; i++ {
for {
id := <-app.slaveSyncAck id := <-app.slaveSyncAck
if id >= logId { if id < logId {
n++ log.Info("some slave may close with last logid %d < %d", id, logId)
if n >= total {
break
}
} }
} }
done <- struct{}{} done <- struct{}{}