diff --git a/cmd/ledis-load/main.go b/cmd/ledis-load/main.go index 5218ea4..34165b8 100644 --- a/cmd/ledis-load/main.go +++ b/cmd/ledis-load/main.go @@ -57,7 +57,7 @@ func loadDump(cfg *config.Config, ldb *ledis.Ledis) error { return err } - var head *ledis.MasterInfo + var head *ledis.BinLogAnchor head, err = ldb.LoadDumpFile(*dumpPath) if err != nil { diff --git a/ledis/binlog.go b/ledis/binlog.go index 6eb0c30..6e6aa5b 100644 --- a/ledis/binlog.go +++ b/ledis/binlog.go @@ -105,6 +105,8 @@ type BinLog struct { lastLogIndex int64 batchId uint32 + + ch chan struct{} } func NewBinLog(cfg *config.Config) (*BinLog, error) { @@ -121,6 +123,8 @@ func NewBinLog(cfg *config.Config) (*BinLog, error) { l.logNames = make([]string, 0, 16) + l.ch = make(chan struct{}) + if err := l.loadIndex(); err != nil { return nil, err } @@ -375,5 +379,12 @@ func (l *BinLog) Log(args ...[]byte) error { l.checkLogFileSize() + close(l.ch) + l.ch = make(chan struct{}) + return nil } + +func (l *BinLog) Wait() <-chan struct{} { + return l.ch +} diff --git a/ledis/dump.go b/ledis/dump.go index 63c1d58..f162481 100644 --- a/ledis/dump.go +++ b/ledis/dump.go @@ -15,12 +15,12 @@ import ( // //key and value are both compressed for fast transfer dump on network using snappy -type MasterInfo struct { +type BinLogAnchor struct { LogFileIndex int64 LogPos int64 } -func (m *MasterInfo) WriteTo(w io.Writer) error { +func (m *BinLogAnchor) WriteTo(w io.Writer) error { if err := binary.Write(w, binary.BigEndian, m.LogFileIndex); err != nil { return err } @@ -31,7 +31,7 @@ func (m *MasterInfo) WriteTo(w io.Writer) error { return nil } -func (m *MasterInfo) ReadFrom(r io.Reader) error { +func (m *BinLogAnchor) ReadFrom(r io.Reader) error { err := binary.Read(r, binary.BigEndian, &m.LogFileIndex) if err != nil { return err @@ -56,7 +56,7 @@ func (l *Ledis) DumpFile(path string) error { } func (l *Ledis) Dump(w io.Writer) error { - var m *MasterInfo = new(MasterInfo) + m := new(BinLogAnchor) var err error @@ -118,7 +118,7 @@ func (l *Ledis) Dump(w io.Writer) error { return nil } -func (l *Ledis) LoadDumpFile(path string) (*MasterInfo, error) { +func (l *Ledis) LoadDumpFile(path string) (*BinLogAnchor, error) { f, err := os.Open(path) if err != nil { return nil, err @@ -128,11 +128,11 @@ func (l *Ledis) LoadDumpFile(path string) (*MasterInfo, error) { return l.LoadDump(f) } -func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) { +func (l *Ledis) LoadDump(r io.Reader) (*BinLogAnchor, error) { l.wLock.Lock() defer l.wLock.Unlock() - info := new(MasterInfo) + info := new(BinLogAnchor) rb := bufio.NewReaderSize(r, 4096) diff --git a/ledis/replication.go b/ledis/replication.go index 421a5ab..804573d 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -8,6 +8,7 @@ import ( "github.com/siddontang/ledisdb/store/driver" "io" "os" + "time" ) const ( @@ -186,7 +187,32 @@ func (l *Ledis) ReplicateFromBinLog(filePath string) error { return err } -func (l *Ledis) ReadEventsTo(info *MasterInfo, w io.Writer) (n int, err error) { +// try to read events, if no events read, try to wait the new event singal until timeout seconds +func (l *Ledis) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) { + lastIndex := info.LogFileIndex + lastPos := info.LogPos + + n = 0 + if l.binlog == nil { + //binlog not supported + info.LogFileIndex = 0 + info.LogPos = 0 + return + } + + n, err = l.ReadEventsTo(info, w) + if err == nil && info.LogFileIndex == lastIndex && info.LogPos == lastPos { + //no events read + select { + case <-l.binlog.Wait(): + case <-time.After(time.Duration(timeout) * time.Second): + } + return l.ReadEventsTo(info, w) + } + return +} + +func (l *Ledis) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) { n = 0 if l.binlog == nil { //binlog not supported diff --git a/ledis/replication_test.go b/ledis/replication_test.go index 2a64a11..07643c6 100644 --- a/ledis/replication_test.go +++ b/ledis/replication_test.go @@ -106,7 +106,7 @@ func TestReplication(t *testing.T) { tx.Rollback() } - info := new(MasterInfo) + info := new(BinLogAnchor) info.LogFileIndex = 1 info.LogPos = 0 var buf bytes.Buffer diff --git a/server/cmd_replication.go b/server/cmd_replication.go index fe84191..ec501f6 100644 --- a/server/cmd_replication.go +++ b/server/cmd_replication.go @@ -89,14 +89,14 @@ func syncCommand(c *client) error { c.syncBuf.Reset() - //reserve space to write master info + //reserve space to write binlog anchor if _, err := c.syncBuf.Write(reserveInfoSpace); err != nil { return err } - m := &ledis.MasterInfo{logIndex, logPos} + m := &ledis.BinLogAnchor{logIndex, logPos} - if _, err := c.app.ldb.ReadEventsTo(m, &c.syncBuf); err != nil { + if _, err := c.app.ldb.ReadEventsToTimeout(m, &c.syncBuf, 5); err != nil { return err } else { buf := c.syncBuf.Bytes() diff --git a/server/info.go b/server/info.go index fb6c099..cae6a3f 100644 --- a/server/info.go +++ b/server/info.go @@ -9,7 +9,6 @@ import ( "strings" "sync" "sync/atomic" - "syscall" ) type info struct { @@ -76,8 +75,6 @@ func (i *info) Dump(section string) []byte { i.dumpServer(buf) case "client": i.dumpClients(buf) - case "cpu": - i.dumpCPU(buf) case "mem": i.dumpMem(buf) case "persistence": @@ -103,8 +100,6 @@ func (i *info) dumpAll(buf *bytes.Buffer) { buf.Write(Delims) i.dumpClients(buf) buf.Write(Delims) - i.dumpCPU(buf) - buf.Write(Delims) i.dumpMem(buf) buf.Write(Delims) i.dumpGoroutine(buf) @@ -125,18 +120,6 @@ func (i *info) dumpClients(buf *bytes.Buffer) { i.dumpPairs(buf, infoPair{"client_num", i.Clients.ConnectedClients}) } -func (i *info) dumpCPU(buf *bytes.Buffer) { - buf.WriteString("# CPU\r\n") - - var rusage syscall.Rusage - if err := syscall.Getrusage(syscall.RUSAGE_SELF, &rusage); err != nil { - return - } - - i.dumpPairs(buf, infoPair{"cpu_sys", rusage.Stime.Usec}, - infoPair{"cpu_user", rusage.Utime.Usec}) -} - func (i *info) dumpMem(buf *bytes.Buffer) { buf.WriteString("# Mem\r\n") diff --git a/server/replication.go b/server/replication.go index d02264d..267a29b 100644 --- a/server/replication.go +++ b/server/replication.go @@ -204,7 +204,10 @@ func (m *master) runReplication() { if m.info.LogFileIndex == 0 { //try a fullsync if err := m.fullSync(); err != nil { - log.Warn("full sync error %s", err.Error()) + if m.conn != nil { + //if conn == nil, other close the replication, not error + log.Warn("full sync error %s", err.Error()) + } return } @@ -216,24 +219,18 @@ func (m *master) runReplication() { } for { - for { - lastIndex := m.info.LogFileIndex - lastPos := m.info.LogPos - if err := m.sync(); err != nil { + if err := m.sync(); err != nil { + if m.conn != nil { + //if conn == nil, other close the replication, not error log.Warn("sync error %s", err.Error()) - return - } - - if m.info.LogFileIndex == lastIndex && m.info.LogPos == lastPos { - //sync no data, wait 1s and retry - break } + return } select { case <-m.quit: return - case <-time.After(1 * time.Second): + default: break } } @@ -271,7 +268,7 @@ func (m *master) fullSync() error { return err } - var head *ledis.MasterInfo + var head *ledis.BinLogAnchor head, err = m.app.ldb.LoadDumpFile(dumpPath) if err != nil { @@ -291,6 +288,7 @@ func (m *master) sync() error { cmd := ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr), logIndexStr, len(logPosStr), logPosStr)) + if _, err := m.conn.Write(cmd); err != nil { return err } diff --git a/server/util.go b/server/util.go index 49c20ba..c015b60 100644 --- a/server/util.go +++ b/server/util.go @@ -29,7 +29,9 @@ func ReadLine(rb *bufio.Reader) ([]byte, error) { func ReadBulkTo(rb *bufio.Reader, w io.Writer) error { l, err := ReadLine(rb) - if len(l) == 0 { + if err != nil { + return err + } else if len(l) == 0 { return errBulkFormat } else if l[0] == '$' { var n int @@ -39,8 +41,11 @@ func ReadBulkTo(rb *bufio.Reader, w io.Writer) error { } else if n == -1 { return nil } else { - if _, err = io.CopyN(w, rb, int64(n)); err != nil { + var nn int64 + if nn, err = io.CopyN(w, rb, int64(n)); err != nil { return err + } else if nn != int64(n) { + return io.ErrShortWrite } if l, err = ReadLine(rb); err != nil {