mirror of https://github.com/ledisdb/ledisdb.git
bugfix for replication
This commit is contained in:
parent
42f6a3679f
commit
6fac39c96b
|
@ -49,6 +49,9 @@ func (b *replBatch) Commit() error {
|
|||
}
|
||||
}
|
||||
|
||||
b.events = [][]byte{}
|
||||
b.lastHead = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -64,6 +67,8 @@ func (l *Ledis) replicateEvent(b *replBatch, event []byte) error {
|
|||
return errInvalidBinLogEvent
|
||||
}
|
||||
|
||||
b.events = append(b.events, event)
|
||||
|
||||
logType := uint8(event[0])
|
||||
switch logType {
|
||||
case BinLogTypePut:
|
||||
|
@ -83,10 +88,6 @@ func (l *Ledis) replicatePutEvent(b *replBatch, event []byte) error {
|
|||
|
||||
b.wb.Put(key, value)
|
||||
|
||||
if b.l.binlog != nil {
|
||||
b.events = append(b.events, event)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -98,16 +99,11 @@ func (l *Ledis) replicateDeleteEvent(b *replBatch, event []byte) error {
|
|||
|
||||
b.wb.Delete(key)
|
||||
|
||||
if b.l.binlog != nil {
|
||||
b.events = append(b.events, event)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error {
|
||||
head := &BinLogHead{}
|
||||
var dataBuf bytes.Buffer
|
||||
var err error
|
||||
|
||||
for {
|
||||
|
@ -119,6 +115,8 @@ func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) er
|
|||
}
|
||||
}
|
||||
|
||||
var dataBuf bytes.Buffer
|
||||
|
||||
if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -127,8 +125,6 @@ func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) er
|
|||
if err != nil && err != ErrSkipEvent {
|
||||
return err
|
||||
}
|
||||
|
||||
dataBuf.Reset()
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue