slave support reconnecting

This commit is contained in:
siddontang 2015-02-11 09:55:34 +08:00
parent 92811d8ea1
commit cbbec9d625
1 changed files with 32 additions and 21 deletions

View File

@ -120,55 +120,63 @@ func (m *master) startReplication(masterAddr string, restart bool) error {
return nil return nil
} }
func (m *master) needQuit() bool {
select {
case <-m.quit:
return true
default:
return false
}
}
func (m *master) runReplication(restart bool) { func (m *master) runReplication(restart bool) {
defer func() { defer func() {
m.state.Set(replConnectState) m.state.Set(replConnectState)
m.wg.Done() m.wg.Done()
}() }()
m.state.Set(replConnectingState)
if err := m.resetConn(); err != nil { if err := m.resetConn(); err != nil {
log.Errorf("reset conn error %s", err.Error()) log.Errorf("reset conn error %s", err.Error())
return return
} }
for { for {
select { m.state.Set(replConnectState)
case <-m.quit:
if _, err := m.conn.Do("ping"); err != nil {
log.Errorf("ping master %s error %s, try 3s later", m.addr, err.Error())
time.Sleep(3 * time.Second)
continue
}
if m.needQuit() {
return return
default:
if _, err := m.conn.Do("ping"); err != nil {
log.Errorf("ping master %s error %s, try 2s later", m.addr, err.Error())
time.Sleep(2 * time.Second)
continue
}
} }
m.state.Set(replConnectedState) m.state.Set(replConnectedState)
if err := m.replConf(); err != nil { if err := m.replConf(); err != nil {
log.Errorf("replconf error %s", err.Error()) log.Errorf("replconf error %s", err.Error())
return continue
} }
if restart { if restart {
m.state.Set(replSyncState)
if err := m.fullSync(); err != nil { if err := m.fullSync(); err != nil {
log.Errorf("restart fullsync error %s", err.Error()) log.Errorf("restart fullsync error %s", err.Error())
return continue
} }
m.state.Set(replConnectedState)
} }
for { for {
select { if err := m.sync(); err != nil {
case <-m.quit: log.Errorf("sync error %s", err.Error())
break
}
m.state.Set(replConnectedState)
if m.needQuit() {
return return
default:
m.state.Set(replConnectedState)
if err := m.sync(); err != nil {
log.Errorf("sync error %s", err.Error())
return
}
} }
} }
} }
@ -198,6 +206,8 @@ func (m *master) fullSync() error {
return err return err
} }
m.state.Set(replSyncState)
dumpPath := path.Join(m.app.cfg.DataDir, "master.dump") dumpPath := path.Join(m.app.cfg.DataDir, "master.dump")
f, err := os.OpenFile(dumpPath, os.O_CREATE|os.O_WRONLY, 0644) f, err := os.OpenFile(dumpPath, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {
@ -245,6 +255,8 @@ func (m *master) sync() error {
return err return err
} }
m.state.Set(replSyncState)
m.syncBuf.Reset() m.syncBuf.Reset()
if err = m.conn.ReceiveBulkTo(&m.syncBuf); err != nil { if err = m.conn.ReceiveBulkTo(&m.syncBuf); err != nil {
@ -276,7 +288,6 @@ func (m *master) sync() error {
return nil return nil
} }
m.state.Set(replSyncState)
if err = m.app.ldb.StoreLogsFromData(buf); err != nil { if err = m.app.ldb.StoreLogsFromData(buf); err != nil {
return err return err
} }