Merge branch 'develop'

This commit is contained in:
siddontang 2014-09-07 08:45:32 +08:00
commit 4989bfa22f
9 changed files with 68 additions and 45 deletions

View File

@ -57,7 +57,7 @@ func loadDump(cfg *config.Config, ldb *ledis.Ledis) error {
return err return err
} }
var head *ledis.MasterInfo var head *ledis.BinLogAnchor
head, err = ldb.LoadDumpFile(*dumpPath) head, err = ldb.LoadDumpFile(*dumpPath)
if err != nil { if err != nil {

View File

@ -105,6 +105,8 @@ type BinLog struct {
lastLogIndex int64 lastLogIndex int64
batchId uint32 batchId uint32
ch chan struct{}
} }
func NewBinLog(cfg *config.Config) (*BinLog, error) { func NewBinLog(cfg *config.Config) (*BinLog, error) {
@ -121,6 +123,8 @@ func NewBinLog(cfg *config.Config) (*BinLog, error) {
l.logNames = make([]string, 0, 16) l.logNames = make([]string, 0, 16)
l.ch = make(chan struct{})
if err := l.loadIndex(); err != nil { if err := l.loadIndex(); err != nil {
return nil, err return nil, err
} }
@ -375,5 +379,12 @@ func (l *BinLog) Log(args ...[]byte) error {
l.checkLogFileSize() l.checkLogFileSize()
close(l.ch)
l.ch = make(chan struct{})
return nil return nil
} }
func (l *BinLog) Wait() <-chan struct{} {
return l.ch
}

View File

@ -15,12 +15,12 @@ import (
// //
//key and value are both compressed for fast transfer dump on network using snappy //key and value are both compressed for fast transfer dump on network using snappy
type MasterInfo struct { type BinLogAnchor struct {
LogFileIndex int64 LogFileIndex int64
LogPos int64 LogPos int64
} }
func (m *MasterInfo) WriteTo(w io.Writer) error { func (m *BinLogAnchor) WriteTo(w io.Writer) error {
if err := binary.Write(w, binary.BigEndian, m.LogFileIndex); err != nil { if err := binary.Write(w, binary.BigEndian, m.LogFileIndex); err != nil {
return err return err
} }
@ -31,7 +31,7 @@ func (m *MasterInfo) WriteTo(w io.Writer) error {
return nil return nil
} }
func (m *MasterInfo) ReadFrom(r io.Reader) error { func (m *BinLogAnchor) ReadFrom(r io.Reader) error {
err := binary.Read(r, binary.BigEndian, &m.LogFileIndex) err := binary.Read(r, binary.BigEndian, &m.LogFileIndex)
if err != nil { if err != nil {
return err return err
@ -56,7 +56,7 @@ func (l *Ledis) DumpFile(path string) error {
} }
func (l *Ledis) Dump(w io.Writer) error { func (l *Ledis) Dump(w io.Writer) error {
var m *MasterInfo = new(MasterInfo) m := new(BinLogAnchor)
var err error var err error
@ -118,7 +118,7 @@ func (l *Ledis) Dump(w io.Writer) error {
return nil return nil
} }
func (l *Ledis) LoadDumpFile(path string) (*MasterInfo, error) { func (l *Ledis) LoadDumpFile(path string) (*BinLogAnchor, error) {
f, err := os.Open(path) f, err := os.Open(path)
if err != nil { if err != nil {
return nil, err return nil, err
@ -128,11 +128,11 @@ func (l *Ledis) LoadDumpFile(path string) (*MasterInfo, error) {
return l.LoadDump(f) return l.LoadDump(f)
} }
func (l *Ledis) LoadDump(r io.Reader) (*MasterInfo, error) { func (l *Ledis) LoadDump(r io.Reader) (*BinLogAnchor, error) {
l.wLock.Lock() l.wLock.Lock()
defer l.wLock.Unlock() defer l.wLock.Unlock()
info := new(MasterInfo) info := new(BinLogAnchor)
rb := bufio.NewReaderSize(r, 4096) rb := bufio.NewReaderSize(r, 4096)

View File

@ -8,6 +8,7 @@ import (
"github.com/siddontang/ledisdb/store/driver" "github.com/siddontang/ledisdb/store/driver"
"io" "io"
"os" "os"
"time"
) )
const ( const (
@ -186,7 +187,32 @@ func (l *Ledis) ReplicateFromBinLog(filePath string) error {
return err return err
} }
func (l *Ledis) ReadEventsTo(info *MasterInfo, w io.Writer) (n int, err error) { // try to read events, if no events read, try to wait the new event singal until timeout seconds
func (l *Ledis) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) {
lastIndex := info.LogFileIndex
lastPos := info.LogPos
n = 0
if l.binlog == nil {
//binlog not supported
info.LogFileIndex = 0
info.LogPos = 0
return
}
n, err = l.ReadEventsTo(info, w)
if err == nil && info.LogFileIndex == lastIndex && info.LogPos == lastPos {
//no events read
select {
case <-l.binlog.Wait():
case <-time.After(time.Duration(timeout) * time.Second):
}
return l.ReadEventsTo(info, w)
}
return
}
func (l *Ledis) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) {
n = 0 n = 0
if l.binlog == nil { if l.binlog == nil {
//binlog not supported //binlog not supported

View File

@ -106,7 +106,7 @@ func TestReplication(t *testing.T) {
tx.Rollback() tx.Rollback()
} }
info := new(MasterInfo) info := new(BinLogAnchor)
info.LogFileIndex = 1 info.LogFileIndex = 1
info.LogPos = 0 info.LogPos = 0
var buf bytes.Buffer var buf bytes.Buffer

View File

@ -89,14 +89,14 @@ func syncCommand(c *client) error {
c.syncBuf.Reset() c.syncBuf.Reset()
//reserve space to write master info //reserve space to write binlog anchor
if _, err := c.syncBuf.Write(reserveInfoSpace); err != nil { if _, err := c.syncBuf.Write(reserveInfoSpace); err != nil {
return err return err
} }
m := &ledis.MasterInfo{logIndex, logPos} m := &ledis.BinLogAnchor{logIndex, logPos}
if _, err := c.app.ldb.ReadEventsTo(m, &c.syncBuf); err != nil { if _, err := c.app.ldb.ReadEventsToTimeout(m, &c.syncBuf, 5); err != nil {
return err return err
} else { } else {
buf := c.syncBuf.Bytes() buf := c.syncBuf.Bytes()

View File

@ -9,7 +9,6 @@ import (
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"syscall"
) )
type info struct { type info struct {
@ -76,8 +75,6 @@ func (i *info) Dump(section string) []byte {
i.dumpServer(buf) i.dumpServer(buf)
case "client": case "client":
i.dumpClients(buf) i.dumpClients(buf)
case "cpu":
i.dumpCPU(buf)
case "mem": case "mem":
i.dumpMem(buf) i.dumpMem(buf)
case "persistence": case "persistence":
@ -103,8 +100,6 @@ func (i *info) dumpAll(buf *bytes.Buffer) {
buf.Write(Delims) buf.Write(Delims)
i.dumpClients(buf) i.dumpClients(buf)
buf.Write(Delims) buf.Write(Delims)
i.dumpCPU(buf)
buf.Write(Delims)
i.dumpMem(buf) i.dumpMem(buf)
buf.Write(Delims) buf.Write(Delims)
i.dumpGoroutine(buf) i.dumpGoroutine(buf)
@ -125,18 +120,6 @@ func (i *info) dumpClients(buf *bytes.Buffer) {
i.dumpPairs(buf, infoPair{"client_num", i.Clients.ConnectedClients}) i.dumpPairs(buf, infoPair{"client_num", i.Clients.ConnectedClients})
} }
func (i *info) dumpCPU(buf *bytes.Buffer) {
buf.WriteString("# CPU\r\n")
var rusage syscall.Rusage
if err := syscall.Getrusage(syscall.RUSAGE_SELF, &rusage); err != nil {
return
}
i.dumpPairs(buf, infoPair{"cpu_sys", rusage.Stime.Usec},
infoPair{"cpu_user", rusage.Utime.Usec})
}
func (i *info) dumpMem(buf *bytes.Buffer) { func (i *info) dumpMem(buf *bytes.Buffer) {
buf.WriteString("# Mem\r\n") buf.WriteString("# Mem\r\n")

View File

@ -204,7 +204,10 @@ func (m *master) runReplication() {
if m.info.LogFileIndex == 0 { if m.info.LogFileIndex == 0 {
//try a fullsync //try a fullsync
if err := m.fullSync(); err != nil { if err := m.fullSync(); err != nil {
if m.conn != nil {
//if conn == nil, other close the replication, not error
log.Warn("full sync error %s", err.Error()) log.Warn("full sync error %s", err.Error())
}
return return
} }
@ -216,24 +219,18 @@ func (m *master) runReplication() {
} }
for { for {
for {
lastIndex := m.info.LogFileIndex
lastPos := m.info.LogPos
if err := m.sync(); err != nil { if err := m.sync(); err != nil {
if m.conn != nil {
//if conn == nil, other close the replication, not error
log.Warn("sync error %s", err.Error()) log.Warn("sync error %s", err.Error())
}
return return
} }
if m.info.LogFileIndex == lastIndex && m.info.LogPos == lastPos {
//sync no data, wait 1s and retry
break
}
}
select { select {
case <-m.quit: case <-m.quit:
return return
case <-time.After(1 * time.Second): default:
break break
} }
} }
@ -271,7 +268,7 @@ func (m *master) fullSync() error {
return err return err
} }
var head *ledis.MasterInfo var head *ledis.BinLogAnchor
head, err = m.app.ldb.LoadDumpFile(dumpPath) head, err = m.app.ldb.LoadDumpFile(dumpPath)
if err != nil { if err != nil {
@ -291,6 +288,7 @@ func (m *master) sync() error {
cmd := ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr), cmd := ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr),
logIndexStr, len(logPosStr), logPosStr)) logIndexStr, len(logPosStr), logPosStr))
if _, err := m.conn.Write(cmd); err != nil { if _, err := m.conn.Write(cmd); err != nil {
return err return err
} }

View File

@ -29,7 +29,9 @@ func ReadLine(rb *bufio.Reader) ([]byte, error) {
func ReadBulkTo(rb *bufio.Reader, w io.Writer) error { func ReadBulkTo(rb *bufio.Reader, w io.Writer) error {
l, err := ReadLine(rb) l, err := ReadLine(rb)
if len(l) == 0 { if err != nil {
return err
} else if len(l) == 0 {
return errBulkFormat return errBulkFormat
} else if l[0] == '$' { } else if l[0] == '$' {
var n int var n int
@ -39,8 +41,11 @@ func ReadBulkTo(rb *bufio.Reader, w io.Writer) error {
} else if n == -1 { } else if n == -1 {
return nil return nil
} else { } else {
if _, err = io.CopyN(w, rb, int64(n)); err != nil { var nn int64
if nn, err = io.CopyN(w, rb, int64(n)); err != nil {
return err return err
} else if nn != int64(n) {
return io.ErrShortWrite
} }
if l, err = ReadLine(rb); err != nil { if l, err = ReadLine(rb); err != nil {