update watigroup add place

This commit is contained in:
siddontang 2014-09-27 20:13:13 +08:00
parent 7df7af8b54
commit b4d82d7e34
2 changed files with 15 additions and 12 deletions

View File

@ -74,6 +74,7 @@ func Open2(cfg *config.Config, flags int) (*Ledis, error) {
l.rc = make(chan struct{}, 8) l.rc = make(chan struct{}, 8)
l.rbatch = l.ldb.NewWriteBatch() l.rbatch = l.ldb.NewWriteBatch()
l.wg.Add(1)
go l.onReplication() go l.onReplication()
//first we must try wait all replication ok //first we must try wait all replication ok
@ -87,6 +88,7 @@ func Open2(cfg *config.Config, flags int) (*Ledis, error) {
l.dbs[i] = l.newDB(i) l.dbs[i] = l.newDB(i)
} }
l.wg.Add(1)
go l.onDataExpired() go l.onDataExpired()
return l, nil return l, nil
@ -176,7 +178,6 @@ func (l *Ledis) SetReadOnly(b bool) {
} }
func (l *Ledis) onDataExpired() { func (l *Ledis) onDataExpired() {
l.wg.Add(1)
defer l.wg.Done() defer l.wg.Done()
var executors []*elimination = make([]*elimination, len(l.dbs)) var executors []*elimination = make([]*elimination, len(l.dbs))

View File

@ -22,19 +22,18 @@ func (l *Ledis) ReplicationUsed() bool {
return l.r != nil return l.r != nil
} }
func (l *Ledis) handleReplication() { func (l *Ledis) handleReplication() error {
l.commitLock.Lock()
defer l.commitLock.Unlock()
l.rwg.Add(1) l.rwg.Add(1)
rl := &rpl.Log{} rl := &rpl.Log{}
var err error
for { for {
if err := l.r.NextNeedCommitLog(rl); err != nil { if err = l.r.NextNeedCommitLog(rl); err != nil {
if err != rpl.ErrNoBehindLog { if err != rpl.ErrNoBehindLog {
log.Error("get next commit log err, %s", err.Error) log.Error("get next commit log err, %s", err.Error)
return err
} else { } else {
l.rwg.Done() l.rwg.Done()
return return nil
} }
} else { } else {
l.rbatch.Rollback() l.rbatch.Rollback()
@ -43,18 +42,22 @@ func (l *Ledis) handleReplication() {
//todo optimize //todo optimize
if rl.Data, err = snappy.Decode(nil, rl.Data); err != nil { if rl.Data, err = snappy.Decode(nil, rl.Data); err != nil {
log.Error("decode log error %s", err.Error()) log.Error("decode log error %s", err.Error())
return return err
} }
} }
decodeEventBatch(l.rbatch, rl.Data) decodeEventBatch(l.rbatch, rl.Data)
if err := l.rbatch.Commit(); err != nil { l.commitLock.Lock()
if err = l.rbatch.Commit(); err != nil {
log.Error("commit log error %s", err.Error()) log.Error("commit log error %s", err.Error())
return
} else if err = l.r.UpdateCommitID(rl.ID); err != nil { } else if err = l.r.UpdateCommitID(rl.ID); err != nil {
log.Error("update commit id error %s", err.Error()) log.Error("update commit id error %s", err.Error())
return }
l.commitLock.Unlock()
if err != nil {
return err
} }
} }
@ -62,7 +65,6 @@ func (l *Ledis) handleReplication() {
} }
func (l *Ledis) onReplication() { func (l *Ledis) onReplication() {
l.wg.Add(1)
defer l.wg.Done() defer l.wg.Done()
AsyncNotify(l.rc) AsyncNotify(l.rc)