diff --git a/ledis/binlog.go b/ledis/binlog.go index 6eb0c30..6e6aa5b 100644 --- a/ledis/binlog.go +++ b/ledis/binlog.go @@ -105,6 +105,8 @@ type BinLog struct { lastLogIndex int64 batchId uint32 + + ch chan struct{} } func NewBinLog(cfg *config.Config) (*BinLog, error) { @@ -121,6 +123,8 @@ func NewBinLog(cfg *config.Config) (*BinLog, error) { l.logNames = make([]string, 0, 16) + l.ch = make(chan struct{}) + if err := l.loadIndex(); err != nil { return nil, err } @@ -375,5 +379,12 @@ func (l *BinLog) Log(args ...[]byte) error { l.checkLogFileSize() + close(l.ch) + l.ch = make(chan struct{}) + return nil } + +func (l *BinLog) Wait() <-chan struct{} { + return l.ch +} diff --git a/ledis/replication.go b/ledis/replication.go index e004ca4..804573d 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -8,6 +8,7 @@ import ( "github.com/siddontang/ledisdb/store/driver" "io" "os" + "time" ) const ( @@ -186,6 +187,31 @@ func (l *Ledis) ReplicateFromBinLog(filePath string) error { return err } +// try to read events, if no events read, try to wait the new event singal until timeout seconds +func (l *Ledis) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) { + lastIndex := info.LogFileIndex + lastPos := info.LogPos + + n = 0 + if l.binlog == nil { + //binlog not supported + info.LogFileIndex = 0 + info.LogPos = 0 + return + } + + n, err = l.ReadEventsTo(info, w) + if err == nil && info.LogFileIndex == lastIndex && info.LogPos == lastPos { + //no events read + select { + case <-l.binlog.Wait(): + case <-time.After(time.Duration(timeout) * time.Second): + } + return l.ReadEventsTo(info, w) + } + return +} + func (l *Ledis) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) { n = 0 if l.binlog == nil { diff --git a/server/cmd_replication.go b/server/cmd_replication.go index 6fa8c4c..c047a52 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -89,7 +89,7 @@ func syncCommand(c *client) error { c.syncBuf.Reset() - //reserve space to write master info + //reserve space to write binlog anchor if _, err := c.syncBuf.Write(reserveInfoSpace); err != nil { return err }