forked from mirror/ledisdb
try to fix replication race test error
This commit is contained in:
parent
232b1543e0
commit
fdcaa0c4cc
|
@ -27,11 +27,11 @@ type Ledis struct {
|
|||
wg sync.WaitGroup
|
||||
|
||||
//for replication
|
||||
r *rpl.Replication
|
||||
rc chan struct{}
|
||||
rbatch *store.WriteBatch
|
||||
rwg sync.WaitGroup
|
||||
rhs []NewLogEventHandler
|
||||
r *rpl.Replication
|
||||
rc chan struct{}
|
||||
rbatch *store.WriteBatch
|
||||
rDoneCh chan struct{}
|
||||
rhs []NewLogEventHandler
|
||||
|
||||
wLock sync.RWMutex //allow one write at same time
|
||||
commitLock sync.Mutex //allow one write commit at same time
|
||||
|
@ -77,6 +77,7 @@ func Open(cfg *config.Config) (*Ledis, error) {
|
|||
|
||||
l.rc = make(chan struct{}, 1)
|
||||
l.rbatch = l.ldb.NewWriteBatch()
|
||||
l.rDoneCh = make(chan struct{}, 1)
|
||||
|
||||
l.wg.Add(1)
|
||||
go l.onReplication()
|
||||
|
|
|
@ -28,8 +28,10 @@ func (l *Ledis) handleReplication() error {
|
|||
l.wLock.Lock()
|
||||
defer l.wLock.Unlock()
|
||||
|
||||
l.rwg.Add(1)
|
||||
defer AsyncNotify(l.rDoneCh)
|
||||
|
||||
rl := &rpl.Log{}
|
||||
|
||||
var err error
|
||||
for {
|
||||
if err = l.r.NextNeedCommitLog(rl); err != nil {
|
||||
|
@ -37,7 +39,6 @@ func (l *Ledis) handleReplication() error {
|
|||
log.Errorf("get next commit log err, %s", err.Error)
|
||||
return err
|
||||
} else {
|
||||
l.rwg.Done()
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
|
@ -95,18 +96,20 @@ func (l *Ledis) WaitReplication() error {
|
|||
|
||||
}
|
||||
|
||||
l.noticeReplication()
|
||||
l.rwg.Wait()
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
l.noticeReplication()
|
||||
|
||||
select {
|
||||
case <-l.rDoneCh:
|
||||
case <-l.quit:
|
||||
return nil
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
b, err := l.r.CommitIDBehind()
|
||||
if err != nil {
|
||||
return err
|
||||
} else if b {
|
||||
l.noticeReplication()
|
||||
l.rwg.Wait()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
} else {
|
||||
} else if !b {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue