diff --git a/ledis/ledis.go b/ledis/ledis.go index 0052b3a..949452c 100644 --- a/ledis/ledis.go +++ b/ledis/ledis.go @@ -74,6 +74,7 @@ func Open2(cfg *config.Config, flags int) (*Ledis, error) { l.rc = make(chan struct{}, 8) l.rbatch = l.ldb.NewWriteBatch() + l.wg.Add(1) go l.onReplication() //first we must try wait all replication ok @@ -87,6 +88,7 @@ func Open2(cfg *config.Config, flags int) (*Ledis, error) { l.dbs[i] = l.newDB(i) } + l.wg.Add(1) go l.onDataExpired() return l, nil @@ -176,7 +178,6 @@ func (l *Ledis) SetReadOnly(b bool) { } func (l *Ledis) onDataExpired() { - l.wg.Add(1) defer l.wg.Done() var executors []*elimination = make([]*elimination, len(l.dbs)) diff --git a/ledis/replication.go b/ledis/replication.go index 1fa1531..7b4d30c 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -22,19 +22,18 @@ func (l *Ledis) ReplicationUsed() bool { return l.r != nil } -func (l *Ledis) handleReplication() { - l.commitLock.Lock() - defer l.commitLock.Unlock() - +func (l *Ledis) handleReplication() error { l.rwg.Add(1) rl := &rpl.Log{} + var err error for { - if err := l.r.NextNeedCommitLog(rl); err != nil { + if err = l.r.NextNeedCommitLog(rl); err != nil { if err != rpl.ErrNoBehindLog { log.Error("get next commit log err, %s", err.Error) + return err } else { l.rwg.Done() - return + return nil } } else { l.rbatch.Rollback() @@ -43,18 +42,22 @@ func (l *Ledis) handleReplication() { //todo optimize if rl.Data, err = snappy.Decode(nil, rl.Data); err != nil { log.Error("decode log error %s", err.Error()) - return + return err } } decodeEventBatch(l.rbatch, rl.Data) - if err := l.rbatch.Commit(); err != nil { + l.commitLock.Lock() + if err = l.rbatch.Commit(); err != nil { log.Error("commit log error %s", err.Error()) - return } else if err = l.r.UpdateCommitID(rl.ID); err != nil { log.Error("update commit id error %s", err.Error()) - return + } + + l.commitLock.Unlock() + if err != nil { + return err } } @@ -62,7 +65,6 @@ func (l *Ledis) handleReplication() { } func (l *Ledis) onReplication() { - l.wg.Add(1) defer l.wg.Done() AsyncNotify(l.rc)