diff --git a/ledis/batch.go b/ledis/batch.go index c77e91f..61d5cd2 100644 --- a/ledis/batch.go +++ b/ledis/batch.go @@ -55,7 +55,7 @@ func (b *batch) Put(key []byte, value []byte) { func (b *batch) Delete(key []byte) { if b.l.r != nil { - b.Delete(key) + b.eb.Delete(key) } b.WriteBatch.Delete(key) @@ -121,11 +121,13 @@ func (l *Ledis) handleCommit(eb *eventBatch, c commiter) error { if err = c.Commit(); err != nil { log.Fatal("commit error %s", err.Error()) + l.noticeReplication() return err } if err = l.r.UpdateCommitID(rl.ID); err != nil { log.Fatal("update commit id error %s", err.Error()) + l.noticeReplication() return err } diff --git a/ledis/replication.go b/ledis/replication.go index 7b4d30c..6741589 100644 --- a/ledis/replication.go +++ b/ledis/replication.go @@ -23,6 +23,9 @@ func (l *Ledis) ReplicationUsed() bool { } func (l *Ledis) handleReplication() error { + l.wLock.Lock() + defer l.wLock.Unlock() + l.rwg.Add(1) rl := &rpl.Log{} var err error @@ -73,8 +76,6 @@ func (l *Ledis) onReplication() { select { case <-l.rc: l.handleReplication() - case <-time.After(5 * time.Second): - l.handleReplication() case <-l.quit: return } @@ -86,7 +87,8 @@ func (l *Ledis) WaitReplication() error { return ErrRplNotSupport } - AsyncNotify(l.rc) + + l.noticeReplication() l.rwg.Wait() @@ -125,11 +127,15 @@ func (l *Ledis) StoreLogsFromReader(rb io.Reader) error { } - AsyncNotify(l.rc) + l.noticeReplication() return nil } +func (l *Ledis) noticeReplication() { + AsyncNotify(l.rc) +} + func (l *Ledis) StoreLogsFromData(data []byte) error { rb := bytes.NewReader(data)