mirror of https://github.com/ledisdb/ledisdb.git
parent
b6e44c3dc0
commit
c8e5e39f9d
|
@ -96,7 +96,7 @@ func syncCommand(c *client) error {
|
|||
|
||||
m := &ledis.BinLogAnchor{logIndex, logPos}
|
||||
|
||||
if _, err := c.app.ldb.ReadEventsTo(m, &c.syncBuf); err != nil {
|
||||
if _, err := c.app.ldb.ReadEventsToTimeout(m, &c.syncBuf, 5); err != nil {
|
||||
return err
|
||||
} else {
|
||||
buf := c.syncBuf.Bytes()
|
||||
|
|
|
@ -204,7 +204,10 @@ func (m *master) runReplication() {
|
|||
if m.info.LogFileIndex == 0 {
|
||||
//try a fullsync
|
||||
if err := m.fullSync(); err != nil {
|
||||
if m.conn != nil {
|
||||
//if conn == nil, other close the replication, not error
|
||||
log.Warn("full sync error %s", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -216,24 +219,18 @@ func (m *master) runReplication() {
|
|||
}
|
||||
|
||||
for {
|
||||
for {
|
||||
lastIndex := m.info.LogFileIndex
|
||||
lastPos := m.info.LogPos
|
||||
if err := m.sync(); err != nil {
|
||||
if m.conn != nil {
|
||||
//if conn == nil, other close the replication, not error
|
||||
log.Warn("sync error %s", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if m.info.LogFileIndex == lastIndex && m.info.LogPos == lastPos {
|
||||
//sync no data, wait 1s and retry
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-m.quit:
|
||||
return
|
||||
case <-time.After(1 * time.Second):
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -291,6 +288,7 @@ func (m *master) sync() error {
|
|||
|
||||
cmd := ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr),
|
||||
logIndexStr, len(logPosStr), logPosStr))
|
||||
|
||||
if _, err := m.conn.Write(cmd); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue