forked from mirror/ledisdb
store add transaction support
This commit is contained in:
parent
69f9123777
commit
a77de18dff
|
@ -73,3 +73,12 @@ func (db *DB) RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset
|
|||
func (db *DB) RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *RangeLimitIterator {
|
||||
return NewRevRangeLimitIterator(db.NewIterator(), &Range{min, max, rangeType}, &Limit{offset, count})
|
||||
}
|
||||
|
||||
func (db *DB) Begin() (*Tx, error) {
|
||||
tx, err := db.db.Begin()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Tx{tx}, nil
|
||||
}
|
||||
|
|
|
@ -1,5 +1,13 @@
|
|||
package driver
|
||||
|
||||
import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrTxSupport = errors.New("transaction is not supported")
|
||||
)
|
||||
|
||||
type IDB interface {
|
||||
Close() error
|
||||
|
||||
|
@ -11,6 +19,8 @@ type IDB interface {
|
|||
NewIterator() IIterator
|
||||
|
||||
NewWriteBatch() IWriteBatch
|
||||
|
||||
Begin() (Tx, error)
|
||||
}
|
||||
|
||||
type IIterator interface {
|
||||
|
@ -37,3 +47,15 @@ type IWriteBatch interface {
|
|||
Commit() error
|
||||
Rollback() error
|
||||
}
|
||||
|
||||
type Tx interface {
|
||||
Get(key []byte) ([]byte, error)
|
||||
Put(key []byte, value []byte) error
|
||||
Delete(key []byte) error
|
||||
|
||||
NewIterator() IIterator
|
||||
NewWriteBatch() IWriteBatch
|
||||
|
||||
Commit() error
|
||||
Rollback() error
|
||||
}
|
||||
|
|
|
@ -135,3 +135,7 @@ func (db *DB) NewIterator() driver.IIterator {
|
|||
|
||||
return it
|
||||
}
|
||||
|
||||
func (db *DB) Begin() (driver.Tx, error) {
|
||||
return nil, driver.ErrTxSupport
|
||||
}
|
||||
|
|
|
@ -260,3 +260,7 @@ func (db *DB) delete(wo *WriteOptions, key []byte) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) Begin() (driver.Tx, error) {
|
||||
return nil, driver.ErrTxSupport
|
||||
}
|
||||
|
|
|
@ -1,13 +1,17 @@
|
|||
package mdb
|
||||
|
||||
type batchPut interface {
|
||||
BatchPut([]Write) error
|
||||
}
|
||||
|
||||
type Write struct {
|
||||
Key []byte
|
||||
Value []byte
|
||||
}
|
||||
|
||||
type WriteBatch struct {
|
||||
db *MDB
|
||||
wb []Write
|
||||
batch batchPut
|
||||
wb []Write
|
||||
}
|
||||
|
||||
func (w *WriteBatch) Close() error {
|
||||
|
@ -23,7 +27,7 @@ func (w *WriteBatch) Delete(key []byte) {
|
|||
}
|
||||
|
||||
func (w *WriteBatch) Commit() error {
|
||||
return w.db.BatchPut(w.wb)
|
||||
return w.batch.BatchPut(w.wb)
|
||||
}
|
||||
|
||||
func (w *WriteBatch) Rollback() error {
|
||||
|
|
|
@ -140,6 +140,8 @@ type MDBIterator struct {
|
|||
tx *mdb.Txn
|
||||
valid bool
|
||||
err error
|
||||
|
||||
closeAutoCommit bool
|
||||
}
|
||||
|
||||
func (itr *MDBIterator) Key() []byte {
|
||||
|
@ -201,6 +203,11 @@ func (itr *MDBIterator) Close() error {
|
|||
itr.tx.Abort()
|
||||
return err
|
||||
}
|
||||
|
||||
if !itr.closeAutoCommit {
|
||||
return itr.err
|
||||
}
|
||||
|
||||
if itr.err != nil {
|
||||
itr.tx.Abort()
|
||||
return itr.err
|
||||
|
@ -226,16 +233,16 @@ func (db MDB) iterator(rdonly bool) *MDBIterator {
|
|||
}
|
||||
tx, err := db.env.BeginTxn(nil, flags)
|
||||
if err != nil {
|
||||
return &MDBIterator{nil, nil, nil, nil, false, err}
|
||||
return &MDBIterator{nil, nil, nil, nil, false, err, true}
|
||||
}
|
||||
|
||||
c, err := tx.CursorOpen(db.db)
|
||||
if err != nil {
|
||||
tx.Abort()
|
||||
return &MDBIterator{nil, nil, nil, nil, false, err}
|
||||
return &MDBIterator{nil, nil, nil, nil, false, err, true}
|
||||
}
|
||||
|
||||
return &MDBIterator{nil, nil, c, tx, true, nil}
|
||||
return &MDBIterator{nil, nil, c, tx, true, nil, true}
|
||||
}
|
||||
|
||||
func (db MDB) Close() error {
|
||||
|
@ -253,3 +260,7 @@ func (db MDB) NewIterator() driver.IIterator {
|
|||
func (db MDB) NewWriteBatch() driver.IWriteBatch {
|
||||
return &WriteBatch{&db, []Write{}}
|
||||
}
|
||||
|
||||
func (db MDB) Begin() (driver.Tx, error) {
|
||||
return newTx(db)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
package mdb
|
||||
|
||||
import (
|
||||
mdb "github.com/influxdb/gomdb"
|
||||
"github.com/siddontang/ledisdb/store/driver"
|
||||
)
|
||||
|
||||
type Tx struct {
|
||||
db mdb.DBI
|
||||
tx *mdb.Txn
|
||||
}
|
||||
|
||||
func newTx(db MDB) (*Tx, error) {
|
||||
tx, err := db.env.BeginTxn(nil, uint(0))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Tx{db.db, tx}, nil
|
||||
}
|
||||
|
||||
func (t *Tx) Get(key []byte) ([]byte, error) {
|
||||
return t.tx.Get(t.db, key)
|
||||
}
|
||||
|
||||
func (t *Tx) Put(key []byte, value []byte) error {
|
||||
return t.tx.Put(t.db, key, value, mdb.NODUPDATA)
|
||||
}
|
||||
|
||||
func (t *Tx) Delete(key []byte) error {
|
||||
return t.tx.Del(t.db, key, nil)
|
||||
}
|
||||
|
||||
func (t *Tx) NewIterator() driver.IIterator {
|
||||
return t.newIterator()
|
||||
}
|
||||
|
||||
func (t *Tx) newIterator() *MDBIterator {
|
||||
c, err := t.tx.CursorOpen(t.db)
|
||||
if err != nil {
|
||||
return &MDBIterator{nil, nil, nil, nil, false, err, false}
|
||||
}
|
||||
|
||||
return &MDBIterator{nil, nil, c, t.tx, true, nil, false}
|
||||
}
|
||||
|
||||
func (t *Tx) NewWriteBatch() driver.IWriteBatch {
|
||||
return &WriteBatch{t, []Write{}}
|
||||
}
|
||||
|
||||
func (t *Tx) BatchPut(writes []Write) error {
|
||||
itr := t.newIterator()
|
||||
|
||||
for _, w := range writes {
|
||||
if w.Value == nil {
|
||||
itr.key, itr.value, itr.err = itr.c.Get(w.Key, mdb.SET)
|
||||
if itr.err == nil {
|
||||
itr.err = itr.c.Del(0)
|
||||
}
|
||||
} else {
|
||||
itr.err = itr.c.Put(w.Key, w.Value, 0)
|
||||
}
|
||||
|
||||
if itr.err != nil && itr.err != mdb.NotFound {
|
||||
break
|
||||
}
|
||||
}
|
||||
itr.setState()
|
||||
|
||||
return itr.Close()
|
||||
|
||||
}
|
||||
|
||||
func (t *Tx) Rollback() error {
|
||||
t.tx.Abort()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Tx) Commit() error {
|
||||
return t.tx.Commit()
|
||||
}
|
|
@ -30,3 +30,11 @@ func TestLMDB(t *testing.T) {
|
|||
|
||||
db.Close()
|
||||
}
|
||||
|
||||
func TestLMDBTx(t *testing.T) {
|
||||
db := newTestLMDB()
|
||||
|
||||
testTx(db, t)
|
||||
|
||||
db.Close()
|
||||
}
|
||||
|
|
|
@ -277,3 +277,7 @@ func (db *DB) delete(wo *WriteOptions, key []byte) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) Begin() (driver.Tx, error) {
|
||||
return nil, driver.ErrTxSupport
|
||||
}
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"github.com/siddontang/ledisdb/store/driver"
|
||||
)
|
||||
|
||||
type Tx struct {
|
||||
driver.Tx
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestTx(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func testTx(db *DB, t *testing.T) {
|
||||
key1 := []byte("1")
|
||||
key2 := []byte("2")
|
||||
|
||||
db.Put(key1, []byte("1"))
|
||||
db.Put(key2, []byte("2"))
|
||||
|
||||
tx, err := db.Begin()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := tx.Put(key1, []byte("a")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := tx.Put(key2, []byte("b")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
it := tx.NewIterator()
|
||||
|
||||
it.Seek(key1)
|
||||
|
||||
if !it.Valid() {
|
||||
t.Fatal("must valid")
|
||||
} else if string(it.Value()) != "a" {
|
||||
t.Fatal(string(it.Value()))
|
||||
}
|
||||
|
||||
it.Close()
|
||||
|
||||
tx.Rollback()
|
||||
|
||||
if v, err := db.Get(key1); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if string(v) != "1" {
|
||||
t.Fatal(string(v))
|
||||
}
|
||||
|
||||
tx, err = db.Begin()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := tx.Put(key1, []byte("a")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
it = tx.NewIterator()
|
||||
|
||||
it.Seek(key2)
|
||||
|
||||
if !it.Valid() {
|
||||
t.Fatal("must valid")
|
||||
} else if string(it.Value()) != "2" {
|
||||
t.Fatal(string(it.Value()))
|
||||
}
|
||||
|
||||
it.Close()
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if v, err := db.Get(key1); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if string(v) != "a" {
|
||||
t.Fatal(string(v))
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue