mirror of https://github.com/ledisdb/ledisdb.git
Use new log interface
This commit is contained in:
parent
8c75a693f4
commit
053da7736c
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/ledisdb",
|
"ImportPath": "github.com/siddontang/ledisdb",
|
||||||
"GoVersion": "go1.3.3",
|
"GoVersion": "go1.4",
|
||||||
"Packages": [
|
"Packages": [
|
||||||
"./..."
|
"./..."
|
||||||
],
|
],
|
||||||
|
@ -11,8 +11,8 @@
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/boltdb/bolt",
|
"ImportPath": "github.com/boltdb/bolt",
|
||||||
"Comment": "v1.0-5-g33e7a07",
|
"Comment": "data/v1-256-ge65c902",
|
||||||
"Rev": "33e7a074e2c470b6d0b7ee23322a1c4cc759044b"
|
"Rev": "e65c9027c35b7ef1014db9e02686889e51aadb2e"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/cupcake/rdb",
|
"ImportPath": "github.com/cupcake/rdb",
|
||||||
|
@ -24,39 +24,39 @@
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/arena",
|
"ImportPath": "github.com/siddontang/go/arena",
|
||||||
"Rev": "8f64946c30746240c2f3bdb606eed9a4aca34478"
|
"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/bson",
|
"ImportPath": "github.com/siddontang/go/bson",
|
||||||
"Rev": "8f64946c30746240c2f3bdb606eed9a4aca34478"
|
"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/filelock",
|
"ImportPath": "github.com/siddontang/go/filelock",
|
||||||
"Rev": "8f64946c30746240c2f3bdb606eed9a4aca34478"
|
"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/hack",
|
"ImportPath": "github.com/siddontang/go/hack",
|
||||||
"Rev": "8f64946c30746240c2f3bdb606eed9a4aca34478"
|
"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/ioutil2",
|
"ImportPath": "github.com/siddontang/go/ioutil2",
|
||||||
"Rev": "8f64946c30746240c2f3bdb606eed9a4aca34478"
|
"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/log",
|
"ImportPath": "github.com/siddontang/go/log",
|
||||||
"Rev": "8f64946c30746240c2f3bdb606eed9a4aca34478"
|
"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/num",
|
"ImportPath": "github.com/siddontang/go/num",
|
||||||
"Rev": "8f64946c30746240c2f3bdb606eed9a4aca34478"
|
"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/snappy",
|
"ImportPath": "github.com/siddontang/go/snappy",
|
||||||
"Rev": "8f64946c30746240c2f3bdb606eed9a4aca34478"
|
"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/siddontang/go/sync2",
|
"ImportPath": "github.com/siddontang/go/sync2",
|
||||||
"Rev": "8f64946c30746240c2f3bdb606eed9a4aca34478"
|
"Rev": "c2b33271306fcb7c6532efceac33ec45ee2439e0"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
|
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
|
||||||
|
|
|
@ -107,7 +107,7 @@ func (l *Ledis) handleCommit(g commitDataGetter, c commiter) error {
|
||||||
if rl, err = l.r.Log(g.Data()); err != nil {
|
if rl, err = l.r.Log(g.Data()); err != nil {
|
||||||
l.commitLock.Unlock()
|
l.commitLock.Unlock()
|
||||||
|
|
||||||
log.Fatal("write wal error %s", err.Error())
|
log.Fatalf("write wal error %s", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,7 +116,7 @@ func (l *Ledis) handleCommit(g commitDataGetter, c commiter) error {
|
||||||
if err = c.Commit(); err != nil {
|
if err = c.Commit(); err != nil {
|
||||||
l.commitLock.Unlock()
|
l.commitLock.Unlock()
|
||||||
|
|
||||||
log.Fatal("commit error %s", err.Error())
|
log.Fatalf("commit error %s", err.Error())
|
||||||
l.noticeReplication()
|
l.noticeReplication()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -124,7 +124,7 @@ func (l *Ledis) handleCommit(g commitDataGetter, c commiter) error {
|
||||||
if err = l.r.UpdateCommitID(rl.ID); err != nil {
|
if err = l.r.UpdateCommitID(rl.ID); err != nil {
|
||||||
l.commitLock.Unlock()
|
l.commitLock.Unlock()
|
||||||
|
|
||||||
log.Fatal("update commit id error %s", err.Error())
|
log.Fatalf("update commit id error %s", err.Error())
|
||||||
l.noticeReplication()
|
l.noticeReplication()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,7 +132,7 @@ func (l *Ledis) flushAll() error {
|
||||||
n++
|
n++
|
||||||
if n == 10000 {
|
if n == 10000 {
|
||||||
if err := w.Commit(); err != nil {
|
if err := w.Commit(); err != nil {
|
||||||
log.Fatal("flush all commit error: %s", err.Error())
|
log.Fatalf("flush all commit error: %s", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
n = 0
|
n = 0
|
||||||
|
@ -141,13 +141,13 @@ func (l *Ledis) flushAll() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := w.Commit(); err != nil {
|
if err := w.Commit(); err != nil {
|
||||||
log.Fatal("flush all commit error: %s", err.Error())
|
log.Fatalf("flush all commit error: %s", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if l.r != nil {
|
if l.r != nil {
|
||||||
if err := l.r.Clear(); err != nil {
|
if err := l.r.Clear(); err != nil {
|
||||||
log.Fatal("flush all replication clear error: %s", err.Error())
|
log.Fatalf("flush all replication clear error: %s", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,7 @@ func (l *Ledis) handleReplication() error {
|
||||||
for {
|
for {
|
||||||
if err = l.r.NextNeedCommitLog(rl); err != nil {
|
if err = l.r.NextNeedCommitLog(rl); err != nil {
|
||||||
if err != rpl.ErrNoBehindLog {
|
if err != rpl.ErrNoBehindLog {
|
||||||
log.Error("get next commit log err, %s", err.Error)
|
log.Errorf("get next commit log err, %s", err.Error)
|
||||||
return err
|
return err
|
||||||
} else {
|
} else {
|
||||||
l.rwg.Done()
|
l.rwg.Done()
|
||||||
|
@ -45,23 +45,23 @@ func (l *Ledis) handleReplication() error {
|
||||||
if rl.Compression == 1 {
|
if rl.Compression == 1 {
|
||||||
//todo optimize
|
//todo optimize
|
||||||
if rl.Data, err = snappy.Decode(nil, rl.Data); err != nil {
|
if rl.Data, err = snappy.Decode(nil, rl.Data); err != nil {
|
||||||
log.Error("decode log error %s", err.Error())
|
log.Errorf("decode log error %s", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if bd, err := store.NewBatchData(rl.Data); err != nil {
|
if bd, err := store.NewBatchData(rl.Data); err != nil {
|
||||||
log.Error("decode batch log error %s", err.Error())
|
log.Errorf("decode batch log error %s", err.Error())
|
||||||
return err
|
return err
|
||||||
} else if err = bd.Replay(l.rbatch); err != nil {
|
} else if err = bd.Replay(l.rbatch); err != nil {
|
||||||
log.Error("replay batch log error %s", err.Error())
|
log.Errorf("replay batch log error %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
l.commitLock.Lock()
|
l.commitLock.Lock()
|
||||||
if err = l.rbatch.Commit(); err != nil {
|
if err = l.rbatch.Commit(); err != nil {
|
||||||
log.Error("commit log error %s", err.Error())
|
log.Errorf("commit log error %s", err.Error())
|
||||||
} else if err = l.r.UpdateCommitID(rl.ID); err != nil {
|
} else if err = l.r.UpdateCommitID(rl.ID); err != nil {
|
||||||
log.Error("update commit id error %s", err.Error())
|
log.Errorf("update commit id error %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
l.commitLock.Unlock()
|
l.commitLock.Unlock()
|
||||||
|
|
|
@ -320,14 +320,14 @@ func (m *mmapReadFile) ReadAt(buf []byte, offset int64) (int, error) {
|
||||||
func (m *mmapReadFile) Close() error {
|
func (m *mmapReadFile) Close() error {
|
||||||
if m.m != nil {
|
if m.m != nil {
|
||||||
if err := m.m.Unmap(); err != nil {
|
if err := m.m.Unmap(); err != nil {
|
||||||
log.Error("unmap %s error %s", m.name, err.Error())
|
log.Errorf("unmap %s error %s", m.name, err.Error())
|
||||||
}
|
}
|
||||||
m.m = nil
|
m.m = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.f != nil {
|
if m.f != nil {
|
||||||
if err := m.f.Close(); err != nil {
|
if err := m.f.Close(); err != nil {
|
||||||
log.Error("close %s error %s", m.name, err.Error())
|
log.Errorf("close %s error %s", m.name, err.Error())
|
||||||
}
|
}
|
||||||
m.f = nil
|
m.f = nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -177,7 +177,7 @@ func (s *FileStore) storeLog(l *Log) error {
|
||||||
r, err = s.w.Flush()
|
r, err = s.w.Flush()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("write table flush error %s, can not store!!!", err.Error())
|
log.Fatalf("write table flush error %s, can not store!!!", err.Error())
|
||||||
|
|
||||||
s.w.Close()
|
s.w.Close()
|
||||||
|
|
||||||
|
@ -258,7 +258,7 @@ func (s *FileStore) Close() error {
|
||||||
|
|
||||||
if r, err := s.w.Flush(); err != nil {
|
if r, err := s.w.Flush(); err != nil {
|
||||||
if err != errNilHandler {
|
if err != errNilHandler {
|
||||||
log.Error("close err: %s", err.Error())
|
log.Errorf("close err: %s", err.Error())
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
r.Close()
|
r.Close()
|
||||||
|
@ -315,10 +315,10 @@ func (s *FileStore) purgeTableReaders(purges []*tableReader) {
|
||||||
metaName := fmtTableMetaName(r.base, r.index)
|
metaName := fmtTableMetaName(r.base, r.index)
|
||||||
r.Close()
|
r.Close()
|
||||||
if err := os.Remove(dataName); err != nil {
|
if err := os.Remove(dataName); err != nil {
|
||||||
log.Error("purge table data %s err: %s", dataName, err.Error())
|
log.Errorf("purge table data %s err: %s", dataName, err.Error())
|
||||||
}
|
}
|
||||||
if err := os.Remove(metaName); err != nil {
|
if err := os.Remove(metaName); err != nil {
|
||||||
log.Error("purge table meta %s err: %s", metaName, err.Error())
|
log.Errorf("purge table meta %s err: %s", metaName, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -337,7 +337,7 @@ func (s *FileStore) load() error {
|
||||||
for _, f := range fs {
|
for _, f := range fs {
|
||||||
if _, err := fmt.Sscanf(f.Name(), "%08d.data", &index); err == nil {
|
if _, err := fmt.Sscanf(f.Name(), "%08d.data", &index); err == nil {
|
||||||
if r, err = newTableReader(s.base, index, s.cfg.Replication.UseMmap); err != nil {
|
if r, err = newTableReader(s.base, index, s.cfg.Replication.UseMmap); err != nil {
|
||||||
log.Error("load table %s err: %s", f.Name(), err.Error())
|
log.Errorf("load table %s err: %s", f.Name(), err.Error())
|
||||||
} else {
|
} else {
|
||||||
s.rs = append(s.rs, r)
|
s.rs = append(s.rs, r)
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,10 +61,10 @@ func newTableReader(base string, index int64, useMmap bool) (*tableReader, error
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if err = t.check(); err != nil {
|
if err = t.check(); err != nil {
|
||||||
log.Error("check %d error: %s, try to repair", t.index, err.Error())
|
log.Errorf("check %d error: %s, try to repair", t.index, err.Error())
|
||||||
|
|
||||||
if err = t.repair(); err != nil {
|
if err = t.repair(); err != nil {
|
||||||
log.Error("repair %d error: %s", t.index, err.Error())
|
log.Errorf("repair %d error: %s", t.index, err.Error())
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -221,12 +221,12 @@ func (t *tableReader) repair() error {
|
||||||
nextPos, err = t.decodeLogHead(&l, data, pos)
|
nextPos, err = t.decodeLogHead(&l, data, pos)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
//if error, we may lost all logs from pos
|
//if error, we may lost all logs from pos
|
||||||
log.Error("%s may lost logs from %d", data.Name(), pos)
|
log.Errorf("%s may lost logs from %d", data.Name(), pos)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
if l.ID == 0 {
|
if l.ID == 0 {
|
||||||
log.Error("%s may lost logs from %d, invalid log 0", data.Name(), pos)
|
log.Errorf("%s may lost logs from %d, invalid log 0", data.Name(), pos)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,7 +237,7 @@ func (t *tableReader) repair() error {
|
||||||
if t.last == 0 {
|
if t.last == 0 {
|
||||||
t.last = l.ID
|
t.last = l.ID
|
||||||
} else if l.ID <= t.last {
|
} else if l.ID <= t.last {
|
||||||
log.Error("%s may lost logs from %d, invalid logid %d", t.data.Name(), pos, l.ID)
|
log.Errorf("%s may lost logs from %d, invalid logid %d", t.data.Name(), pos, l.ID)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,7 +260,7 @@ func (t *tableReader) repair() error {
|
||||||
data.SetOffset(pos)
|
data.SetOffset(pos)
|
||||||
|
|
||||||
if _, err = data.Write(magic); err != nil {
|
if _, err = data.Write(magic); err != nil {
|
||||||
log.Error("write magic error %s", err.Error())
|
log.Errorf("write magic error %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = data.Close(); err != nil {
|
if err = data.Close(); err != nil {
|
||||||
|
@ -387,18 +387,18 @@ func (t *tableWriter) SetSyncType(tp int) {
|
||||||
func (t *tableWriter) close() {
|
func (t *tableWriter) close() {
|
||||||
if t.meta != nil {
|
if t.meta != nil {
|
||||||
if err := t.meta.Close(); err != nil {
|
if err := t.meta.Close(); err != nil {
|
||||||
log.Fatal("close log meta error %s", err.Error())
|
log.Fatalf("close log meta error %s", err.Error())
|
||||||
}
|
}
|
||||||
t.meta = nil
|
t.meta = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if t.data != nil {
|
if t.data != nil {
|
||||||
if _, err := t.data.Write(magic); err != nil {
|
if _, err := t.data.Write(magic); err != nil {
|
||||||
log.Fatal("write magic error %s", err.Error())
|
log.Fatalf("write magic error %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := t.data.Close(); err != nil {
|
if err := t.data.Close(); err != nil {
|
||||||
log.Fatal("close log data error %s", err.Error())
|
log.Fatalf("close log data error %s", err.Error())
|
||||||
}
|
}
|
||||||
t.data = nil
|
t.data = nil
|
||||||
}
|
}
|
||||||
|
@ -519,7 +519,7 @@ func (t *tableWriter) storeLog(l *Log) error {
|
||||||
|
|
||||||
if t.syncType == 2 {
|
if t.syncType == 2 {
|
||||||
if err := t.data.Sync(); err != nil {
|
if err := t.data.Sync(); err != nil {
|
||||||
log.Error("sync table error %s", err.Error())
|
log.Errorf("sync table error %s", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -93,7 +93,7 @@ func (r *Replication) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.updateCommitID(r.commitID, true); err != nil {
|
if err := r.updateCommitID(r.commitID, true); err != nil {
|
||||||
log.Error("update commit id err %s", err.Error())
|
log.Errorf("update commit id err %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.commitLog != nil {
|
if r.commitLog != nil {
|
||||||
|
@ -301,7 +301,7 @@ func (r *Replication) run() {
|
||||||
err := r.s.PurgeExpired(int64(n))
|
err := r.s.PurgeExpired(int64(n))
|
||||||
r.m.Unlock()
|
r.m.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("purge expired log error %s", err.Error())
|
log.Errorf("purge expired log error %s", err.Error())
|
||||||
}
|
}
|
||||||
case <-syncTc.C:
|
case <-syncTc.C:
|
||||||
if r.cfg.Replication.SyncLog == 1 {
|
if r.cfg.Replication.SyncLog == 1 {
|
||||||
|
@ -309,7 +309,7 @@ func (r *Replication) run() {
|
||||||
err := r.s.Sync()
|
err := r.s.Sync()
|
||||||
r.m.Unlock()
|
r.m.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("sync store error %s", err.Error())
|
log.Errorf("sync store error %s", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if r.cfg.Replication.SyncLog != 2 {
|
if r.cfg.Replication.SyncLog != 2 {
|
||||||
|
@ -319,7 +319,7 @@ func (r *Replication) run() {
|
||||||
r.m.Unlock()
|
r.m.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("sync commitid error %s", err.Error())
|
log.Errorf("sync commitid error %s", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case <-r.quit:
|
case <-r.quit:
|
||||||
|
|
|
@ -132,7 +132,7 @@ func (w *httpWriter) genericWrite(result interface{}) {
|
||||||
case "msgpack":
|
case "msgpack":
|
||||||
writeMsgPack(&m, w.w)
|
writeMsgPack(&m, w.w)
|
||||||
default:
|
default:
|
||||||
log.Error("invalid content type %s", w.contentType)
|
log.Errorf("invalid content type %s", w.contentType)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -97,7 +97,7 @@ func (c *respClient) run() {
|
||||||
n := runtime.Stack(buf, false)
|
n := runtime.Stack(buf, false)
|
||||||
buf = buf[0:n]
|
buf = buf[0:n]
|
||||||
|
|
||||||
log.Fatal("client run panic %s:%v", buf, e)
|
log.Fatalf("client run panic %s:%v", buf, e)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.client.close()
|
c.client.close()
|
||||||
|
|
|
@ -106,7 +106,7 @@ func (m *master) runReplication(restart bool) {
|
||||||
defer m.wg.Done()
|
defer m.wg.Done()
|
||||||
|
|
||||||
if err := m.resetConn(); err != nil {
|
if err := m.resetConn(); err != nil {
|
||||||
log.Error("reset conn error %s", err.Error())
|
log.Errorf("reset conn error %s", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,20 +116,20 @@ func (m *master) runReplication(restart bool) {
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
if _, err := m.conn.Do("ping"); err != nil {
|
if _, err := m.conn.Do("ping"); err != nil {
|
||||||
log.Error("ping master %s error %s, try 2s later", m.addr, err.Error())
|
log.Errorf("ping master %s error %s, try 2s later", m.addr, err.Error())
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := m.replConf(); err != nil {
|
if err := m.replConf(); err != nil {
|
||||||
log.Error("replconf error %s", err.Error())
|
log.Errorf("replconf error %s", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if restart {
|
if restart {
|
||||||
if err := m.fullSync(); err != nil {
|
if err := m.fullSync(); err != nil {
|
||||||
log.Error("restart fullsync error %s", err.Error())
|
log.Errorf("restart fullsync error %s", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -140,7 +140,7 @@ func (m *master) runReplication(restart bool) {
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
if err := m.sync(); err != nil {
|
if err := m.sync(); err != nil {
|
||||||
log.Error("sync error %s", err.Error())
|
log.Errorf("sync error %s", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -183,12 +183,12 @@ func (m *master) fullSync() error {
|
||||||
err = m.conn.ReceiveBulkTo(f)
|
err = m.conn.ReceiveBulkTo(f)
|
||||||
f.Close()
|
f.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("read dump data error %s", err.Error())
|
log.Errorf("read dump data error %s", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err = m.app.ldb.LoadDumpFile(dumpPath); err != nil {
|
if _, err = m.app.ldb.LoadDumpFile(dumpPath); err != nil {
|
||||||
log.Error("load dump file error %s", err.Error())
|
log.Errorf("load dump file error %s", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -319,7 +319,7 @@ func (app *App) removeSlave(c *client, activeQuit bool) {
|
||||||
|
|
||||||
if _, ok := app.slaves[addr]; ok {
|
if _, ok := app.slaves[addr]; ok {
|
||||||
delete(app.slaves, addr)
|
delete(app.slaves, addr)
|
||||||
log.Info("remove slave %s", addr)
|
log.Infof("remove slave %s", addr)
|
||||||
if activeQuit {
|
if activeQuit {
|
||||||
asyncNotifyUint64(app.slaveSyncAck, c.lastLogID.Get())
|
asyncNotifyUint64(app.slaveSyncAck, c.lastLogID.Get())
|
||||||
}
|
}
|
||||||
|
@ -372,7 +372,7 @@ func (app *App) publishNewLog(l *rpl.Log) {
|
||||||
//slave has already owned this log
|
//slave has already owned this log
|
||||||
n++
|
n++
|
||||||
} else if lastLogID > logId {
|
} else if lastLogID > logId {
|
||||||
log.Error("invalid slave %s, lastlogid %d > %d", s.slaveListeningAddr, lastLogID, logId)
|
log.Errorf("invalid slave %s, lastlogid %d > %d", s.slaveListeningAddr, lastLogID, logId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -390,7 +390,7 @@ func (app *App) publishNewLog(l *rpl.Log) {
|
||||||
for i := 0; i < slaveNum; i++ {
|
for i := 0; i < slaveNum; i++ {
|
||||||
id := <-app.slaveSyncAck
|
id := <-app.slaveSyncAck
|
||||||
if id < logId {
|
if id < logId {
|
||||||
log.Info("some slave may close with last logid %d < %d", id, logId)
|
log.Infof("some slave may close with last logid %d < %d", id, logId)
|
||||||
} else {
|
} else {
|
||||||
n++
|
n++
|
||||||
if n >= total {
|
if n >= total {
|
||||||
|
|
|
@ -76,20 +76,20 @@ func (s *snapshotStore) checkSnapshots() error {
|
||||||
cfg := s.cfg
|
cfg := s.cfg
|
||||||
snapshots, err := ioutil.ReadDir(cfg.Snapshot.Path)
|
snapshots, err := ioutil.ReadDir(cfg.Snapshot.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("read %s error: %s", cfg.Snapshot.Path, err.Error())
|
log.Errorf("read %s error: %s", cfg.Snapshot.Path, err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
names := []string{}
|
names := []string{}
|
||||||
for _, info := range snapshots {
|
for _, info := range snapshots {
|
||||||
if path.Ext(info.Name()) == ".tmp" {
|
if path.Ext(info.Name()) == ".tmp" {
|
||||||
log.Error("temp snapshot file name %s, try remove", info.Name())
|
log.Errorf("temp snapshot file name %s, try remove", info.Name())
|
||||||
os.Remove(path.Join(cfg.Snapshot.Path, info.Name()))
|
os.Remove(path.Join(cfg.Snapshot.Path, info.Name()))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := parseSnapshotName(info.Name()); err != nil {
|
if _, err := parseSnapshotName(info.Name()); err != nil {
|
||||||
log.Error("invalid snapshot file name %s, err: %s", info.Name(), err.Error())
|
log.Errorf("invalid snapshot file name %s, err: %s", info.Name(), err.Error())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,7 +115,7 @@ func (s *snapshotStore) run() {
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
s.Lock()
|
s.Lock()
|
||||||
if err := s.checkSnapshots(); err != nil {
|
if err := s.checkSnapshots(); err != nil {
|
||||||
log.Error("check snapshots error %s", err.Error())
|
log.Errorf("check snapshots error %s", err.Error())
|
||||||
}
|
}
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
case <-s.quit:
|
case <-s.quit:
|
||||||
|
@ -144,7 +144,7 @@ func (s *snapshotStore) purge(create bool) {
|
||||||
|
|
||||||
for _, name := range names {
|
for _, name := range names {
|
||||||
if err := os.Remove(s.snapshotPath(name)); err != nil {
|
if err := os.Remove(s.snapshotPath(name)); err != nil {
|
||||||
log.Error("purge snapshot %s error %s", name, err.Error())
|
log.Errorf("purge snapshot %s error %s", name, err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue