mirror of https://github.com/ledisdb/ledisdb.git
using close channel to wait new bin log
This commit is contained in:
parent
dad6478c12
commit
b6e44c3dc0
|
@ -105,6 +105,8 @@ type BinLog struct {
|
||||||
lastLogIndex int64
|
lastLogIndex int64
|
||||||
|
|
||||||
batchId uint32
|
batchId uint32
|
||||||
|
|
||||||
|
ch chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBinLog(cfg *config.Config) (*BinLog, error) {
|
func NewBinLog(cfg *config.Config) (*BinLog, error) {
|
||||||
|
@ -121,6 +123,8 @@ func NewBinLog(cfg *config.Config) (*BinLog, error) {
|
||||||
|
|
||||||
l.logNames = make([]string, 0, 16)
|
l.logNames = make([]string, 0, 16)
|
||||||
|
|
||||||
|
l.ch = make(chan struct{})
|
||||||
|
|
||||||
if err := l.loadIndex(); err != nil {
|
if err := l.loadIndex(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -375,5 +379,12 @@ func (l *BinLog) Log(args ...[]byte) error {
|
||||||
|
|
||||||
l.checkLogFileSize()
|
l.checkLogFileSize()
|
||||||
|
|
||||||
|
close(l.ch)
|
||||||
|
l.ch = make(chan struct{})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *BinLog) Wait() <-chan struct{} {
|
||||||
|
return l.ch
|
||||||
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"github.com/siddontang/ledisdb/store/driver"
|
"github.com/siddontang/ledisdb/store/driver"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -186,6 +187,31 @@ func (l *Ledis) ReplicateFromBinLog(filePath string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// try to read events, if no events read, try to wait the new event singal until timeout seconds
|
||||||
|
func (l *Ledis) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) {
|
||||||
|
lastIndex := info.LogFileIndex
|
||||||
|
lastPos := info.LogPos
|
||||||
|
|
||||||
|
n = 0
|
||||||
|
if l.binlog == nil {
|
||||||
|
//binlog not supported
|
||||||
|
info.LogFileIndex = 0
|
||||||
|
info.LogPos = 0
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err = l.ReadEventsTo(info, w)
|
||||||
|
if err == nil && info.LogFileIndex == lastIndex && info.LogPos == lastPos {
|
||||||
|
//no events read
|
||||||
|
select {
|
||||||
|
case <-l.binlog.Wait():
|
||||||
|
case <-time.After(time.Duration(timeout) * time.Second):
|
||||||
|
}
|
||||||
|
return l.ReadEventsTo(info, w)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (l *Ledis) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) {
|
func (l *Ledis) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) {
|
||||||
n = 0
|
n = 0
|
||||||
if l.binlog == nil {
|
if l.binlog == nil {
|
||||||
|
|
|
@ -89,7 +89,7 @@ func syncCommand(c *client) error {
|
||||||
|
|
||||||
c.syncBuf.Reset()
|
c.syncBuf.Reset()
|
||||||
|
|
||||||
//reserve space to write master info
|
//reserve space to write binlog anchor
|
||||||
if _, err := c.syncBuf.Write(reserveInfoSpace); err != nil {
|
if _, err := c.syncBuf.Write(reserveInfoSpace); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue