From cbbec9d62581246668a72402794db3ce233c409e Mon Sep 17 00:00:00 2001 From: siddontang Date: Wed, 11 Feb 2015 09:55:34 +0800 Subject: [PATCH] slave support reconnecting --- server/replication.go | 53 ++++++++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/server/replication.go b/server/replication.go index 2633ee5..bb7645c 100644 --- a/server/replication.go +++ b/server/replication.go @@ -120,55 +120,63 @@ func (m *master) startReplication(masterAddr string, restart bool) error { return nil } +func (m *master) needQuit() bool { + select { + case <-m.quit: + return true + default: + return false + } +} + func (m *master) runReplication(restart bool) { defer func() { m.state.Set(replConnectState) m.wg.Done() }() - m.state.Set(replConnectingState) if err := m.resetConn(); err != nil { log.Errorf("reset conn error %s", err.Error()) return } for { - select { - case <-m.quit: + m.state.Set(replConnectState) + + if _, err := m.conn.Do("ping"); err != nil { + log.Errorf("ping master %s error %s, try 3s later", m.addr, err.Error()) + time.Sleep(3 * time.Second) + continue + } + + if m.needQuit() { return - default: - if _, err := m.conn.Do("ping"); err != nil { - log.Errorf("ping master %s error %s, try 2s later", m.addr, err.Error()) - time.Sleep(2 * time.Second) - continue - } } m.state.Set(replConnectedState) if err := m.replConf(); err != nil { log.Errorf("replconf error %s", err.Error()) - return + continue } if restart { - m.state.Set(replSyncState) if err := m.fullSync(); err != nil { log.Errorf("restart fullsync error %s", err.Error()) - return + continue } + m.state.Set(replConnectedState) } for { - select { - case <-m.quit: + if err := m.sync(); err != nil { + log.Errorf("sync error %s", err.Error()) + break + } + m.state.Set(replConnectedState) + + if m.needQuit() { return - default: - m.state.Set(replConnectedState) - if err := m.sync(); err != nil { - log.Errorf("sync error %s", err.Error()) - return - } } } } @@ -198,6 +206,8 @@ func (m *master) fullSync() error { return err } + m.state.Set(replSyncState) + dumpPath := path.Join(m.app.cfg.DataDir, "master.dump") f, err := os.OpenFile(dumpPath, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { @@ -245,6 +255,8 @@ func (m *master) sync() error { return err } + m.state.Set(replSyncState) + m.syncBuf.Reset() if err = m.conn.ReceiveBulkTo(&m.syncBuf); err != nil { @@ -276,7 +288,6 @@ func (m *master) sync() error { return nil } - m.state.Set(replSyncState) if err = m.app.ldb.StoreLogsFromData(buf); err != nil { return err }