remove begin, commit, rollback

simplify codes and features now, we don’t use transaction never, and it
is not universal for all backend storages and even introduces
complexity.
This commit is contained in:
siddontang 2015-03-14 09:10:00 +08:00
parent 09d57e28f6
commit dddbd5f146
20 changed files with 500 additions and 599 deletions

View File

@ -14,7 +14,6 @@ LedisDB now supports multiple different databases as backends.
+ Rich data structure: KV, List, Hash, ZSet, Set.
+ Data storage is not limited by RAM.
+ Various backends supported: LevelDB, goleveldb, LMDB, RocksDB, BoltDB, RAM.
+ Supports transactions using LMDB or BoltDB.
+ Supports Lua scripting.
+ Supports expiration and TTL.
+ Can be managed via redis-cli.
@ -176,7 +175,6 @@ See [Clients](https://github.com/siddontang/ledisdb/wiki/Clients) to find or con
## Caveat
+ Changing the backend database at runtime is very dangerous. Data validation is not guaranteed if this is done.
+ Beginning a transaction will block any other write operators until `commit` or `rollback` is called. Avoid long-running transactions.
+ `pcall` and `xpcall` are not supported in Lua. See the README in [golua](https://github.com/aarzilli/golua).

View File

@ -1,15 +1,13 @@
//This file was generated by .tools/generate_commands.py on Thu Mar 05 2015 15:42:49 +0800
//This file was generated by .tools/generate_commands.py on Sat Mar 14 2015 08:58:32 +0800
package main
var helpCommands = [][]string{
{"APPEND", "key value", "KV"},
{"BEGIN", "-", "Transaction"},
{"BITCOUNT", "key [start] [end]", "KV"},
{"BITOP", "operation destkey key [key ...]", "KV"},
{"BITPOS", "key bit [start] [end]", "KV"},
{"BLPOP", "key [key ...] timeout", "List"},
{"BRPOP", "key [key ...] timeout", "List"},
{"COMMIT", "-", "Transaction"},
{"CONFIG GET", "parameter", "Server"},
{"CONFIG REWRITE", "-", "Server"},
{"DECR", "key", "KV"},
@ -70,7 +68,6 @@ var helpCommands = [][]string{
{"PING", "-", "Server"},
{"RESTORE", "key ttl value", "Server"},
{"ROLE", "-", "Server"},
{"ROLLBACK", "-", "Transaction"},
{"RPOP", "key", "List"},
{"RPUSH", "key value [value ...]", "List"},
{"SADD", "key member [member ...]", "Set"},

View File

@ -29,15 +29,6 @@ The same for Del.
ZSet only support int64 score, not double in Redis.
## Transaction
LedisDB supports ACID transaction using LMDB or BoltDB, maybe later it will support `multi`, `exec`, `discard`.
Transaction API:
+ `begin`
+ `commit`
+ `rollback`
## Scan

View File

@ -490,22 +490,6 @@
"readonly": true
},
"BEGIN": {
"arguments": "-",
"group": "Transaction",
"readonly": false
},
"COMMIT": {
"arguments": "-",
"group": "Transaction",
"readonly": false
},
"ROLLBACK": {
"arguments": "-",
"group": "Transaction",
"readonly": false
},
"FLUSHALL": {
"arguments": "-",
"group": "Server",

View File

@ -148,10 +148,6 @@ Most of the Ledisdb's commands are the same as Redis's, you can see the redis co
- [CONFIG REWRITE](#config-rewrite)
- [RESTORE key ttl value](#restore-key-ttl-value)
- [ROLE](#role)
- [Transaction](#transaction)
- [BEGIN](#begin)
- [ROLLBACK](#rollback)
- [COMMIT](#commit)
- [Script](#script)
- [EVAL script numkeys key [key ...] arg [arg ...]](#eval-script-numkeys-key-key--arg-arg-)
- [EVALSHA sha1 numkeys key [key ...] arg [arg ...]](#evalsha-sha1-numkeys-key-key--arg-arg-)
@ -2612,70 +2608,6 @@ Slave output:
4. The slave replication state, includes connect, connecting, sync and connected.
5. The slave current replication binlog id.
## Transaction
### BEGIN
Marks the start of a transaction block. Subsequent commands will be in a transaction context util using COMMIT or ROLLBACK.
You must known that `BEGIN` will block any other write operators before you `COMMIT` or `ROLLBACK`. Don't use long-time transaction.
**Return value**
Returns `OK` if the backend store engine in use supports transaction, otherwise, returns `Err`.
**Examples**
```
ledis> BEGIN
OK
ledis> SET HELLO WORLD
OK
ledis> COMMIT
OK
```
### ROLLBACK
Discards all the changes of previously commands in a transaction and restores the connection state to normal.
**Return value**
Returns `OK` if in a transaction context, otherwise, `Err`
**Examples**
```
ledis> BEGIN
OK
ledis> SET HELLO WORLD
OK
ledis> GET HELLO
"WORLD"
ledis> ROLLBACK
OK
ledis> GET HELLO
(nil)
```
### COMMIT
Persists the changes of all the commands in a transaction and restores the connection state to normal.
**Return value**
Returns `OK` if in a transaction context, otherwise, `Err`
**Examples**
```
ledis> BEGIN
OK
ledis> SET HELLO WORLD
OK
ledis> GET HELLO
"WORLD"
ledis> COMMIT
OK
ledis> GET HELLO
"WORLD"
```
## Script
LedisDB's script is refer to Redis, you can see more [http://redis.io/commands/eval](http://redis.io/commands/eval)

View File

@ -14,7 +14,7 @@ type batch struct {
sync.Locker
tx *Tx
// tx *Tx
}
func (b *batch) Commit() error {
@ -22,16 +22,18 @@ func (b *batch) Commit() error {
return ErrWriteInROnly
}
if b.tx == nil {
return b.l.handleCommit(b.WriteBatch, b.WriteBatch)
} else {
if b.l.r != nil {
if err := b.tx.data.Append(b.WriteBatch.BatchData()); err != nil {
return err
}
}
return b.WriteBatch.Commit()
}
return b.l.handleCommit(b.WriteBatch, b.WriteBatch)
// if b.tx == nil {
// return b.l.handleCommit(b.WriteBatch, b.WriteBatch)
// } else {
// if b.l.r != nil {
// if err := b.tx.data.Append(b.WriteBatch.BatchData()); err != nil {
// return err
// }
// }
// return b.WriteBatch.Commit()
// }
}
func (b *batch) Lock() {
@ -66,27 +68,25 @@ func (l *dbBatchLocker) Unlock() {
l.wrLock.RUnlock()
}
type txBatchLocker struct {
}
// type txBatchLocker struct {
// }
func (l *txBatchLocker) Lock() {}
func (l *txBatchLocker) Unlock() {}
// func (l *txBatchLocker) Lock() {}
// func (l *txBatchLocker) Unlock() {}
type multiBatchLocker struct {
}
// type multiBatchLocker struct {
// }
func (l *multiBatchLocker) Lock() {}
func (l *multiBatchLocker) Unlock() {}
// func (l *multiBatchLocker) Lock() {}
// func (l *multiBatchLocker) Unlock() {}
func (l *Ledis) newBatch(wb *store.WriteBatch, locker sync.Locker, tx *Tx) *batch {
func (l *Ledis) newBatch(wb *store.WriteBatch, locker sync.Locker) *batch {
b := new(batch)
b.l = l
b.WriteBatch = wb
b.Locker = locker
b.tx = tx
return b
}

View File

@ -69,7 +69,7 @@ func (l *Ledis) newDB(index uint8) *DB {
}
func (db *DB) newBatch() *batch {
return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock}, nil)
return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock})
}
func (db *DB) Index() int {

View File

@ -1,75 +1,75 @@
package ledis
import (
"errors"
"fmt"
)
// import (
// "errors"
// "fmt"
// )
var (
ErrNestMulti = errors.New("nest multi not supported")
ErrMultiDone = errors.New("multi has been closed")
)
// var (
// ErrNestMulti = errors.New("nest multi not supported")
// ErrMultiDone = errors.New("multi has been closed")
// )
type Multi struct {
*DB
}
// type Multi struct {
// *DB
// }
func (db *DB) IsInMulti() bool {
return db.status == DBInMulti
}
// func (db *DB) IsInMulti() bool {
// return db.status == DBInMulti
// }
// begin a mutli to execute commands,
// it will block any other write operations before you close the multi, unlike transaction, mutli can not rollback
func (db *DB) Multi() (*Multi, error) {
if db.IsInMulti() {
return nil, ErrNestMulti
}
// // begin a mutli to execute commands,
// // it will block any other write operations before you close the multi, unlike transaction, mutli can not rollback
// func (db *DB) Multi() (*Multi, error) {
// if db.IsInMulti() {
// return nil, ErrNestMulti
// }
m := new(Multi)
// m := new(Multi)
m.DB = new(DB)
m.DB.status = DBInMulti
// m.DB = new(DB)
// m.DB.status = DBInMulti
m.DB.l = db.l
// m.DB.l = db.l
m.l.wLock.Lock()
// m.l.wLock.Lock()
m.DB.sdb = db.sdb
// m.DB.sdb = db.sdb
m.DB.bucket = db.sdb
// m.DB.bucket = db.sdb
m.DB.index = db.index
// m.DB.index = db.index
m.DB.kvBatch = m.newBatch()
m.DB.listBatch = m.newBatch()
m.DB.hashBatch = m.newBatch()
m.DB.zsetBatch = m.newBatch()
// m.DB.binBatch = m.newBatch()
m.DB.setBatch = m.newBatch()
// m.DB.kvBatch = m.newBatch()
// m.DB.listBatch = m.newBatch()
// m.DB.hashBatch = m.newBatch()
// m.DB.zsetBatch = m.newBatch()
// // m.DB.binBatch = m.newBatch()
// m.DB.setBatch = m.newBatch()
m.DB.lbkeys = db.lbkeys
// m.DB.lbkeys = db.lbkeys
return m, nil
}
// return m, nil
// }
func (m *Multi) newBatch() *batch {
return m.l.newBatch(m.bucket.NewWriteBatch(), &multiBatchLocker{}, nil)
}
// func (m *Multi) newBatch() *batch {
// return m.l.newBatch(m.bucket.NewWriteBatch(), &multiBatchLocker{}, nil)
// }
func (m *Multi) Close() error {
if m.bucket == nil {
return ErrMultiDone
}
m.l.wLock.Unlock()
m.bucket = nil
return nil
}
// func (m *Multi) Close() error {
// if m.bucket == nil {
// return ErrMultiDone
// }
// m.l.wLock.Unlock()
// m.bucket = nil
// return nil
// }
func (m *Multi) Select(index int) error {
if index < 0 || index >= int(m.l.cfg.Databases) {
return fmt.Errorf("invalid db index %d", index)
}
// func (m *Multi) Select(index int) error {
// if index < 0 || index >= int(m.l.cfg.Databases) {
// return fmt.Errorf("invalid db index %d", index)
// }
m.DB.index = uint8(index)
return nil
}
// m.DB.index = uint8(index)
// return nil
// }

View File

@ -1,51 +1,51 @@
package ledis
import (
"sync"
"testing"
)
// import (
// "sync"
// "testing"
// )
func TestMulti(t *testing.T) {
db := getTestDB()
// func TestMulti(t *testing.T) {
// db := getTestDB()
key := []byte("test_multi_1")
v1 := []byte("v1")
v2 := []byte("v2")
// key := []byte("test_multi_1")
// v1 := []byte("v1")
// v2 := []byte("v2")
m, err := db.Multi()
if err != nil {
t.Fatal(err)
}
// m, err := db.Multi()
// if err != nil {
// t.Fatal(err)
// }
wg := sync.WaitGroup{}
// wg := sync.WaitGroup{}
wg.Add(1)
// wg.Add(1)
go func() {
if err := db.Set(key, v2); err != nil {
t.Fatal(err)
}
wg.Done()
}()
// go func() {
// if err := db.Set(key, v2); err != nil {
// t.Fatal(err)
// }
// wg.Done()
// }()
if err := m.Set(key, v1); err != nil {
t.Fatal(err)
}
// if err := m.Set(key, v1); err != nil {
// t.Fatal(err)
// }
if v, err := m.Get(key); err != nil {
t.Fatal(err)
} else if string(v) != string(v1) {
t.Fatal(string(v))
}
// if v, err := m.Get(key); err != nil {
// t.Fatal(err)
// } else if string(v) != string(v1) {
// t.Fatal(string(v))
// }
m.Close()
// m.Close()
wg.Wait()
// wg.Wait()
if v, err := db.Get(key); err != nil {
t.Fatal(err)
} else if string(v) != string(v2) {
t.Fatal(string(v))
}
// if v, err := db.Get(key); err != nil {
// t.Fatal(err)
// } else if string(v) != string(v2) {
// t.Fatal(string(v))
// }
}
// }

View File

@ -66,11 +66,11 @@ func TestReplication(t *testing.T) {
db.HSet([]byte("b"), []byte("2"), []byte("value"))
db.HSet([]byte("c"), []byte("3"), []byte("value"))
m, _ := db.Multi()
m.Set([]byte("a1"), []byte("value"))
m.Set([]byte("b1"), []byte("value"))
m.Set([]byte("c1"), []byte("value"))
m.Close()
// m, _ := db.Multi()
// m.Set([]byte("a1"), []byte("value"))
// m.Set([]byte("b1"), []byte("value"))
// m.Set([]byte("c1"), []byte("value"))
// m.Close()
slave.FlushAll()

View File

@ -1,114 +1,114 @@
package ledis
import (
"errors"
"fmt"
"github.com/siddontang/go/log"
"github.com/siddontang/ledisdb/store"
)
// import (
// "errors"
// "fmt"
// "github.com/siddontang/go/log"
// "github.com/siddontang/ledisdb/store"
// )
var (
ErrNestTx = errors.New("nest transaction not supported")
ErrTxDone = errors.New("Transaction has already been committed or rolled back")
)
// var (
// ErrNestTx = errors.New("nest transaction not supported")
// ErrTxDone = errors.New("Transaction has already been committed or rolled back")
// )
type Tx struct {
*DB
// type Tx struct {
// *DB
tx *store.Tx
// tx *store.Tx
data *store.BatchData
}
// data *store.BatchData
// }
func (db *DB) IsTransaction() bool {
return db.status == DBInTransaction
}
// func (db *DB) IsTransaction() bool {
// return db.status == DBInTransaction
// }
// Begin a transaction, it will block all other write operations before calling Commit or Rollback.
// You must be very careful to prevent long-time transaction.
func (db *DB) Begin() (*Tx, error) {
log.Warn("Transaction support will be removed later, use your own risk!!!")
// // Begin a transaction, it will block all other write operations before calling Commit or Rollback.
// // You must be very careful to prevent long-time transaction.
// func (db *DB) Begin() (*Tx, error) {
// log.Warn("Transaction support will be removed later, use your own risk!!!")
if db.IsTransaction() {
return nil, ErrNestTx
}
// if db.IsTransaction() {
// return nil, ErrNestTx
// }
tx := new(Tx)
// tx := new(Tx)
tx.data = &store.BatchData{}
// tx.data = &store.BatchData{}
tx.DB = new(DB)
tx.DB.l = db.l
// tx.DB = new(DB)
// tx.DB.l = db.l
tx.l.wLock.Lock()
// tx.l.wLock.Lock()
tx.DB.sdb = db.sdb
// tx.DB.sdb = db.sdb
var err error
tx.tx, err = db.sdb.Begin()
if err != nil {
tx.l.wLock.Unlock()
return nil, err
}
// var err error
// tx.tx, err = db.sdb.Begin()
// if err != nil {
// tx.l.wLock.Unlock()
// return nil, err
// }
tx.DB.bucket = tx.tx
// tx.DB.bucket = tx.tx
tx.DB.status = DBInTransaction
// tx.DB.status = DBInTransaction
tx.DB.index = db.index
// tx.DB.index = db.index
tx.DB.kvBatch = tx.newBatch()
tx.DB.listBatch = tx.newBatch()
tx.DB.hashBatch = tx.newBatch()
tx.DB.zsetBatch = tx.newBatch()
tx.DB.setBatch = tx.newBatch()
// tx.DB.kvBatch = tx.newBatch()
// tx.DB.listBatch = tx.newBatch()
// tx.DB.hashBatch = tx.newBatch()
// tx.DB.zsetBatch = tx.newBatch()
// tx.DB.setBatch = tx.newBatch()
tx.DB.lbkeys = db.lbkeys
// tx.DB.lbkeys = db.lbkeys
return tx, nil
}
// return tx, nil
// }
func (tx *Tx) Commit() error {
if tx.tx == nil {
return ErrTxDone
}
// func (tx *Tx) Commit() error {
// if tx.tx == nil {
// return ErrTxDone
// }
err := tx.l.handleCommit(tx.data, tx.tx)
tx.data.Reset()
// err := tx.l.handleCommit(tx.data, tx.tx)
// tx.data.Reset()
tx.tx = nil
// tx.tx = nil
tx.l.wLock.Unlock()
// tx.l.wLock.Unlock()
tx.DB.bucket = nil
// tx.DB.bucket = nil
return err
}
// return err
// }
func (tx *Tx) Rollback() error {
if tx.tx == nil {
return ErrTxDone
}
// func (tx *Tx) Rollback() error {
// if tx.tx == nil {
// return ErrTxDone
// }
err := tx.tx.Rollback()
tx.data.Reset()
tx.tx = nil
// err := tx.tx.Rollback()
// tx.data.Reset()
// tx.tx = nil
tx.l.wLock.Unlock()
tx.DB.bucket = nil
// tx.l.wLock.Unlock()
// tx.DB.bucket = nil
return err
}
// return err
// }
func (tx *Tx) newBatch() *batch {
return tx.l.newBatch(tx.tx.NewWriteBatch(), &txBatchLocker{}, tx)
}
// func (tx *Tx) newBatch() *batch {
// return tx.l.newBatch(tx.tx.NewWriteBatch(), &txBatchLocker{}, tx)
// }
func (tx *Tx) Select(index int) error {
if index < 0 || index >= int(tx.l.cfg.Databases) {
return fmt.Errorf("invalid db index %d", index)
}
// func (tx *Tx) Select(index int) error {
// if index < 0 || index >= int(tx.l.cfg.Databases) {
// return fmt.Errorf("invalid db index %d", index)
// }
tx.DB.index = uint8(index)
return nil
}
// tx.DB.index = uint8(index)
// return nil
// }

View File

@ -1,224 +1,224 @@
package ledis
import (
"github.com/siddontang/ledisdb/config"
"os"
"strings"
"testing"
)
// import (
// "github.com/siddontang/ledisdb/config"
// "os"
// "strings"
// "testing"
// )
func testTxRollback(t *testing.T, db *DB) {
var err error
key1 := []byte("tx_key1")
key2 := []byte("tx_key2")
field2 := []byte("tx_field2")
// func testTxRollback(t *testing.T, db *DB) {
// var err error
// key1 := []byte("tx_key1")
// key2 := []byte("tx_key2")
// field2 := []byte("tx_field2")
err = db.Set(key1, []byte("value"))
if err != nil {
t.Fatal(err)
}
// err = db.Set(key1, []byte("value"))
// if err != nil {
// t.Fatal(err)
// }
_, err = db.HSet(key2, field2, []byte("value"))
if err != nil {
t.Fatal(err)
}
// _, err = db.HSet(key2, field2, []byte("value"))
// if err != nil {
// t.Fatal(err)
// }
var tx *Tx
tx, err = db.Begin()
if err != nil {
t.Fatal(err)
}
// var tx *Tx
// tx, err = db.Begin()
// if err != nil {
// t.Fatal(err)
// }
defer tx.Rollback()
// defer tx.Rollback()
err = tx.Set(key1, []byte("1"))
// err = tx.Set(key1, []byte("1"))
if err != nil {
t.Fatal(err)
}
// if err != nil {
// t.Fatal(err)
// }
_, err = tx.HSet(key2, field2, []byte("2"))
// _, err = tx.HSet(key2, field2, []byte("2"))
if err != nil {
t.Fatal(err)
}
// if err != nil {
// t.Fatal(err)
// }
_, err = tx.HSet([]byte("no_key"), field2, []byte("2"))
// _, err = tx.HSet([]byte("no_key"), field2, []byte("2"))
if err != nil {
t.Fatal(err)
}
// if err != nil {
// t.Fatal(err)
// }
if v, err := tx.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "1" {
t.Fatal(string(v))
}
// if v, err := tx.Get(key1); err != nil {
// t.Fatal(err)
// } else if string(v) != "1" {
// t.Fatal(string(v))
// }
if v, err := tx.HGet(key2, field2); err != nil {
t.Fatal(err)
} else if string(v) != "2" {
t.Fatal(string(v))
}
// if v, err := tx.HGet(key2, field2); err != nil {
// t.Fatal(err)
// } else if string(v) != "2" {
// t.Fatal(string(v))
// }
err = tx.Rollback()
if err != nil {
t.Fatal(err)
}
// err = tx.Rollback()
// if err != nil {
// t.Fatal(err)
// }
if v, err := db.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "value" {
t.Fatal(string(v))
}
// if v, err := db.Get(key1); err != nil {
// t.Fatal(err)
// } else if string(v) != "value" {
// t.Fatal(string(v))
// }
if v, err := db.HGet(key2, field2); err != nil {
t.Fatal(err)
} else if string(v) != "value" {
t.Fatal(string(v))
}
}
// if v, err := db.HGet(key2, field2); err != nil {
// t.Fatal(err)
// } else if string(v) != "value" {
// t.Fatal(string(v))
// }
// }
func testTxCommit(t *testing.T, db *DB) {
var err error
key1 := []byte("tx_key1")
key2 := []byte("tx_key2")
field2 := []byte("tx_field2")
// func testTxCommit(t *testing.T, db *DB) {
// var err error
// key1 := []byte("tx_key1")
// key2 := []byte("tx_key2")
// field2 := []byte("tx_field2")
err = db.Set(key1, []byte("value"))
if err != nil {
t.Fatal(err)
}
// err = db.Set(key1, []byte("value"))
// if err != nil {
// t.Fatal(err)
// }
_, err = db.HSet(key2, field2, []byte("value"))
if err != nil {
t.Fatal(err)
}
// _, err = db.HSet(key2, field2, []byte("value"))
// if err != nil {
// t.Fatal(err)
// }
var tx *Tx
tx, err = db.Begin()
if err != nil {
t.Fatal(err)
}
// var tx *Tx
// tx, err = db.Begin()
// if err != nil {
// t.Fatal(err)
// }
defer tx.Rollback()
// defer tx.Rollback()
err = tx.Set(key1, []byte("1"))
// err = tx.Set(key1, []byte("1"))
if err != nil {
t.Fatal(err)
}
// if err != nil {
// t.Fatal(err)
// }
_, err = tx.HSet(key2, field2, []byte("2"))
// _, err = tx.HSet(key2, field2, []byte("2"))
if err != nil {
t.Fatal(err)
}
// if err != nil {
// t.Fatal(err)
// }
if v, err := tx.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "1" {
t.Fatal(string(v))
}
// if v, err := tx.Get(key1); err != nil {
// t.Fatal(err)
// } else if string(v) != "1" {
// t.Fatal(string(v))
// }
if v, err := tx.HGet(key2, field2); err != nil {
t.Fatal(err)
} else if string(v) != "2" {
t.Fatal(string(v))
}
// if v, err := tx.HGet(key2, field2); err != nil {
// t.Fatal(err)
// } else if string(v) != "2" {
// t.Fatal(string(v))
// }
err = tx.Commit()
if err != nil {
t.Fatal(err)
}
// err = tx.Commit()
// if err != nil {
// t.Fatal(err)
// }
if v, err := db.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "1" {
t.Fatal(string(v))
}
// if v, err := db.Get(key1); err != nil {
// t.Fatal(err)
// } else if string(v) != "1" {
// t.Fatal(string(v))
// }
if v, err := db.HGet(key2, field2); err != nil {
t.Fatal(err)
} else if string(v) != "2" {
t.Fatal(string(v))
}
}
// if v, err := db.HGet(key2, field2); err != nil {
// t.Fatal(err)
// } else if string(v) != "2" {
// t.Fatal(string(v))
// }
// }
func testTxSelect(t *testing.T, db *DB) {
tx, err := db.Begin()
if err != nil {
t.Fatal(err)
}
// func testTxSelect(t *testing.T, db *DB) {
// tx, err := db.Begin()
// if err != nil {
// t.Fatal(err)
// }
defer tx.Rollback()
// defer tx.Rollback()
tx.Set([]byte("tx_select_1"), []byte("a"))
// tx.Set([]byte("tx_select_1"), []byte("a"))
tx.Select(1)
// tx.Select(1)
tx.Set([]byte("tx_select_2"), []byte("b"))
// tx.Set([]byte("tx_select_2"), []byte("b"))
if err = tx.Commit(); err != nil {
t.Fatal(err)
}
// if err = tx.Commit(); err != nil {
// t.Fatal(err)
// }
if v, err := db.Get([]byte("tx_select_1")); err != nil {
t.Fatal(err)
} else if string(v) != "a" {
t.Fatal(string(v))
}
// if v, err := db.Get([]byte("tx_select_1")); err != nil {
// t.Fatal(err)
// } else if string(v) != "a" {
// t.Fatal(string(v))
// }
if v, err := db.Get([]byte("tx_select_2")); err != nil {
t.Fatal(err)
} else if v != nil {
t.Fatal("must nil")
}
// if v, err := db.Get([]byte("tx_select_2")); err != nil {
// t.Fatal(err)
// } else if v != nil {
// t.Fatal("must nil")
// }
db, _ = db.l.Select(1)
// db, _ = db.l.Select(1)
if v, err := db.Get([]byte("tx_select_2")); err != nil {
t.Fatal(err)
} else if string(v) != "b" {
t.Fatal(string(v))
}
// if v, err := db.Get([]byte("tx_select_2")); err != nil {
// t.Fatal(err)
// } else if string(v) != "b" {
// t.Fatal(string(v))
// }
if v, err := db.Get([]byte("tx_select_1")); err != nil {
t.Fatal(err)
} else if v != nil {
t.Fatal("must nil")
}
}
// if v, err := db.Get([]byte("tx_select_1")); err != nil {
// t.Fatal(err)
// } else if v != nil {
// t.Fatal("must nil")
// }
// }
func testTx(t *testing.T, name string) {
cfg := config.NewConfigDefault()
cfg.DataDir = "/tmp/ledis_test_tx"
// func testTx(t *testing.T, name string) {
// cfg := config.NewConfigDefault()
// cfg.DataDir = "/tmp/ledis_test_tx"
cfg.DBName = name
cfg.LMDB.MapSize = 10 * 1024 * 1024
//cfg.UseReplication = true
// cfg.DBName = name
// cfg.LMDB.MapSize = 10 * 1024 * 1024
// //cfg.UseReplication = true
os.RemoveAll(cfg.DataDir)
// os.RemoveAll(cfg.DataDir)
l, err := Open(cfg)
if err != nil {
if strings.Contains(err.Error(), "not registered") {
return
}
t.Fatal(err)
}
// l, err := Open(cfg)
// if err != nil {
// if strings.Contains(err.Error(), "not registered") {
// return
// }
// t.Fatal(err)
// }
defer l.Close()
// defer l.Close()
db, _ := l.Select(0)
// db, _ := l.Select(0)
testTxRollback(t, db)
testTxCommit(t, db)
testTxSelect(t, db)
}
// testTxRollback(t, db)
// testTxCommit(t, db)
// testTxSelect(t, db)
// }
//only lmdb, boltdb support Transaction
func TestTx(t *testing.T) {
testTx(t, "lmdb")
testTx(t, "boltdb")
}
// //only lmdb, boltdb support Transaction
// func TestTx(t *testing.T) {
// testTx(t, "lmdb")
// testTx(t, "boltdb")
// }

View File

@ -30,7 +30,7 @@ type App struct {
info *info
s *script
script *script
// handle slaves
slock sync.Mutex

View File

@ -2,38 +2,38 @@ package server
import (
"bytes"
"fmt"
// "fmt"
"github.com/siddontang/go/sync2"
"github.com/siddontang/ledisdb/ledis"
"io"
"time"
)
var txUnsupportedCmds = map[string]struct{}{
"select": struct{}{},
"slaveof": struct{}{},
"fullsync": struct{}{},
"sync": struct{}{},
"begin": struct{}{},
"flushall": struct{}{},
"flushdb": struct{}{},
"eval": struct{}{},
"xmigrate": struct{}{},
"xmigratedb": struct{}{},
}
// var txUnsupportedCmds = map[string]struct{}{
// "select": struct{}{},
// "slaveof": struct{}{},
// "fullsync": struct{}{},
// "sync": struct{}{},
// "begin": struct{}{},
// "flushall": struct{}{},
// "flushdb": struct{}{},
// "eval": struct{}{},
// "xmigrate": struct{}{},
// "xmigratedb": struct{}{},
// }
var scriptUnsupportedCmds = map[string]struct{}{
"slaveof": struct{}{},
"fullsync": struct{}{},
"sync": struct{}{},
"begin": struct{}{},
"commit": struct{}{},
"rollback": struct{}{},
"flushall": struct{}{},
"flushdb": struct{}{},
"xmigrate": struct{}{},
"xmigratedb": struct{}{},
}
// var scriptUnsupportedCmds = map[string]struct{}{
// "slaveof": struct{}{},
// "fullsync": struct{}{},
// "sync": struct{}{},
// "begin": struct{}{},
// "commit": struct{}{},
// "rollback": struct{}{},
// "flushall": struct{}{},
// "flushdb": struct{}{},
// "xmigrate": struct{}{},
// "xmigratedb": struct{}{},
// }
type responseWriter interface {
writeError(error)
@ -73,9 +73,9 @@ type client struct {
buf bytes.Buffer
tx *ledis.Tx
// tx *ledis.Tx
script *ledis.Multi
// script *ledis.Multi
slaveListeningAddr string
}
@ -104,19 +104,22 @@ func (c *client) perform() {
} else if exeCmd, ok := regCmds[c.cmd]; !ok {
err = ErrNotFound
} else {
if c.db.IsTransaction() {
if _, ok := txUnsupportedCmds[c.cmd]; ok {
err = fmt.Errorf("%s not supported in transaction", c.cmd)
}
} else if c.db.IsInMulti() {
if _, ok := scriptUnsupportedCmds[c.cmd]; ok {
err = fmt.Errorf("%s not supported in multi", c.cmd)
}
}
// if c.db.IsTransaction() {
// if _, ok := txUnsupportedCmds[c.cmd]; ok {
// err = fmt.Errorf("%s not supported in transaction", c.cmd)
// }
// } else if c.db.IsInMulti() {
// if _, ok := scriptUnsupportedCmds[c.cmd]; ok {
// err = fmt.Errorf("%s not supported in multi", c.cmd)
// }
// }
// if err == nil {
// err = exeCmd(c)
// }
err = exeCmd(c)
if err == nil {
err = exeCmd(c)
}
}
if c.app.access != nil {

View File

@ -105,10 +105,10 @@ func (c *respClient) run() {
c.conn.Close()
if c.tx != nil {
c.tx.Rollback()
c.tx = nil
}
// if c.tx != nil {
// c.tx.Rollback()
// c.tx = nil
// }
c.app.removeSlave(c.client, c.activeQuit)

View File

@ -37,12 +37,7 @@ func parseEvalArgs(l *lua.State, c *client) error {
}
func evalGenericCommand(c *client, evalSha1 bool) error {
m, err := c.db.Multi()
if err != nil {
return err
}
s := c.app.s
s := c.app.script
luaClient := s.c
l := s.l
@ -53,15 +48,13 @@ func evalGenericCommand(c *client, evalSha1 bool) error {
defer func() {
l.SetTop(base)
luaClient.db = nil
luaClient.script = nil
// luaClient.script = nil
s.Unlock()
m.Close()
}()
luaClient.db = m.DB
luaClient.script = m
luaClient.db = c.db
// luaClient.script = m
luaClient.remoteAddr = c.remoteAddr
if err := parseEvalArgs(l, c); err != nil {
@ -101,7 +94,6 @@ func evalGenericCommand(c *client, evalSha1 bool) error {
return err
} else {
r := luaReplyToLedisReply(l)
m.Close()
if v, ok := r.(error); ok {
return v
@ -122,7 +114,7 @@ func evalshaCommand(c *client) error {
}
func scriptCommand(c *client) error {
s := c.app.s
s := c.app.script
l := s.l
s.Lock()
@ -155,7 +147,7 @@ func scriptCommand(c *client) error {
}
func scriptLoadCommand(c *client) error {
s := c.app.s
s := c.app.script
l := s.l
if len(c.args) != 2 {
@ -181,7 +173,7 @@ func scriptLoadCommand(c *client) error {
}
func scriptExistsCommand(c *client) error {
s := c.app.s
s := c.app.script
if len(c.args) < 2 {
return ErrCmdParams
@ -201,7 +193,7 @@ func scriptExistsCommand(c *client) error {
}
func scriptFlushCommand(c *client) error {
s := c.app.s
s := c.app.script
l := s.l
if len(c.args) != 1 {

View File

@ -31,19 +31,26 @@ func selectCommand(c *client) error {
if index, err := strconv.Atoi(hack.String(c.args[0])); err != nil {
return err
} else {
if c.db.IsInMulti() {
if err := c.script.Select(index); err != nil {
return err
} else {
c.db = c.script.DB
}
// if c.db.IsInMulti() {
// if err := c.script.Select(index); err != nil {
// return err
// } else {
// c.db = c.script.DB
// }
// } else {
// if db, err := c.ldb.Select(index); err != nil {
// return err
// } else {
// c.db = db
// }
// }
if db, err := c.ldb.Select(index); err != nil {
return err
} else {
if db, err := c.ldb.Select(index); err != nil {
return err
} else {
c.db = db
}
c.db = db
}
c.resp.writeStatus(OK)
}

View File

@ -1,57 +1,57 @@
package server
import (
"errors"
)
// import (
// "errors"
// )
var errTxMiss = errors.New("transaction miss")
// var errTxMiss = errors.New("transaction miss")
func beginCommand(c *client) error {
tx, err := c.db.Begin()
if err == nil {
c.tx = tx
c.db = tx.DB
c.resp.writeStatus(OK)
}
// func beginCommand(c *client) error {
// tx, err := c.db.Begin()
// if err == nil {
// c.tx = tx
// c.db = tx.DB
// c.resp.writeStatus(OK)
// }
return err
}
// return err
// }
func commitCommand(c *client) error {
if c.tx == nil {
return errTxMiss
}
// func commitCommand(c *client) error {
// if c.tx == nil {
// return errTxMiss
// }
err := c.tx.Commit()
c.db, _ = c.ldb.Select(c.tx.Index())
c.tx = nil
// err := c.tx.Commit()
// c.db, _ = c.ldb.Select(c.tx.Index())
// c.tx = nil
if err == nil {
c.resp.writeStatus(OK)
}
// if err == nil {
// c.resp.writeStatus(OK)
// }
return err
}
// return err
// }
func rollbackCommand(c *client) error {
if c.tx == nil {
return errTxMiss
}
// func rollbackCommand(c *client) error {
// if c.tx == nil {
// return errTxMiss
// }
err := c.tx.Rollback()
// err := c.tx.Rollback()
c.db, _ = c.ldb.Select(c.tx.Index())
c.tx = nil
// c.db, _ = c.ldb.Select(c.tx.Index())
// c.tx = nil
if err == nil {
c.resp.writeStatus(OK)
}
// if err == nil {
// c.resp.writeStatus(OK)
// }
return err
}
// return err
// }
func init() {
register("begin", beginCommand)
register("commit", commitCommand)
register("rollback", rollbackCommand)
}
// func init() {
// register("begin", beginCommand)
// register("commit", commitCommand)
// register("rollback", rollbackCommand)
// }

View File

@ -152,7 +152,7 @@ func (app *App) openScript() {
s.chunks = make(map[string]struct{})
app.s = s
app.script = s
l := lua.NewState()
@ -204,9 +204,9 @@ func (app *App) openScript() {
}
func (app *App) closeScript() {
app.s.l.Close()
delMapState(app.s.l)
app.s = nil
app.script.l.Close()
delMapState(app.script.l)
app.script = nil
}
var mapState = map[*lua.State]*script{}

View File

@ -115,16 +115,13 @@ func TestLuaCall(t *testing.T) {
defer app.Close()
db, _ := app.ldb.Select(0)
m, _ := db.Multi()
defer m.Close()
luaClient := app.s.c
luaClient.db = m.DB
luaClient.script = m
luaClient := app.script.c
luaClient.db = db
l := app.s.l
l := app.script.l
err := app.s.l.DoString(testScript1)
err := app.script.l.DoString(testScript1)
if err != nil {
t.Fatal(err)
}
@ -138,7 +135,7 @@ func TestLuaCall(t *testing.T) {
t.Fatal(fmt.Sprintf("%v %T", v, v))
}
err = app.s.l.DoString(testScript2)
err = app.script.l.DoString(testScript2)
if err != nil {
t.Fatal(err)
}
@ -148,7 +145,7 @@ func TestLuaCall(t *testing.T) {
t.Fatal(fmt.Sprintf("%v %T", v, v))
}
err = app.s.l.DoString(testScript3)
err = app.script.l.DoString(testScript3)
if err != nil {
t.Fatal(err)
}
@ -159,7 +156,7 @@ func TestLuaCall(t *testing.T) {
t.Fatal(string(v))
}
err = app.s.l.DoString(testScript4)
err = app.script.l.DoString(testScript4)
if err != nil {
t.Fatal(err)
}