From dca71891c3b6ffb1e734b13dbb16d039eb92cae9 Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 25 Sep 2014 10:44:07 +0800 Subject: [PATCH] readd transaction --- client/ledis-py/ledis/client.py | 48 +++++- client/ledis-py/ledis/exceptions.py | 2 + client/ledis-py/tests/test_tx.py | 46 ++++++ client/nodejs/ledis/lib/commands.js | 4 + client/openresty/ledis.lua | 5 + cmd/ledis-cli/const.go | 5 +- doc/DiffRedis.md | 9 ++ doc/commands.json | 16 ++ doc/commands.md | 67 +++++++++ ledis/batch.go | 74 ++++++---- ledis/const.go | 5 +- ledis/ledis_db.go | 2 +- ledis/multi.go | 2 +- ledis/tx.go | 109 ++++++++++++++ ledis/tx_test.go | 220 ++++++++++++++++++++++++++++ server/client.go | 22 ++- server/client_http.go | 3 + server/cmd_tx.go | 57 +++++++ 18 files changed, 664 insertions(+), 32 deletions(-) create mode 100644 client/ledis-py/tests/test_tx.py create mode 100644 ledis/tx.go create mode 100644 ledis/tx_test.go create mode 100644 server/cmd_tx.go diff --git a/client/ledis-py/ledis/client.py b/client/ledis-py/ledis/client.py index 420900f..17cc1c4 100644 --- a/client/ledis-py/ledis/client.py +++ b/client/ledis-py/ledis/client.py @@ -9,7 +9,8 @@ from ledis.exceptions import ( ConnectionError, DataError, LedisError, - ResponseError + ResponseError, + TxNotBeginError ) SYM_EMPTY = b('') @@ -199,6 +200,11 @@ class Ledis(object): "Set a custom Response Callback" self.response_callbacks[command] = callback + def tx(self): + return Transaction( + self.connection_pool, + self.response_callbacks) + #### COMMAND EXECUTION AND PROTOCOL PARSING #### def execute_command(self, *args, **options): @@ -964,3 +970,43 @@ class Ledis(object): def scriptflush(self): return self.execute_command('SCRIPT', 'FLUSH') + + +class Transaction(Ledis): + def __init__(self, connection_pool, response_callbacks): + self.connection_pool = connection_pool + self.response_callbacks = response_callbacks + self.connection = None + + def execute_command(self, *args, **options): + "Execute a command and return a parsed response" + command_name = args[0] + + connection = self.connection + if self.connection is None: + raise TxNotBeginError + + try: + connection.send_command(*args) + return self.parse_response(connection, command_name, **options) + except ConnectionError: + connection.disconnect() + connection.send_command(*args) + return self.parse_response(connection, command_name, **options) + + def begin(self): + self.connection = self.connection_pool.get_connection('begin') + return self.execute_command("BEGIN") + + def commit(self): + res = self.execute_command("COMMIT") + self.connection_pool.release(self.connection) + self.connection = None + return res + + def rollback(self): + res = self.execute_command("ROLLBACK") + self.connection_pool.release(self.connection) + self.connection = None + return res + diff --git a/client/ledis-py/ledis/exceptions.py b/client/ledis-py/ledis/exceptions.py index f92e530..9150db6 100644 --- a/client/ledis-py/ledis/exceptions.py +++ b/client/ledis-py/ledis/exceptions.py @@ -35,3 +35,5 @@ class DataError(LedisError): class ExecAbortError(ResponseError): pass +class TxNotBeginError(LedisError): + pass \ No newline at end of file diff --git a/client/ledis-py/tests/test_tx.py b/client/ledis-py/tests/test_tx.py new file mode 100644 index 0000000..cfbab20 --- /dev/null +++ b/client/ledis-py/tests/test_tx.py @@ -0,0 +1,46 @@ +import unittest +import sys +sys.path.append("..") + +import ledis + +global_l = ledis.Ledis() + +#db that do not support transaction +dbs = ["leveldb", "rocksdb", "hyperleveldb", "goleveldb"] +check = global_l.info().get("db_name") in dbs + + +class TestTx(unittest.TestCase): + def setUp(self): + self.l = ledis.Ledis(port=6380) + + def tearDown(self): + self.l.flushdb() + + @unittest.skipIf(check, reason="db not support transaction") + def test_commit(self): + tx = self.l.tx() + self.l.set("a", "no-tx") + assert self.l.get("a") == "no-tx" + tx.begin() + tx.set("a", "tx") + assert self.l.get("a") == "no-tx" + assert tx.get("a") == "tx" + + tx.commit() + assert self.l.get("a") == "tx" + + @unittest.skipIf(check, reason="db not support transaction") + def test_rollback(self): + tx = self.l.tx() + self.l.set("a", "no-tx") + assert self.l.get("a") == "no-tx" + + tx.begin() + tx.set("a", "tx") + assert tx.get("a") == "tx" + assert self.l.get("a") == "no-tx" + + tx.rollback() + assert self.l.get("a") == "no-tx" \ No newline at end of file diff --git a/client/nodejs/ledis/lib/commands.js b/client/nodejs/ledis/lib/commands.js index 41dc97e..f116444 100644 --- a/client/nodejs/ledis/lib/commands.js +++ b/client/nodejs/ledis/lib/commands.js @@ -125,6 +125,10 @@ module.exports = [ "spersist", "sxscan", + "begin", + "rollback", + "commit", + "eval", "evalsha", "script", diff --git a/client/openresty/ledis.lua b/client/openresty/ledis.lua index 26a384a..07c3f2b 100644 --- a/client/openresty/ledis.lua +++ b/client/openresty/ledis.lua @@ -148,6 +148,11 @@ local commands = { "flushall", "flushdb", + -- [[transaction]] + "begin", + "commit", + "rollback", + -- [[script]] "eval", "evalsha", diff --git a/cmd/ledis-cli/const.go b/cmd/ledis-cli/const.go index 4fc7f67..842866b 100644 --- a/cmd/ledis-cli/const.go +++ b/cmd/ledis-cli/const.go @@ -1,9 +1,10 @@ -//This file was generated by .tools/generate_commands.py on Tue Sep 09 2014 09:48:57 +0800 +//This file was generated by .tools/generate_commands.py on Thu Sep 25 2014 09:51:10 +0800 package main var helpCommands = [][]string{ {"BCOUNT", "key [start end]", "Bitmap"}, {"BDELETE", "key", "ZSet"}, + {"BEGIN", "-", "Transaction"}, {"BEXPIRE", "key seconds", "Bitmap"}, {"BEXPIREAT", "key timestamp", "Bitmap"}, {"BGET", "key", "Bitmap"}, @@ -14,6 +15,7 @@ var helpCommands = [][]string{ {"BSETBIT", "key offset value", "Bitmap"}, {"BTTL", "key", "Bitmap"}, {"BXSCAN", "key [MATCH match] [COUNT count]", "Bitmap"}, + {"COMMIT", "-", "Transaction"}, {"DECR", "key", "KV"}, {"DECRBY", "key decrement", "KV"}, {"DEL", "key [key ...]", "KV"}, @@ -65,6 +67,7 @@ var helpCommands = [][]string{ {"MSET", "key value [key value ...]", "KV"}, {"PERSIST", "key", "KV"}, {"PING", "-", "Server"}, + {"ROLLBACK", "-", "Transaction"}, {"RPOP", "key", "List"}, {"RPUSH", "key value [value ...]", "List"}, {"SADD", "key member [member ...]", "Set"}, diff --git a/doc/DiffRedis.md b/doc/DiffRedis.md index 5c597fe..ee1618c 100644 --- a/doc/DiffRedis.md +++ b/doc/DiffRedis.md @@ -35,6 +35,15 @@ The same for Del. ZSet only support int64 score, not double in Redis. +## Transaction + +LedisDB supports ACID transaction using LMDB or BoltDB, maybe later it will support `multi`, `exec`, `discard`. + +Transaction API: + ++ `begin` ++ `commit` ++ `rollback` ## Scan diff --git a/doc/commands.json b/doc/commands.json index 921a688..828186d 100644 --- a/doc/commands.json +++ b/doc/commands.json @@ -512,6 +512,22 @@ "readonly": false }, + "BEGIN": { + "arguments": "-", + "group": "Transaction", + "readonly": false + }, + "COMMIT": { + "arguments": "-", + "group": "Transaction", + "readonly": false + }, + "ROLLBACK": { + "arguments": "-", + "group": "Transaction", + "readonly": false + }, + "XSCAN": { "arguments": "key [MATCH match] [COUNT count]", "group": "KV", diff --git a/doc/commands.md b/doc/commands.md index 0317384..0809131 100644 --- a/doc/commands.md +++ b/doc/commands.md @@ -129,6 +129,10 @@ Table of Contents - [FLUSHALL](#flushall) - [FLUSHDB](#flushdb) - [INFO [section]](#info-section) +- [Transaction](#transaction) + - [BEGIN](#begin) + - [ROLLBACK](#rollback) + - [COMMIT](#commit) - [Script](#script) - [EVAL script numkeys key [key ...] arg [arg ...]](#eval-script-numkeys-key-key--arg-arg-) - [EVALSHA sha1 numkeys key [key ...] arg [arg ...]](#evalsha-sha1-numkeys-key-key--arg-arg-) @@ -2498,6 +2502,69 @@ The optional parameter can be used to select a specific section of information: When no parameter is provided, all will return. +## Transaction + +### BEGIN + +Marks the start of a transaction block. Subsequent commands will be in a transaction context util using COMMIT or ROLLBACK. + +You must known that `BEGIN` will block any other write operators before you `COMMIT` or `ROLLBACK`. Don't use long-time transaction. + +**Return value** + +Returns `OK` if the backend store engine in use supports transaction, otherwise, returns `Err`. + +**Examples** +``` +ledis> BEGIN +OK +ledis> SET HELLO WORLD +OK +ledis> COMMIT +OK +``` + +### ROLLBACK + +Discards all the changes of previously commands in a transaction and restores the connection state to normal. + +**Return value** +Returns `OK` if in a transaction context, otherwise, `Err` + +**Examples** +``` +ledis> BEGIN +OK +ledis> SET HELLO WORLD +OK +ledis> GET HELLO +"WORLD" +ledis> ROLLBACK +OK +ledis> GET HELLO +(nil) +``` + +### COMMIT + +Persists the changes of all the commands in a transaction and restores the connection state to normal. + +**Return value** +Returns `OK` if in a transaction context, otherwise, `Err` + +**Examples** +``` +ledis> BEGIN +OK +ledis> SET HELLO WORLD +OK +ledis> GET HELLO +"WORLD" +ledis> COMMIT +OK +ledis> GET HELLO +"WORLD" +``` ## Script diff --git a/ledis/batch.go b/ledis/batch.go index 4ed3bac..c77e91f 100644 --- a/ledis/batch.go +++ b/ledis/batch.go @@ -14,6 +14,8 @@ type batch struct { sync.Locker + tx *Tx + eb *eventBatch } @@ -22,31 +24,12 @@ func (b *batch) Commit() error { return ErrWriteInROnly } - b.l.commitLock.Lock() - defer b.l.commitLock.Unlock() - - var err error - if b.l.r != nil { - var l *rpl.Log - if l, err = b.l.r.Log(b.eb.Bytes()); err != nil { - log.Fatal("write wal error %s", err.Error()) - return err - } - - b.l.propagate(l) - - if err = b.WriteBatch.Commit(); err != nil { - log.Fatal("commit error %s", err.Error()) - return err - } - - if err = b.l.r.UpdateCommitID(l.ID); err != nil { - log.Fatal("update commit id error %s", err.Error()) - return err - } - - return nil + if b.tx == nil { + return b.l.handleCommit(b.eb, b.WriteBatch) } else { + if b.l.r != nil { + b.tx.eb.Write(b.eb.Bytes()) + } return b.WriteBatch.Commit() } } @@ -93,20 +76,61 @@ func (l *dbBatchLocker) Unlock() { l.wrLock.RUnlock() } +type txBatchLocker struct { +} + +func (l *txBatchLocker) Lock() {} +func (l *txBatchLocker) Unlock() {} + type multiBatchLocker struct { } func (l *multiBatchLocker) Lock() {} func (l *multiBatchLocker) Unlock() {} -func (l *Ledis) newBatch(wb store.WriteBatch, locker sync.Locker) *batch { +func (l *Ledis) newBatch(wb store.WriteBatch, locker sync.Locker, tx *Tx) *batch { b := new(batch) b.l = l b.WriteBatch = wb b.Locker = locker + b.tx = tx b.eb = new(eventBatch) return b } + +type commiter interface { + Commit() error +} + +func (l *Ledis) handleCommit(eb *eventBatch, c commiter) error { + l.commitLock.Lock() + defer l.commitLock.Unlock() + + var err error + if l.r != nil { + var rl *rpl.Log + if rl, err = l.r.Log(eb.Bytes()); err != nil { + log.Fatal("write wal error %s", err.Error()) + return err + } + + l.propagate(rl) + + if err = c.Commit(); err != nil { + log.Fatal("commit error %s", err.Error()) + return err + } + + if err = l.r.UpdateCommitID(rl.ID); err != nil { + log.Fatal("update commit id error %s", err.Error()) + return err + } + + return nil + } else { + return c.Commit() + } +} diff --git a/ledis/const.go b/ledis/const.go index 7144629..3b30123 100644 --- a/ledis/const.go +++ b/ledis/const.go @@ -92,6 +92,7 @@ var ( ) const ( - DBAutoCommit uint8 = 0x0 - DBInMulti uint8 = 0x2 + DBAutoCommit uint8 = 0x0 + DBInTransaction uint8 = 0x1 + DBInMulti uint8 = 0x2 ) diff --git a/ledis/ledis_db.go b/ledis/ledis_db.go index 70eaf5a..6a8eb9c 100644 --- a/ledis/ledis_db.go +++ b/ledis/ledis_db.go @@ -64,7 +64,7 @@ func (l *Ledis) newDB(index uint8) *DB { } func (db *DB) newBatch() *batch { - return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock}) + return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock}, nil) } func (db *DB) Index() int { diff --git a/ledis/multi.go b/ledis/multi.go index 0ae4727..a549c2c 100644 --- a/ledis/multi.go +++ b/ledis/multi.go @@ -51,7 +51,7 @@ func (db *DB) Multi() (*Multi, error) { } func (m *Multi) newBatch() *batch { - return m.l.newBatch(m.bucket.NewWriteBatch(), &multiBatchLocker{}) + return m.l.newBatch(m.bucket.NewWriteBatch(), &multiBatchLocker{}, nil) } func (m *Multi) Close() error { diff --git a/ledis/tx.go b/ledis/tx.go new file mode 100644 index 0000000..a5ff883 --- /dev/null +++ b/ledis/tx.go @@ -0,0 +1,109 @@ +package ledis + +import ( + "errors" + "fmt" + "github.com/siddontang/ledisdb/store" +) + +var ( + ErrNestTx = errors.New("nest transaction not supported") + ErrTxDone = errors.New("Transaction has already been committed or rolled back") +) + +type Tx struct { + *DB + + tx *store.Tx + + eb *eventBatch +} + +func (db *DB) IsTransaction() bool { + return db.status == DBInTransaction +} + +// Begin a transaction, it will block all other write operations before calling Commit or Rollback. +// You must be very careful to prevent long-time transaction. +func (db *DB) Begin() (*Tx, error) { + if db.IsTransaction() { + return nil, ErrNestTx + } + + tx := new(Tx) + + tx.eb = new(eventBatch) + + tx.DB = new(DB) + tx.DB.l = db.l + + tx.l.wLock.Lock() + + tx.DB.sdb = db.sdb + + var err error + tx.tx, err = db.sdb.Begin() + if err != nil { + tx.l.wLock.Unlock() + return nil, err + } + + tx.DB.bucket = tx.tx + + tx.DB.status = DBInTransaction + + tx.DB.index = db.index + + tx.DB.kvBatch = tx.newBatch() + tx.DB.listBatch = tx.newBatch() + tx.DB.hashBatch = tx.newBatch() + tx.DB.zsetBatch = tx.newBatch() + tx.DB.binBatch = tx.newBatch() + tx.DB.setBatch = tx.newBatch() + + return tx, nil +} + +func (tx *Tx) Commit() error { + if tx.tx == nil { + return ErrTxDone + } + + err := tx.l.handleCommit(tx.eb, tx.tx) + + tx.tx = nil + + tx.l.wLock.Unlock() + + tx.DB.bucket = nil + + return err +} + +func (tx *Tx) Rollback() error { + if tx.tx == nil { + return ErrTxDone + } + + err := tx.tx.Rollback() + tx.eb.Reset() + tx.tx = nil + + tx.l.wLock.Unlock() + tx.DB.bucket = nil + + return err +} + +func (tx *Tx) newBatch() *batch { + return tx.l.newBatch(tx.tx.NewWriteBatch(), &txBatchLocker{}, tx) +} + +func (tx *Tx) Select(index int) error { + if index < 0 || index >= int(MaxDBNumber) { + return fmt.Errorf("invalid db index %d", index) + } + + tx.DB.index = uint8(index) + return nil +} diff --git a/ledis/tx_test.go b/ledis/tx_test.go new file mode 100644 index 0000000..cb3a7f0 --- /dev/null +++ b/ledis/tx_test.go @@ -0,0 +1,220 @@ +package ledis + +import ( + "github.com/siddontang/ledisdb/config" + "os" + "testing" +) + +func testTxRollback(t *testing.T, db *DB) { + var err error + key1 := []byte("tx_key1") + key2 := []byte("tx_key2") + field2 := []byte("tx_field2") + + err = db.Set(key1, []byte("value")) + if err != nil { + t.Fatal(err) + } + + _, err = db.HSet(key2, field2, []byte("value")) + if err != nil { + t.Fatal(err) + } + + var tx *Tx + tx, err = db.Begin() + if err != nil { + t.Fatal(err) + } + + defer tx.Rollback() + + err = tx.Set(key1, []byte("1")) + + if err != nil { + t.Fatal(err) + } + + _, err = tx.HSet(key2, field2, []byte("2")) + + if err != nil { + t.Fatal(err) + } + + _, err = tx.HSet([]byte("no_key"), field2, []byte("2")) + + if err != nil { + t.Fatal(err) + } + + if v, err := tx.Get(key1); err != nil { + t.Fatal(err) + } else if string(v) != "1" { + t.Fatal(string(v)) + } + + if v, err := tx.HGet(key2, field2); err != nil { + t.Fatal(err) + } else if string(v) != "2" { + t.Fatal(string(v)) + } + + err = tx.Rollback() + if err != nil { + t.Fatal(err) + } + + if v, err := db.Get(key1); err != nil { + t.Fatal(err) + } else if string(v) != "value" { + t.Fatal(string(v)) + } + + if v, err := db.HGet(key2, field2); err != nil { + t.Fatal(err) + } else if string(v) != "value" { + t.Fatal(string(v)) + } +} + +func testTxCommit(t *testing.T, db *DB) { + var err error + key1 := []byte("tx_key1") + key2 := []byte("tx_key2") + field2 := []byte("tx_field2") + + err = db.Set(key1, []byte("value")) + if err != nil { + t.Fatal(err) + } + + _, err = db.HSet(key2, field2, []byte("value")) + if err != nil { + t.Fatal(err) + } + + var tx *Tx + tx, err = db.Begin() + if err != nil { + t.Fatal(err) + } + + defer tx.Rollback() + + err = tx.Set(key1, []byte("1")) + + if err != nil { + t.Fatal(err) + } + + _, err = tx.HSet(key2, field2, []byte("2")) + + if err != nil { + t.Fatal(err) + } + + if v, err := tx.Get(key1); err != nil { + t.Fatal(err) + } else if string(v) != "1" { + t.Fatal(string(v)) + } + + if v, err := tx.HGet(key2, field2); err != nil { + t.Fatal(err) + } else if string(v) != "2" { + t.Fatal(string(v)) + } + + err = tx.Commit() + if err != nil { + t.Fatal(err) + } + + if v, err := db.Get(key1); err != nil { + t.Fatal(err) + } else if string(v) != "1" { + t.Fatal(string(v)) + } + + if v, err := db.HGet(key2, field2); err != nil { + t.Fatal(err) + } else if string(v) != "2" { + t.Fatal(string(v)) + } +} + +func testTxSelect(t *testing.T, db *DB) { + tx, err := db.Begin() + if err != nil { + t.Fatal(err) + } + + defer tx.Rollback() + + tx.Set([]byte("tx_select_1"), []byte("a")) + + tx.Select(1) + + tx.Set([]byte("tx_select_2"), []byte("b")) + + if err = tx.Commit(); err != nil { + t.Fatal(err) + } + + if v, err := db.Get([]byte("tx_select_1")); err != nil { + t.Fatal(err) + } else if string(v) != "a" { + t.Fatal(string(v)) + } + + if v, err := db.Get([]byte("tx_select_2")); err != nil { + t.Fatal(err) + } else if v != nil { + t.Fatal("must nil") + } + + db, _ = db.l.Select(1) + + if v, err := db.Get([]byte("tx_select_2")); err != nil { + t.Fatal(err) + } else if string(v) != "b" { + t.Fatal(string(v)) + } + + if v, err := db.Get([]byte("tx_select_1")); err != nil { + t.Fatal(err) + } else if v != nil { + t.Fatal("must nil") + } +} + +func testTx(t *testing.T, name string) { + cfg := new(config.Config) + cfg.DataDir = "/tmp/ledis_test_tx" + + cfg.DBName = name + cfg.LMDB.MapSize = 10 * 1024 * 1024 + cfg.UseReplication = true + + os.RemoveAll(cfg.DataDir) + + l, err := Open(cfg) + if err != nil { + t.Fatal(err) + } + + defer l.Close() + + db, _ := l.Select(0) + + testTxRollback(t, db) + testTxCommit(t, db) + testTxSelect(t, db) +} + +//only lmdb, boltdb support Transaction +func TestTx(t *testing.T) { + testTx(t, "lmdb") + testTx(t, "boltdb") +} diff --git a/server/client.go b/server/client.go index 4474086..2ccea7a 100644 --- a/server/client.go +++ b/server/client.go @@ -8,10 +8,24 @@ import ( "time" ) +var txUnsupportedCmds = map[string]struct{}{ + "select": struct{}{}, + "slaveof": struct{}{}, + "fullsync": struct{}{}, + "sync": struct{}{}, + "begin": struct{}{}, + "flushall": struct{}{}, + "flushdb": struct{}{}, + "eval": struct{}{}, +} + var scriptUnsupportedCmds = map[string]struct{}{ "slaveof": struct{}{}, "fullsync": struct{}{}, "sync": struct{}{}, + "begin": struct{}{}, + "commit": struct{}{}, + "rollback": struct{}{}, "flushall": struct{}{}, "flushdb": struct{}{}, } @@ -57,6 +71,8 @@ type client struct { buf bytes.Buffer + tx *ledis.Tx + script *ledis.Multi } @@ -83,7 +99,11 @@ func (c *client) perform() { } else if exeCmd, ok := regCmds[c.cmd]; !ok { err = ErrNotFound } else { - if c.db.IsInMulti() { + if c.db.IsTransaction() { + if _, ok := txUnsupportedCmds[c.cmd]; ok { + err = fmt.Errorf("%s not supported in transaction", c.cmd) + } + } else if c.db.IsInMulti() { if _, ok := scriptUnsupportedCmds[c.cmd]; ok { err = fmt.Errorf("%s not supported in multi", c.cmd) } diff --git a/server/client_http.go b/server/client_http.go index 115e44b..057ba6b 100644 --- a/server/client_http.go +++ b/server/client_http.go @@ -24,6 +24,9 @@ var httpUnsupportedCommands = map[string]struct{}{ "fullsync": struct{}{}, "sync": struct{}{}, "quit": struct{}{}, + "begin": struct{}{}, + "commit": struct{}{}, + "rollback": struct{}{}, } type httpClient struct { diff --git a/server/cmd_tx.go b/server/cmd_tx.go new file mode 100644 index 0000000..19eb5c1 --- /dev/null +++ b/server/cmd_tx.go @@ -0,0 +1,57 @@ +package server + +import ( + "errors" +) + +var errTxMiss = errors.New("transaction miss") + +func beginCommand(c *client) error { + tx, err := c.db.Begin() + if err == nil { + c.tx = tx + c.db = tx.DB + c.resp.writeStatus(OK) + } + + return err +} + +func commitCommand(c *client) error { + if c.tx == nil { + return errTxMiss + } + + err := c.tx.Commit() + c.db, _ = c.ldb.Select(c.tx.Index()) + c.tx = nil + + if err == nil { + c.resp.writeStatus(OK) + } + + return err +} + +func rollbackCommand(c *client) error { + if c.tx == nil { + return errTxMiss + } + + err := c.tx.Rollback() + + c.db, _ = c.ldb.Select(c.tx.Index()) + c.tx = nil + + if err == nil { + c.resp.writeStatus(OK) + } + + return err +} + +func init() { + register("begin", beginCommand) + register("commit", commitCommand) + register("rollback", rollbackCommand) +}