mirror of https://github.com/ledisdb/ledisdb.git
readd transaction
This commit is contained in:
parent
595ead55bd
commit
dca71891c3
|
@ -9,7 +9,8 @@ from ledis.exceptions import (
|
||||||
ConnectionError,
|
ConnectionError,
|
||||||
DataError,
|
DataError,
|
||||||
LedisError,
|
LedisError,
|
||||||
ResponseError
|
ResponseError,
|
||||||
|
TxNotBeginError
|
||||||
)
|
)
|
||||||
|
|
||||||
SYM_EMPTY = b('')
|
SYM_EMPTY = b('')
|
||||||
|
@ -199,6 +200,11 @@ class Ledis(object):
|
||||||
"Set a custom Response Callback"
|
"Set a custom Response Callback"
|
||||||
self.response_callbacks[command] = callback
|
self.response_callbacks[command] = callback
|
||||||
|
|
||||||
|
def tx(self):
|
||||||
|
return Transaction(
|
||||||
|
self.connection_pool,
|
||||||
|
self.response_callbacks)
|
||||||
|
|
||||||
#### COMMAND EXECUTION AND PROTOCOL PARSING ####
|
#### COMMAND EXECUTION AND PROTOCOL PARSING ####
|
||||||
|
|
||||||
def execute_command(self, *args, **options):
|
def execute_command(self, *args, **options):
|
||||||
|
@ -964,3 +970,43 @@ class Ledis(object):
|
||||||
|
|
||||||
def scriptflush(self):
|
def scriptflush(self):
|
||||||
return self.execute_command('SCRIPT', 'FLUSH')
|
return self.execute_command('SCRIPT', 'FLUSH')
|
||||||
|
|
||||||
|
|
||||||
|
class Transaction(Ledis):
|
||||||
|
def __init__(self, connection_pool, response_callbacks):
|
||||||
|
self.connection_pool = connection_pool
|
||||||
|
self.response_callbacks = response_callbacks
|
||||||
|
self.connection = None
|
||||||
|
|
||||||
|
def execute_command(self, *args, **options):
|
||||||
|
"Execute a command and return a parsed response"
|
||||||
|
command_name = args[0]
|
||||||
|
|
||||||
|
connection = self.connection
|
||||||
|
if self.connection is None:
|
||||||
|
raise TxNotBeginError
|
||||||
|
|
||||||
|
try:
|
||||||
|
connection.send_command(*args)
|
||||||
|
return self.parse_response(connection, command_name, **options)
|
||||||
|
except ConnectionError:
|
||||||
|
connection.disconnect()
|
||||||
|
connection.send_command(*args)
|
||||||
|
return self.parse_response(connection, command_name, **options)
|
||||||
|
|
||||||
|
def begin(self):
|
||||||
|
self.connection = self.connection_pool.get_connection('begin')
|
||||||
|
return self.execute_command("BEGIN")
|
||||||
|
|
||||||
|
def commit(self):
|
||||||
|
res = self.execute_command("COMMIT")
|
||||||
|
self.connection_pool.release(self.connection)
|
||||||
|
self.connection = None
|
||||||
|
return res
|
||||||
|
|
||||||
|
def rollback(self):
|
||||||
|
res = self.execute_command("ROLLBACK")
|
||||||
|
self.connection_pool.release(self.connection)
|
||||||
|
self.connection = None
|
||||||
|
return res
|
||||||
|
|
||||||
|
|
|
@ -35,3 +35,5 @@ class DataError(LedisError):
|
||||||
class ExecAbortError(ResponseError):
|
class ExecAbortError(ResponseError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class TxNotBeginError(LedisError):
|
||||||
|
pass
|
|
@ -0,0 +1,46 @@
|
||||||
|
import unittest
|
||||||
|
import sys
|
||||||
|
sys.path.append("..")
|
||||||
|
|
||||||
|
import ledis
|
||||||
|
|
||||||
|
global_l = ledis.Ledis()
|
||||||
|
|
||||||
|
#db that do not support transaction
|
||||||
|
dbs = ["leveldb", "rocksdb", "hyperleveldb", "goleveldb"]
|
||||||
|
check = global_l.info().get("db_name") in dbs
|
||||||
|
|
||||||
|
|
||||||
|
class TestTx(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.l = ledis.Ledis(port=6380)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.l.flushdb()
|
||||||
|
|
||||||
|
@unittest.skipIf(check, reason="db not support transaction")
|
||||||
|
def test_commit(self):
|
||||||
|
tx = self.l.tx()
|
||||||
|
self.l.set("a", "no-tx")
|
||||||
|
assert self.l.get("a") == "no-tx"
|
||||||
|
tx.begin()
|
||||||
|
tx.set("a", "tx")
|
||||||
|
assert self.l.get("a") == "no-tx"
|
||||||
|
assert tx.get("a") == "tx"
|
||||||
|
|
||||||
|
tx.commit()
|
||||||
|
assert self.l.get("a") == "tx"
|
||||||
|
|
||||||
|
@unittest.skipIf(check, reason="db not support transaction")
|
||||||
|
def test_rollback(self):
|
||||||
|
tx = self.l.tx()
|
||||||
|
self.l.set("a", "no-tx")
|
||||||
|
assert self.l.get("a") == "no-tx"
|
||||||
|
|
||||||
|
tx.begin()
|
||||||
|
tx.set("a", "tx")
|
||||||
|
assert tx.get("a") == "tx"
|
||||||
|
assert self.l.get("a") == "no-tx"
|
||||||
|
|
||||||
|
tx.rollback()
|
||||||
|
assert self.l.get("a") == "no-tx"
|
|
@ -125,6 +125,10 @@ module.exports = [
|
||||||
"spersist",
|
"spersist",
|
||||||
"sxscan",
|
"sxscan",
|
||||||
|
|
||||||
|
"begin",
|
||||||
|
"rollback",
|
||||||
|
"commit",
|
||||||
|
|
||||||
"eval",
|
"eval",
|
||||||
"evalsha",
|
"evalsha",
|
||||||
"script",
|
"script",
|
||||||
|
|
|
@ -148,6 +148,11 @@ local commands = {
|
||||||
"flushall",
|
"flushall",
|
||||||
"flushdb",
|
"flushdb",
|
||||||
|
|
||||||
|
-- [[transaction]]
|
||||||
|
"begin",
|
||||||
|
"commit",
|
||||||
|
"rollback",
|
||||||
|
|
||||||
-- [[script]]
|
-- [[script]]
|
||||||
"eval",
|
"eval",
|
||||||
"evalsha",
|
"evalsha",
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
//This file was generated by .tools/generate_commands.py on Tue Sep 09 2014 09:48:57 +0800
|
//This file was generated by .tools/generate_commands.py on Thu Sep 25 2014 09:51:10 +0800
|
||||||
package main
|
package main
|
||||||
|
|
||||||
var helpCommands = [][]string{
|
var helpCommands = [][]string{
|
||||||
{"BCOUNT", "key [start end]", "Bitmap"},
|
{"BCOUNT", "key [start end]", "Bitmap"},
|
||||||
{"BDELETE", "key", "ZSet"},
|
{"BDELETE", "key", "ZSet"},
|
||||||
|
{"BEGIN", "-", "Transaction"},
|
||||||
{"BEXPIRE", "key seconds", "Bitmap"},
|
{"BEXPIRE", "key seconds", "Bitmap"},
|
||||||
{"BEXPIREAT", "key timestamp", "Bitmap"},
|
{"BEXPIREAT", "key timestamp", "Bitmap"},
|
||||||
{"BGET", "key", "Bitmap"},
|
{"BGET", "key", "Bitmap"},
|
||||||
|
@ -14,6 +15,7 @@ var helpCommands = [][]string{
|
||||||
{"BSETBIT", "key offset value", "Bitmap"},
|
{"BSETBIT", "key offset value", "Bitmap"},
|
||||||
{"BTTL", "key", "Bitmap"},
|
{"BTTL", "key", "Bitmap"},
|
||||||
{"BXSCAN", "key [MATCH match] [COUNT count]", "Bitmap"},
|
{"BXSCAN", "key [MATCH match] [COUNT count]", "Bitmap"},
|
||||||
|
{"COMMIT", "-", "Transaction"},
|
||||||
{"DECR", "key", "KV"},
|
{"DECR", "key", "KV"},
|
||||||
{"DECRBY", "key decrement", "KV"},
|
{"DECRBY", "key decrement", "KV"},
|
||||||
{"DEL", "key [key ...]", "KV"},
|
{"DEL", "key [key ...]", "KV"},
|
||||||
|
@ -65,6 +67,7 @@ var helpCommands = [][]string{
|
||||||
{"MSET", "key value [key value ...]", "KV"},
|
{"MSET", "key value [key value ...]", "KV"},
|
||||||
{"PERSIST", "key", "KV"},
|
{"PERSIST", "key", "KV"},
|
||||||
{"PING", "-", "Server"},
|
{"PING", "-", "Server"},
|
||||||
|
{"ROLLBACK", "-", "Transaction"},
|
||||||
{"RPOP", "key", "List"},
|
{"RPOP", "key", "List"},
|
||||||
{"RPUSH", "key value [value ...]", "List"},
|
{"RPUSH", "key value [value ...]", "List"},
|
||||||
{"SADD", "key member [member ...]", "Set"},
|
{"SADD", "key member [member ...]", "Set"},
|
||||||
|
|
|
@ -35,6 +35,15 @@ The same for Del.
|
||||||
|
|
||||||
ZSet only support int64 score, not double in Redis.
|
ZSet only support int64 score, not double in Redis.
|
||||||
|
|
||||||
|
## Transaction
|
||||||
|
|
||||||
|
LedisDB supports ACID transaction using LMDB or BoltDB, maybe later it will support `multi`, `exec`, `discard`.
|
||||||
|
|
||||||
|
Transaction API:
|
||||||
|
|
||||||
|
+ `begin`
|
||||||
|
+ `commit`
|
||||||
|
+ `rollback`
|
||||||
|
|
||||||
## Scan
|
## Scan
|
||||||
|
|
||||||
|
|
|
@ -512,6 +512,22 @@
|
||||||
"readonly": false
|
"readonly": false
|
||||||
},
|
},
|
||||||
|
|
||||||
|
"BEGIN": {
|
||||||
|
"arguments": "-",
|
||||||
|
"group": "Transaction",
|
||||||
|
"readonly": false
|
||||||
|
},
|
||||||
|
"COMMIT": {
|
||||||
|
"arguments": "-",
|
||||||
|
"group": "Transaction",
|
||||||
|
"readonly": false
|
||||||
|
},
|
||||||
|
"ROLLBACK": {
|
||||||
|
"arguments": "-",
|
||||||
|
"group": "Transaction",
|
||||||
|
"readonly": false
|
||||||
|
},
|
||||||
|
|
||||||
"XSCAN": {
|
"XSCAN": {
|
||||||
"arguments": "key [MATCH match] [COUNT count]",
|
"arguments": "key [MATCH match] [COUNT count]",
|
||||||
"group": "KV",
|
"group": "KV",
|
||||||
|
|
|
@ -129,6 +129,10 @@ Table of Contents
|
||||||
- [FLUSHALL](#flushall)
|
- [FLUSHALL](#flushall)
|
||||||
- [FLUSHDB](#flushdb)
|
- [FLUSHDB](#flushdb)
|
||||||
- [INFO [section]](#info-section)
|
- [INFO [section]](#info-section)
|
||||||
|
- [Transaction](#transaction)
|
||||||
|
- [BEGIN](#begin)
|
||||||
|
- [ROLLBACK](#rollback)
|
||||||
|
- [COMMIT](#commit)
|
||||||
- [Script](#script)
|
- [Script](#script)
|
||||||
- [EVAL script numkeys key [key ...] arg [arg ...]](#eval-script-numkeys-key-key--arg-arg-)
|
- [EVAL script numkeys key [key ...] arg [arg ...]](#eval-script-numkeys-key-key--arg-arg-)
|
||||||
- [EVALSHA sha1 numkeys key [key ...] arg [arg ...]](#evalsha-sha1-numkeys-key-key--arg-arg-)
|
- [EVALSHA sha1 numkeys key [key ...] arg [arg ...]](#evalsha-sha1-numkeys-key-key--arg-arg-)
|
||||||
|
@ -2498,6 +2502,69 @@ The optional parameter can be used to select a specific section of information:
|
||||||
|
|
||||||
When no parameter is provided, all will return.
|
When no parameter is provided, all will return.
|
||||||
|
|
||||||
|
## Transaction
|
||||||
|
|
||||||
|
### BEGIN
|
||||||
|
|
||||||
|
Marks the start of a transaction block. Subsequent commands will be in a transaction context util using COMMIT or ROLLBACK.
|
||||||
|
|
||||||
|
You must known that `BEGIN` will block any other write operators before you `COMMIT` or `ROLLBACK`. Don't use long-time transaction.
|
||||||
|
|
||||||
|
**Return value**
|
||||||
|
|
||||||
|
Returns `OK` if the backend store engine in use supports transaction, otherwise, returns `Err`.
|
||||||
|
|
||||||
|
**Examples**
|
||||||
|
```
|
||||||
|
ledis> BEGIN
|
||||||
|
OK
|
||||||
|
ledis> SET HELLO WORLD
|
||||||
|
OK
|
||||||
|
ledis> COMMIT
|
||||||
|
OK
|
||||||
|
```
|
||||||
|
|
||||||
|
### ROLLBACK
|
||||||
|
|
||||||
|
Discards all the changes of previously commands in a transaction and restores the connection state to normal.
|
||||||
|
|
||||||
|
**Return value**
|
||||||
|
Returns `OK` if in a transaction context, otherwise, `Err`
|
||||||
|
|
||||||
|
**Examples**
|
||||||
|
```
|
||||||
|
ledis> BEGIN
|
||||||
|
OK
|
||||||
|
ledis> SET HELLO WORLD
|
||||||
|
OK
|
||||||
|
ledis> GET HELLO
|
||||||
|
"WORLD"
|
||||||
|
ledis> ROLLBACK
|
||||||
|
OK
|
||||||
|
ledis> GET HELLO
|
||||||
|
(nil)
|
||||||
|
```
|
||||||
|
|
||||||
|
### COMMIT
|
||||||
|
|
||||||
|
Persists the changes of all the commands in a transaction and restores the connection state to normal.
|
||||||
|
|
||||||
|
**Return value**
|
||||||
|
Returns `OK` if in a transaction context, otherwise, `Err`
|
||||||
|
|
||||||
|
**Examples**
|
||||||
|
```
|
||||||
|
ledis> BEGIN
|
||||||
|
OK
|
||||||
|
ledis> SET HELLO WORLD
|
||||||
|
OK
|
||||||
|
ledis> GET HELLO
|
||||||
|
"WORLD"
|
||||||
|
ledis> COMMIT
|
||||||
|
OK
|
||||||
|
ledis> GET HELLO
|
||||||
|
"WORLD"
|
||||||
|
```
|
||||||
|
|
||||||
## Script
|
## Script
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,8 @@ type batch struct {
|
||||||
|
|
||||||
sync.Locker
|
sync.Locker
|
||||||
|
|
||||||
|
tx *Tx
|
||||||
|
|
||||||
eb *eventBatch
|
eb *eventBatch
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,31 +24,12 @@ func (b *batch) Commit() error {
|
||||||
return ErrWriteInROnly
|
return ErrWriteInROnly
|
||||||
}
|
}
|
||||||
|
|
||||||
b.l.commitLock.Lock()
|
if b.tx == nil {
|
||||||
defer b.l.commitLock.Unlock()
|
return b.l.handleCommit(b.eb, b.WriteBatch)
|
||||||
|
|
||||||
var err error
|
|
||||||
if b.l.r != nil {
|
|
||||||
var l *rpl.Log
|
|
||||||
if l, err = b.l.r.Log(b.eb.Bytes()); err != nil {
|
|
||||||
log.Fatal("write wal error %s", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
b.l.propagate(l)
|
|
||||||
|
|
||||||
if err = b.WriteBatch.Commit(); err != nil {
|
|
||||||
log.Fatal("commit error %s", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = b.l.r.UpdateCommitID(l.ID); err != nil {
|
|
||||||
log.Fatal("update commit id error %s", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
} else {
|
} else {
|
||||||
|
if b.l.r != nil {
|
||||||
|
b.tx.eb.Write(b.eb.Bytes())
|
||||||
|
}
|
||||||
return b.WriteBatch.Commit()
|
return b.WriteBatch.Commit()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -93,20 +76,61 @@ func (l *dbBatchLocker) Unlock() {
|
||||||
l.wrLock.RUnlock()
|
l.wrLock.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type txBatchLocker struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *txBatchLocker) Lock() {}
|
||||||
|
func (l *txBatchLocker) Unlock() {}
|
||||||
|
|
||||||
type multiBatchLocker struct {
|
type multiBatchLocker struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *multiBatchLocker) Lock() {}
|
func (l *multiBatchLocker) Lock() {}
|
||||||
func (l *multiBatchLocker) Unlock() {}
|
func (l *multiBatchLocker) Unlock() {}
|
||||||
|
|
||||||
func (l *Ledis) newBatch(wb store.WriteBatch, locker sync.Locker) *batch {
|
func (l *Ledis) newBatch(wb store.WriteBatch, locker sync.Locker, tx *Tx) *batch {
|
||||||
b := new(batch)
|
b := new(batch)
|
||||||
b.l = l
|
b.l = l
|
||||||
b.WriteBatch = wb
|
b.WriteBatch = wb
|
||||||
|
|
||||||
b.Locker = locker
|
b.Locker = locker
|
||||||
|
|
||||||
|
b.tx = tx
|
||||||
b.eb = new(eventBatch)
|
b.eb = new(eventBatch)
|
||||||
|
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type commiter interface {
|
||||||
|
Commit() error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Ledis) handleCommit(eb *eventBatch, c commiter) error {
|
||||||
|
l.commitLock.Lock()
|
||||||
|
defer l.commitLock.Unlock()
|
||||||
|
|
||||||
|
var err error
|
||||||
|
if l.r != nil {
|
||||||
|
var rl *rpl.Log
|
||||||
|
if rl, err = l.r.Log(eb.Bytes()); err != nil {
|
||||||
|
log.Fatal("write wal error %s", err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
l.propagate(rl)
|
||||||
|
|
||||||
|
if err = c.Commit(); err != nil {
|
||||||
|
log.Fatal("commit error %s", err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = l.r.UpdateCommitID(rl.ID); err != nil {
|
||||||
|
log.Fatal("update commit id error %s", err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
return c.Commit()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -93,5 +93,6 @@ var (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
DBAutoCommit uint8 = 0x0
|
DBAutoCommit uint8 = 0x0
|
||||||
|
DBInTransaction uint8 = 0x1
|
||||||
DBInMulti uint8 = 0x2
|
DBInMulti uint8 = 0x2
|
||||||
)
|
)
|
||||||
|
|
|
@ -64,7 +64,7 @@ func (l *Ledis) newDB(index uint8) *DB {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) newBatch() *batch {
|
func (db *DB) newBatch() *batch {
|
||||||
return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock})
|
return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock}, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) Index() int {
|
func (db *DB) Index() int {
|
||||||
|
|
|
@ -51,7 +51,7 @@ func (db *DB) Multi() (*Multi, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Multi) newBatch() *batch {
|
func (m *Multi) newBatch() *batch {
|
||||||
return m.l.newBatch(m.bucket.NewWriteBatch(), &multiBatchLocker{})
|
return m.l.newBatch(m.bucket.NewWriteBatch(), &multiBatchLocker{}, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Multi) Close() error {
|
func (m *Multi) Close() error {
|
||||||
|
|
|
@ -0,0 +1,109 @@
|
||||||
|
package ledis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"github.com/siddontang/ledisdb/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrNestTx = errors.New("nest transaction not supported")
|
||||||
|
ErrTxDone = errors.New("Transaction has already been committed or rolled back")
|
||||||
|
)
|
||||||
|
|
||||||
|
type Tx struct {
|
||||||
|
*DB
|
||||||
|
|
||||||
|
tx *store.Tx
|
||||||
|
|
||||||
|
eb *eventBatch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) IsTransaction() bool {
|
||||||
|
return db.status == DBInTransaction
|
||||||
|
}
|
||||||
|
|
||||||
|
// Begin a transaction, it will block all other write operations before calling Commit or Rollback.
|
||||||
|
// You must be very careful to prevent long-time transaction.
|
||||||
|
func (db *DB) Begin() (*Tx, error) {
|
||||||
|
if db.IsTransaction() {
|
||||||
|
return nil, ErrNestTx
|
||||||
|
}
|
||||||
|
|
||||||
|
tx := new(Tx)
|
||||||
|
|
||||||
|
tx.eb = new(eventBatch)
|
||||||
|
|
||||||
|
tx.DB = new(DB)
|
||||||
|
tx.DB.l = db.l
|
||||||
|
|
||||||
|
tx.l.wLock.Lock()
|
||||||
|
|
||||||
|
tx.DB.sdb = db.sdb
|
||||||
|
|
||||||
|
var err error
|
||||||
|
tx.tx, err = db.sdb.Begin()
|
||||||
|
if err != nil {
|
||||||
|
tx.l.wLock.Unlock()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.DB.bucket = tx.tx
|
||||||
|
|
||||||
|
tx.DB.status = DBInTransaction
|
||||||
|
|
||||||
|
tx.DB.index = db.index
|
||||||
|
|
||||||
|
tx.DB.kvBatch = tx.newBatch()
|
||||||
|
tx.DB.listBatch = tx.newBatch()
|
||||||
|
tx.DB.hashBatch = tx.newBatch()
|
||||||
|
tx.DB.zsetBatch = tx.newBatch()
|
||||||
|
tx.DB.binBatch = tx.newBatch()
|
||||||
|
tx.DB.setBatch = tx.newBatch()
|
||||||
|
|
||||||
|
return tx, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tx *Tx) Commit() error {
|
||||||
|
if tx.tx == nil {
|
||||||
|
return ErrTxDone
|
||||||
|
}
|
||||||
|
|
||||||
|
err := tx.l.handleCommit(tx.eb, tx.tx)
|
||||||
|
|
||||||
|
tx.tx = nil
|
||||||
|
|
||||||
|
tx.l.wLock.Unlock()
|
||||||
|
|
||||||
|
tx.DB.bucket = nil
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tx *Tx) Rollback() error {
|
||||||
|
if tx.tx == nil {
|
||||||
|
return ErrTxDone
|
||||||
|
}
|
||||||
|
|
||||||
|
err := tx.tx.Rollback()
|
||||||
|
tx.eb.Reset()
|
||||||
|
tx.tx = nil
|
||||||
|
|
||||||
|
tx.l.wLock.Unlock()
|
||||||
|
tx.DB.bucket = nil
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tx *Tx) newBatch() *batch {
|
||||||
|
return tx.l.newBatch(tx.tx.NewWriteBatch(), &txBatchLocker{}, tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tx *Tx) Select(index int) error {
|
||||||
|
if index < 0 || index >= int(MaxDBNumber) {
|
||||||
|
return fmt.Errorf("invalid db index %d", index)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.DB.index = uint8(index)
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,220 @@
|
||||||
|
package ledis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/siddontang/ledisdb/config"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func testTxRollback(t *testing.T, db *DB) {
|
||||||
|
var err error
|
||||||
|
key1 := []byte("tx_key1")
|
||||||
|
key2 := []byte("tx_key2")
|
||||||
|
field2 := []byte("tx_field2")
|
||||||
|
|
||||||
|
err = db.Set(key1, []byte("value"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = db.HSet(key2, field2, []byte("value"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var tx *Tx
|
||||||
|
tx, err = db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer tx.Rollback()
|
||||||
|
|
||||||
|
err = tx.Set(key1, []byte("1"))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.HSet(key2, field2, []byte("2"))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.HSet([]byte("no_key"), field2, []byte("2"))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := tx.Get(key1); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if string(v) != "1" {
|
||||||
|
t.Fatal(string(v))
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := tx.HGet(key2, field2); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if string(v) != "2" {
|
||||||
|
t.Fatal(string(v))
|
||||||
|
}
|
||||||
|
|
||||||
|
err = tx.Rollback()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := db.Get(key1); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if string(v) != "value" {
|
||||||
|
t.Fatal(string(v))
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := db.HGet(key2, field2); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if string(v) != "value" {
|
||||||
|
t.Fatal(string(v))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testTxCommit(t *testing.T, db *DB) {
|
||||||
|
var err error
|
||||||
|
key1 := []byte("tx_key1")
|
||||||
|
key2 := []byte("tx_key2")
|
||||||
|
field2 := []byte("tx_field2")
|
||||||
|
|
||||||
|
err = db.Set(key1, []byte("value"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = db.HSet(key2, field2, []byte("value"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var tx *Tx
|
||||||
|
tx, err = db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer tx.Rollback()
|
||||||
|
|
||||||
|
err = tx.Set(key1, []byte("1"))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.HSet(key2, field2, []byte("2"))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := tx.Get(key1); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if string(v) != "1" {
|
||||||
|
t.Fatal(string(v))
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := tx.HGet(key2, field2); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if string(v) != "2" {
|
||||||
|
t.Fatal(string(v))
|
||||||
|
}
|
||||||
|
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := db.Get(key1); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if string(v) != "1" {
|
||||||
|
t.Fatal(string(v))
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := db.HGet(key2, field2); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if string(v) != "2" {
|
||||||
|
t.Fatal(string(v))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testTxSelect(t *testing.T, db *DB) {
|
||||||
|
tx, err := db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer tx.Rollback()
|
||||||
|
|
||||||
|
tx.Set([]byte("tx_select_1"), []byte("a"))
|
||||||
|
|
||||||
|
tx.Select(1)
|
||||||
|
|
||||||
|
tx.Set([]byte("tx_select_2"), []byte("b"))
|
||||||
|
|
||||||
|
if err = tx.Commit(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := db.Get([]byte("tx_select_1")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if string(v) != "a" {
|
||||||
|
t.Fatal(string(v))
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := db.Get([]byte("tx_select_2")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if v != nil {
|
||||||
|
t.Fatal("must nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
db, _ = db.l.Select(1)
|
||||||
|
|
||||||
|
if v, err := db.Get([]byte("tx_select_2")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if string(v) != "b" {
|
||||||
|
t.Fatal(string(v))
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, err := db.Get([]byte("tx_select_1")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if v != nil {
|
||||||
|
t.Fatal("must nil")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testTx(t *testing.T, name string) {
|
||||||
|
cfg := new(config.Config)
|
||||||
|
cfg.DataDir = "/tmp/ledis_test_tx"
|
||||||
|
|
||||||
|
cfg.DBName = name
|
||||||
|
cfg.LMDB.MapSize = 10 * 1024 * 1024
|
||||||
|
cfg.UseReplication = true
|
||||||
|
|
||||||
|
os.RemoveAll(cfg.DataDir)
|
||||||
|
|
||||||
|
l, err := Open(cfg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer l.Close()
|
||||||
|
|
||||||
|
db, _ := l.Select(0)
|
||||||
|
|
||||||
|
testTxRollback(t, db)
|
||||||
|
testTxCommit(t, db)
|
||||||
|
testTxSelect(t, db)
|
||||||
|
}
|
||||||
|
|
||||||
|
//only lmdb, boltdb support Transaction
|
||||||
|
func TestTx(t *testing.T) {
|
||||||
|
testTx(t, "lmdb")
|
||||||
|
testTx(t, "boltdb")
|
||||||
|
}
|
|
@ -8,10 +8,24 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var txUnsupportedCmds = map[string]struct{}{
|
||||||
|
"select": struct{}{},
|
||||||
|
"slaveof": struct{}{},
|
||||||
|
"fullsync": struct{}{},
|
||||||
|
"sync": struct{}{},
|
||||||
|
"begin": struct{}{},
|
||||||
|
"flushall": struct{}{},
|
||||||
|
"flushdb": struct{}{},
|
||||||
|
"eval": struct{}{},
|
||||||
|
}
|
||||||
|
|
||||||
var scriptUnsupportedCmds = map[string]struct{}{
|
var scriptUnsupportedCmds = map[string]struct{}{
|
||||||
"slaveof": struct{}{},
|
"slaveof": struct{}{},
|
||||||
"fullsync": struct{}{},
|
"fullsync": struct{}{},
|
||||||
"sync": struct{}{},
|
"sync": struct{}{},
|
||||||
|
"begin": struct{}{},
|
||||||
|
"commit": struct{}{},
|
||||||
|
"rollback": struct{}{},
|
||||||
"flushall": struct{}{},
|
"flushall": struct{}{},
|
||||||
"flushdb": struct{}{},
|
"flushdb": struct{}{},
|
||||||
}
|
}
|
||||||
|
@ -57,6 +71,8 @@ type client struct {
|
||||||
|
|
||||||
buf bytes.Buffer
|
buf bytes.Buffer
|
||||||
|
|
||||||
|
tx *ledis.Tx
|
||||||
|
|
||||||
script *ledis.Multi
|
script *ledis.Multi
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,7 +99,11 @@ func (c *client) perform() {
|
||||||
} else if exeCmd, ok := regCmds[c.cmd]; !ok {
|
} else if exeCmd, ok := regCmds[c.cmd]; !ok {
|
||||||
err = ErrNotFound
|
err = ErrNotFound
|
||||||
} else {
|
} else {
|
||||||
if c.db.IsInMulti() {
|
if c.db.IsTransaction() {
|
||||||
|
if _, ok := txUnsupportedCmds[c.cmd]; ok {
|
||||||
|
err = fmt.Errorf("%s not supported in transaction", c.cmd)
|
||||||
|
}
|
||||||
|
} else if c.db.IsInMulti() {
|
||||||
if _, ok := scriptUnsupportedCmds[c.cmd]; ok {
|
if _, ok := scriptUnsupportedCmds[c.cmd]; ok {
|
||||||
err = fmt.Errorf("%s not supported in multi", c.cmd)
|
err = fmt.Errorf("%s not supported in multi", c.cmd)
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,9 @@ var httpUnsupportedCommands = map[string]struct{}{
|
||||||
"fullsync": struct{}{},
|
"fullsync": struct{}{},
|
||||||
"sync": struct{}{},
|
"sync": struct{}{},
|
||||||
"quit": struct{}{},
|
"quit": struct{}{},
|
||||||
|
"begin": struct{}{},
|
||||||
|
"commit": struct{}{},
|
||||||
|
"rollback": struct{}{},
|
||||||
}
|
}
|
||||||
|
|
||||||
type httpClient struct {
|
type httpClient struct {
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errTxMiss = errors.New("transaction miss")
|
||||||
|
|
||||||
|
func beginCommand(c *client) error {
|
||||||
|
tx, err := c.db.Begin()
|
||||||
|
if err == nil {
|
||||||
|
c.tx = tx
|
||||||
|
c.db = tx.DB
|
||||||
|
c.resp.writeStatus(OK)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func commitCommand(c *client) error {
|
||||||
|
if c.tx == nil {
|
||||||
|
return errTxMiss
|
||||||
|
}
|
||||||
|
|
||||||
|
err := c.tx.Commit()
|
||||||
|
c.db, _ = c.ldb.Select(c.tx.Index())
|
||||||
|
c.tx = nil
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
c.resp.writeStatus(OK)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func rollbackCommand(c *client) error {
|
||||||
|
if c.tx == nil {
|
||||||
|
return errTxMiss
|
||||||
|
}
|
||||||
|
|
||||||
|
err := c.tx.Rollback()
|
||||||
|
|
||||||
|
c.db, _ = c.ldb.Select(c.tx.Index())
|
||||||
|
c.tx = nil
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
c.resp.writeStatus(OK)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
register("begin", beginCommand)
|
||||||
|
register("commit", commitCommand)
|
||||||
|
register("rollback", rollbackCommand)
|
||||||
|
}
|
Loading…
Reference in New Issue