ledisdb/ledis/batch.go

140 lines
2.2 KiB
Go
Raw Normal View History

2014-09-02 13:55:12 +04:00
package ledis
import (
2015-05-04 17:42:28 +03:00
"sync"
2014-09-24 05:46:36 +04:00
"github.com/siddontang/go/log"
2014-09-22 13:50:51 +04:00
"github.com/siddontang/ledisdb/rpl"
2014-09-02 13:55:12 +04:00
"github.com/siddontang/ledisdb/store"
)
type batch struct {
l *Ledis
2014-10-15 06:18:20 +04:00
*store.WriteBatch
2014-09-02 13:55:12 +04:00
sync.Locker
// tx *Tx
2014-09-02 13:55:12 +04:00
}
func (b *batch) Commit() error {
2014-11-14 11:00:03 +03:00
if b.l.cfg.GetReadonly() {
2014-09-22 13:50:51 +04:00
return ErrWriteInROnly
2014-09-17 19:06:42 +04:00
}
return b.l.handleCommit(b.WriteBatch, b.WriteBatch)
// if b.tx == nil {
// return b.l.handleCommit(b.WriteBatch, b.WriteBatch)
// } else {
// if b.l.r != nil {
// if err := b.tx.data.Append(b.WriteBatch.BatchData()); err != nil {
// return err
// }
// }
// return b.WriteBatch.Commit()
// }
2014-09-02 13:55:12 +04:00
}
// Lock acquires the batch's embedded Locker.
//
// The Locker selector is spelled out on purpose: a bare b.Lock() would call
// this method again.
func (b *batch) Lock() {
	locker := b.Locker
	locker.Lock()
}
// Unlock rolls back the embedded write batch and then releases the Locker.
//
// The rollback always runs first and nothing it may return is inspected.
// NOTE(review): presumably this resets the batch for the next user after a
// Commit — confirm against store.WriteBatch.
func (b *batch) Unlock() {
	wb, locker := b.WriteBatch, b.Locker

	wb.Rollback()
	locker.Unlock()
}
// Put forwards the key/value pair to the embedded write batch.
func (b *batch) Put(key, value []byte) {
	wb := b.WriteBatch
	wb.Put(key, value)
}
// Delete forwards the key to the embedded write batch's Delete.
func (b *batch) Delete(key []byte) {
	wb := b.WriteBatch
	wb.Delete(key)
}
// dbBatchLocker is a two-level lock: it holds wrLock for reading while it
// holds l exclusively.
type dbBatchLocker struct {
	// l is the exclusive mutex taken second and released first.
	l *sync.Mutex

	// wrLock is only ever read-locked by this type, so anything holding its
	// write lock keeps Lock from proceeding.
	wrLock *sync.RWMutex
}

// Lock takes the read side of wrLock first and then the mutex.
func (d *dbBatchLocker) Lock() {
	d.wrLock.RLock()
	d.l.Lock()
}

// Unlock releases in the reverse order of Lock: the mutex, then the read
// side of wrLock.
func (d *dbBatchLocker) Unlock() {
	d.l.Unlock()
	d.wrLock.RUnlock()
}
// type txBatchLocker struct {
// }

// func (l *txBatchLocker) Lock()   {}
// func (l *txBatchLocker) Unlock() {}

// type multiBatchLocker struct {
// }

// func (l *multiBatchLocker) Lock()   {}
// func (l *multiBatchLocker) Unlock() {}
2014-09-02 13:55:12 +04:00
func (l *Ledis) newBatch(wb *store.WriteBatch, locker sync.Locker) *batch {
2014-09-02 13:55:12 +04:00
b := new(batch)
b.l = l
b.WriteBatch = wb
b.Locker = locker
return b
}
2014-09-25 06:44:07 +04:00
type (
	// commiter is anything handleCommit can ask to commit its pending
	// work.
	commiter interface {
		Commit() error
	}

	// commitDataGetter supplies the bytes that handleCommit passes to the
	// log before committing.
	commitDataGetter interface {
		Data() []byte
	}
)
func (l *Ledis) handleCommit(g commitDataGetter, c commiter) error {
2014-09-25 06:44:07 +04:00
l.commitLock.Lock()
var err error
if l.r != nil {
var rl *rpl.Log
if rl, err = l.r.Log(g.Data()); err != nil {
l.commitLock.Unlock()
2015-01-12 05:32:03 +03:00
log.Fatalf("write wal error %s", err.Error())
2014-09-25 06:44:07 +04:00
return err
}
l.propagate(rl)
if err = c.Commit(); err != nil {
l.commitLock.Unlock()
2015-01-12 05:32:03 +03:00
log.Fatalf("commit error %s", err.Error())
2014-09-28 16:37:57 +04:00
l.noticeReplication()
2014-09-25 06:44:07 +04:00
return err
}
if err = l.r.UpdateCommitID(rl.ID); err != nil {
l.commitLock.Unlock()
2015-01-12 05:32:03 +03:00
log.Fatalf("update commit id error %s", err.Error())
2014-09-28 16:37:57 +04:00
l.noticeReplication()
2014-09-25 06:44:07 +04:00
return err
}
} else {
err = c.Commit()
2014-09-25 06:44:07 +04:00
}
l.commitLock.Unlock()
return err
2014-09-25 06:44:07 +04:00
}