bugfix for replication

This commit is contained in:
siddontang 2014-09-28 20:37:57 +08:00
parent b4d82d7e34
commit 69e489dd1b
2 changed files with 13 additions and 5 deletions

View File

@ -55,7 +55,7 @@ func (b *batch) Put(key []byte, value []byte) {
func (b *batch) Delete(key []byte) { func (b *batch) Delete(key []byte) {
if b.l.r != nil { if b.l.r != nil {
b.Delete(key) b.eb.Delete(key)
} }
b.WriteBatch.Delete(key) b.WriteBatch.Delete(key)
@ -121,11 +121,13 @@ func (l *Ledis) handleCommit(eb *eventBatch, c commiter) error {
if err = c.Commit(); err != nil { if err = c.Commit(); err != nil {
log.Fatal("commit error %s", err.Error()) log.Fatal("commit error %s", err.Error())
l.noticeReplication()
return err return err
} }
if err = l.r.UpdateCommitID(rl.ID); err != nil { if err = l.r.UpdateCommitID(rl.ID); err != nil {
log.Fatal("update commit id error %s", err.Error()) log.Fatal("update commit id error %s", err.Error())
l.noticeReplication()
return err return err
} }

View File

@ -23,6 +23,9 @@ func (l *Ledis) ReplicationUsed() bool {
} }
func (l *Ledis) handleReplication() error { func (l *Ledis) handleReplication() error {
l.wLock.Lock()
defer l.wLock.Unlock()
l.rwg.Add(1) l.rwg.Add(1)
rl := &rpl.Log{} rl := &rpl.Log{}
var err error var err error
@ -73,8 +76,6 @@ func (l *Ledis) onReplication() {
select { select {
case <-l.rc: case <-l.rc:
l.handleReplication() l.handleReplication()
case <-time.After(5 * time.Second):
l.handleReplication()
case <-l.quit: case <-l.quit:
return return
} }
@ -86,7 +87,8 @@ func (l *Ledis) WaitReplication() error {
return ErrRplNotSupport return ErrRplNotSupport
} }
AsyncNotify(l.rc)
l.noticeReplication()
l.rwg.Wait() l.rwg.Wait()
@ -125,11 +127,15 @@ func (l *Ledis) StoreLogsFromReader(rb io.Reader) error {
} }
AsyncNotify(l.rc) l.noticeReplication()
return nil return nil
} }
func (l *Ledis) noticeReplication() {
AsyncNotify(l.rc)
}
func (l *Ledis) StoreLogsFromData(data []byte) error { func (l *Ledis) StoreLogsFromData(data []byte) error {
rb := bytes.NewReader(data) rb := bytes.NewReader(data)