Merge branch 'develop'

This commit is contained in:
siddontang 2015-03-16 13:25:38 +08:00
commit 3d6eedfd0a
73 changed files with 1015 additions and 809 deletions

6
Godeps/Godeps.json generated
View File

@ -60,7 +60,7 @@
},
{
"ImportPath": "github.com/siddontang/goredis",
"Rev": "6d2857b0488d1e8b9f96b46802eacb68e29fb003"
"Rev": "f711beb9ecead18cf638a898610aa2c24ccb6dc7"
},
{
"ImportPath": "github.com/siddontang/rdb",
@ -74,10 +74,6 @@
"ImportPath": "github.com/syndtr/gosnappy/snappy",
"Rev": "ce8acff4829e0c2458a67ead32390ac0a381c862"
},
{
"ImportPath": "github.com/szferi/gomdb",
"Rev": "6bcb5a8059f9655a259774650dbe0cad422767a3"
},
{
"ImportPath": "github.com/ugorji/go/codec",
"Rev": "71c2886f5a673a35f909803f38ece5810165097b"

View File

@ -13,6 +13,10 @@ type PoolConn struct {
}
func (c *PoolConn) Close() {
if c.Conn.isClosed() {
return
}
c.c.put(c.Conn)
}

View File

@ -8,6 +8,7 @@ import (
"io"
"net"
"strconv"
"sync/atomic"
"time"
)
@ -37,6 +38,8 @@ type Conn struct {
totalReadSize sizeWriter
totalWriteSize sizeWriter
closed int32
}
func Connect(addr string) (*Conn, error) {
@ -55,11 +58,23 @@ func ConnectWithSize(addr string, readSize int, writeSize int) (*Conn, error) {
c.br = bufio.NewReaderSize(io.TeeReader(c.c, &c.totalReadSize), readSize)
c.bw = bufio.NewWriterSize(io.MultiWriter(c.c, &c.totalWriteSize), writeSize)
atomic.StoreInt32(&c.closed, 0)
return c, nil
}
func (c *Conn) Close() {
if atomic.LoadInt32(&c.closed) == 1 {
return
}
c.c.Close()
atomic.StoreInt32(&c.closed, 1)
}
func (c *Conn) isClosed() bool {
return atomic.LoadInt32(&c.closed) == 1
}
func (c *Conn) GetTotalReadSize() int64 {

View File

@ -1,22 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe

View File

@ -16,13 +16,13 @@ all: build
build:
$(GO) install -tags 'linenoise $(GO_BUILD_TAGS)' ./...
build_use_lmdb:
build_lmdb:
$(GO) install -tags 'linenoise $(GO_BUILD_TAGS) lmdb' ./...
test:
$(GO) test --race -tags '$(GO_BUILD_TAGS)' ./...
test_use_lmdb:
test_lmdb:
$(GO) test --race -tags '$(GO_BUILD_TAGS) lmdb' ./...
test_ledis:

View File

@ -14,7 +14,6 @@ LedisDB now supports multiple different databases as backends.
+ Rich data structure: KV, List, Hash, ZSet, Set.
+ Data storage is not limited by RAM.
+ Various backends supported: LevelDB, goleveldb, LMDB, RocksDB, BoltDB, RAM.
+ Supports transactions using LMDB or BoltDB.
+ Supports Lua scripting.
+ Supports expiration and TTL.
+ Can be managed via redis-cli.
@ -176,7 +175,6 @@ See [Clients](https://github.com/siddontang/ledisdb/wiki/Clients) to find or con
## Caveat
+ Changing the backend database at runtime is very dangerous. Data validation is not guaranteed if this is done.
+ Beginning a transaction will block any other write operators until `commit` or `rollback` is called. Avoid long-running transactions.
+ `pcall` and `xpcall` are not supported in Lua. See the README in [golua](https://github.com/aarzilli/golua).

View File

@ -1,15 +1,13 @@
//This file was generated by .tools/generate_commands.py on Thu Mar 05 2015 15:42:49 +0800
//This file was generated by .tools/generate_commands.py on Sat Mar 14 2015 08:58:32 +0800
package main
var helpCommands = [][]string{
{"APPEND", "key value", "KV"},
{"BEGIN", "-", "Transaction"},
{"BITCOUNT", "key [start] [end]", "KV"},
{"BITOP", "operation destkey key [key ...]", "KV"},
{"BITPOS", "key bit [start] [end]", "KV"},
{"BLPOP", "key [key ...] timeout", "List"},
{"BRPOP", "key [key ...] timeout", "List"},
{"COMMIT", "-", "Transaction"},
{"CONFIG GET", "parameter", "Server"},
{"CONFIG REWRITE", "-", "Server"},
{"DECR", "key", "KV"},
@ -70,7 +68,6 @@ var helpCommands = [][]string{
{"PING", "-", "Server"},
{"RESTORE", "key ttl value", "Server"},
{"ROLE", "-", "Server"},
{"ROLLBACK", "-", "Transaction"},
{"RPOP", "key", "List"},
{"RPUSH", "key value [value ...]", "List"},
{"SADD", "key member [member ...]", "Set"},

View File

@ -214,10 +214,7 @@ func (cfg *Config) adjust() {
cfg.ConnReadBufferSize = getDefault(4*KB, cfg.ConnReadBufferSize)
cfg.ConnWriteBufferSize = getDefault(4*KB, cfg.ConnWriteBufferSize)
cfg.TTLCheckInterval = getDefault(1, cfg.TTLCheckInterval)
cfg.Databases = getDefault(0, cfg.Databases)
if cfg.Databases > 16 {
cfg.Databases = 16
}
cfg.Databases = getDefault(16, cfg.Databases)
}
func (cfg *LevelDBConfig) adjust() {

View File

@ -29,15 +29,6 @@ The same for Del.
ZSet only support int64 score, not double in Redis.
## Transaction
LedisDB supports ACID transaction using LMDB or BoltDB, maybe later it will support `multi`, `exec`, `discard`.
Transaction API:
+ `begin`
+ `commit`
+ `rollback`
## Scan

View File

@ -490,22 +490,6 @@
"readonly": true
},
"BEGIN": {
"arguments": "-",
"group": "Transaction",
"readonly": false
},
"COMMIT": {
"arguments": "-",
"group": "Transaction",
"readonly": false
},
"ROLLBACK": {
"arguments": "-",
"group": "Transaction",
"readonly": false
},
"FLUSHALL": {
"arguments": "-",
"group": "Server",

View File

@ -148,10 +148,6 @@ Most of the Ledisdb's commands are the same as Redis's, you can see the redis co
- [CONFIG REWRITE](#config-rewrite)
- [RESTORE key ttl value](#restore-key-ttl-value)
- [ROLE](#role)
- [Transaction](#transaction)
- [BEGIN](#begin)
- [ROLLBACK](#rollback)
- [COMMIT](#commit)
- [Script](#script)
- [EVAL script numkeys key [key ...] arg [arg ...]](#eval-script-numkeys-key-key--arg-arg-)
- [EVALSHA sha1 numkeys key [key ...] arg [arg ...]](#evalsha-sha1-numkeys-key-key--arg-arg-)
@ -2612,70 +2608,6 @@ Slave output:
4. The slave replication state, includes connect, connecting, sync and connected.
5. The slave current replication binlog id.
## Transaction
### BEGIN
Marks the start of a transaction block. Subsequent commands will be in a transaction context util using COMMIT or ROLLBACK.
You must known that `BEGIN` will block any other write operators before you `COMMIT` or `ROLLBACK`. Don't use long-time transaction.
**Return value**
Returns `OK` if the backend store engine in use supports transaction, otherwise, returns `Err`.
**Examples**
```
ledis> BEGIN
OK
ledis> SET HELLO WORLD
OK
ledis> COMMIT
OK
```
### ROLLBACK
Discards all the changes of previously commands in a transaction and restores the connection state to normal.
**Return value**
Returns `OK` if in a transaction context, otherwise, `Err`
**Examples**
```
ledis> BEGIN
OK
ledis> SET HELLO WORLD
OK
ledis> GET HELLO
"WORLD"
ledis> ROLLBACK
OK
ledis> GET HELLO
(nil)
```
### COMMIT
Persists the changes of all the commands in a transaction and restores the connection state to normal.
**Return value**
Returns `OK` if in a transaction context, otherwise, `Err`
**Examples**
```
ledis> BEGIN
OK
ledis> SET HELLO WORLD
OK
ledis> GET HELLO
"WORLD"
ledis> COMMIT
OK
ledis> GET HELLO
"WORLD"
```
## Script
LedisDB's script is refer to Redis, you can see more [http://redis.io/commands/eval](http://redis.io/commands/eval)

View File

@ -14,7 +14,7 @@ type batch struct {
sync.Locker
tx *Tx
// tx *Tx
}
func (b *batch) Commit() error {
@ -22,16 +22,18 @@ func (b *batch) Commit() error {
return ErrWriteInROnly
}
if b.tx == nil {
return b.l.handleCommit(b.WriteBatch, b.WriteBatch)
} else {
if b.l.r != nil {
if err := b.tx.data.Append(b.WriteBatch.BatchData()); err != nil {
return err
}
}
return b.WriteBatch.Commit()
}
return b.l.handleCommit(b.WriteBatch, b.WriteBatch)
// if b.tx == nil {
// return b.l.handleCommit(b.WriteBatch, b.WriteBatch)
// } else {
// if b.l.r != nil {
// if err := b.tx.data.Append(b.WriteBatch.BatchData()); err != nil {
// return err
// }
// }
// return b.WriteBatch.Commit()
// }
}
func (b *batch) Lock() {
@ -66,27 +68,25 @@ func (l *dbBatchLocker) Unlock() {
l.wrLock.RUnlock()
}
type txBatchLocker struct {
}
// type txBatchLocker struct {
// }
func (l *txBatchLocker) Lock() {}
func (l *txBatchLocker) Unlock() {}
// func (l *txBatchLocker) Lock() {}
// func (l *txBatchLocker) Unlock() {}
type multiBatchLocker struct {
}
// type multiBatchLocker struct {
// }
func (l *multiBatchLocker) Lock() {}
func (l *multiBatchLocker) Unlock() {}
// func (l *multiBatchLocker) Lock() {}
// func (l *multiBatchLocker) Unlock() {}
func (l *Ledis) newBatch(wb *store.WriteBatch, locker sync.Locker, tx *Tx) *batch {
func (l *Ledis) newBatch(wb *store.WriteBatch, locker sync.Locker) *batch {
b := new(batch)
b.l = l
b.WriteBatch = wb
b.Locker = locker
b.tx = tx
return b
}

View File

@ -104,6 +104,8 @@ var (
)
const (
MaxDatabases int = 10240
//max key size
MaxKeySize int = 1024
@ -127,11 +129,11 @@ var (
ErrRplNotSupport = errors.New("replication not support")
)
const (
DBAutoCommit uint8 = 0x0
DBInTransaction uint8 = 0x1
DBInMulti uint8 = 0x2
)
// const (
// DBAutoCommit uint8 = 0x0
// DBInTransaction uint8 = 0x1
// DBInMulti uint8 = 0x2
// )
const (
BitAND = "and"

View File

@ -18,7 +18,11 @@ func formatEventKey(buf []byte, k []byte) ([]byte, error) {
buf = append(buf, fmt.Sprintf("%s ", TypeName[k[1]])...)
db := new(DB)
db.index = k[0]
index, _, err := decodeDBIndex(k)
if err != nil {
return nil, err
}
db.setIndex(index)
//to do format at respective place

View File

@ -18,7 +18,9 @@ type Ledis struct {
cfg *config.Config
ldb *store.DB
dbs []*DB
dbLock sync.Mutex
dbs map[int]*DB
quit chan struct{}
wg sync.WaitGroup
@ -35,7 +37,8 @@ type Ledis struct {
lock io.Closer
tcs []*ttlChecker
ttlCheckers []*ttlChecker
ttlCheckerCh chan *ttlChecker
}
func Open(cfg *config.Config) (*Ledis, error) {
@ -84,10 +87,7 @@ func Open(cfg *config.Config) (*Ledis, error) {
l.r = nil
}
l.dbs = make([]*DB, cfg.Databases)
for i := 0; i < cfg.Databases; i++ {
l.dbs[i] = l.newDB(uint8(i))
}
l.dbs = make(map[int]*DB, 16)
l.checkTTL()
@ -112,11 +112,26 @@ func (l *Ledis) Close() {
}
func (l *Ledis) Select(index int) (*DB, error) {
if index < 0 || index >= len(l.dbs) {
return nil, fmt.Errorf("invalid db index %d, must in [0, %d]", index, len(l.dbs)-1)
if index < 0 || index >= MaxDatabases {
return nil, fmt.Errorf("invalid db index %d, must in [0, %d]", index, MaxDatabases-1)
}
return l.dbs[index], nil
l.dbLock.Lock()
defer l.dbLock.Unlock()
db, ok := l.dbs[index]
if ok {
return db, nil
}
db = l.newDB(index)
l.dbs[index] = db
go func(db *DB) {
l.ttlCheckerCh <- db.ttlChecker
}(db)
return db, nil
}
// Flush All will clear all data and replication logs
@ -176,19 +191,8 @@ func (l *Ledis) IsReadOnly() bool {
}
func (l *Ledis) checkTTL() {
l.tcs = make([]*ttlChecker, len(l.dbs))
for i, db := range l.dbs {
c := newTTLChecker(db)
c.register(KVType, db.kvBatch, db.delete)
c.register(ListType, db.listBatch, db.lDelete)
c.register(HashType, db.hashBatch, db.hDelete)
c.register(ZSetType, db.zsetBatch, db.zDelete)
// c.register(BitType, db.binBatch, db.bDelete)
c.register(SetType, db.setBatch, db.sDelete)
l.tcs[i] = c
}
l.ttlCheckers = make([]*ttlChecker, 0, 16)
l.ttlCheckerCh = make(chan *ttlChecker, 16)
if l.cfg.TTLCheckInterval == 0 {
l.cfg.TTLCheckInterval = 1
@ -208,9 +212,12 @@ func (l *Ledis) checkTTL() {
break
}
for _, c := range l.tcs {
for _, c := range l.ttlCheckers {
c.check()
}
case c := <-l.ttlCheckerCh:
l.ttlCheckers = append(l.ttlCheckers, c)
c.check()
case <-l.quit:
return
}

View File

@ -1,6 +1,8 @@
package ledis
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/siddontang/ledisdb/store"
"sync"
@ -30,7 +32,10 @@ type DB struct {
bucket ibucket
index uint8
index int
// buffer to store index varint
indexVarBuf []byte
kvBatch *batch
listBatch *batch
@ -39,12 +44,14 @@ type DB struct {
// binBatch *batch
setBatch *batch
status uint8
// status uint8
ttlChecker *ttlChecker
lbkeys *lBlockKeys
}
func (l *Ledis) newDB(index uint8) *DB {
func (l *Ledis) newDB(index int) *DB {
d := new(DB)
d.l = l
@ -53,8 +60,8 @@ func (l *Ledis) newDB(index uint8) *DB {
d.bucket = d.sdb
d.status = DBAutoCommit
d.index = index
// d.status = DBAutoCommit
d.setIndex(index)
d.kvBatch = d.newBatch()
d.listBatch = d.newBatch()
@ -65,20 +72,70 @@ func (l *Ledis) newDB(index uint8) *DB {
d.lbkeys = newLBlockKeys()
d.ttlChecker = d.newTTLChecker()
return d
}
func decodeDBIndex(buf []byte) (int, int, error) {
index, n := binary.Uvarint(buf)
if n == 0 {
return 0, 0, fmt.Errorf("buf is too small to save index")
} else if n < 0 {
return 0, 0, fmt.Errorf("value larger than 64 bits")
} else if index > uint64(MaxDatabases) {
return 0, 0, fmt.Errorf("value %d is larger than max databases %d", index, MaxDatabases)
}
return int(index), n, nil
}
func (db *DB) setIndex(index int) {
db.index = index
// the most size for varint is 10 bytes
buf := make([]byte, 10)
n := binary.PutUvarint(buf, uint64(index))
db.indexVarBuf = buf[0:n]
}
func (db *DB) checkKeyIndex(buf []byte) (int, error) {
if len(buf) < len(db.indexVarBuf) {
return 0, fmt.Errorf("key is too small")
} else if !bytes.Equal(db.indexVarBuf, buf[0:len(db.indexVarBuf)]) {
return 0, fmt.Errorf("invalid db index")
}
return len(db.indexVarBuf), nil
}
func (db *DB) newTTLChecker() *ttlChecker {
c := new(ttlChecker)
c.db = db
c.txs = make([]*batch, maxDataType)
c.cbs = make([]onExpired, maxDataType)
c.nc = 0
c.register(KVType, db.kvBatch, db.delete)
c.register(ListType, db.listBatch, db.lDelete)
c.register(HashType, db.hashBatch, db.hDelete)
c.register(ZSetType, db.zsetBatch, db.zDelete)
// c.register(BitType, db.binBatch, db.bDelete)
c.register(SetType, db.setBatch, db.sDelete)
return c
}
func (db *DB) newBatch() *batch {
return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock}, nil)
return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock})
}
func (db *DB) Index() int {
return int(db.index)
}
func (db *DB) IsAutoCommit() bool {
return db.status == DBAutoCommit
}
// func (db *DB) IsAutoCommit() bool {
// return db.status == DBAutoCommit
// }
func (db *DB) FlushAll() (drop int64, err error) {
all := [...](func() (int64, error)){

View File

@ -37,24 +37,85 @@ func TestDB(t *testing.T) {
func TestSelect(t *testing.T) {
db0, _ := testLedis.Select(0)
db1, _ := testLedis.Select(1)
db1024, _ := testLedis.Select(1024)
key0 := []byte("db0_test_key")
key1 := []byte("db1_test_key")
testSelect(t, db0)
testSelect(t, db1)
testSelect(t, db1024)
}
db0.Set(key0, []byte("0"))
db1.Set(key1, []byte("1"))
if v, err := db0.Get(key0); err != nil {
func testSelect(t *testing.T, db *DB) {
key := []byte("test_select_key")
value := []byte("value")
if err := db.Set(key, value); err != nil {
t.Fatal(err)
} else if string(v) != "0" {
}
if v, err := db.Get(key); err != nil {
t.Fatal(err)
} else if string(v) != string(value) {
t.Fatal(string(v))
}
if v, err := db1.Get(key1); err != nil {
if _, err := db.Expire(key, 100); err != nil {
t.Fatal(err)
} else if string(v) != "1" {
}
if _, err := db.TTL(key); err != nil {
t.Fatal(err)
}
if _, err := db.Persist(key); err != nil {
t.Fatal(err)
}
key = []byte("test_select_list_key")
if _, err := db.LPush(key, value); err != nil {
t.Fatal(err)
}
if _, err := db.LRange(key, 0, 100); err != nil {
t.Fatal(err)
}
if v, err := db.LPop(key); err != nil {
t.Fatal(err)
} else if string(v) != string(value) {
t.Fatal(string(v))
}
key = []byte("test_select_hash_key")
if _, err := db.HSet(key, []byte("a"), value); err != nil {
t.Fatal(err)
}
if v, err := db.HGet(key, []byte("a")); err != nil {
t.Fatal(err)
} else if string(v) != string(value) {
t.Fatal(string(v))
}
key = []byte("test_select_set_key")
if _, err := db.SAdd(key, []byte("a"), []byte("b")); err != nil {
t.Fatal(err)
}
if n, err := db.SIsMember(key, []byte("a")); err != nil {
t.Fatal(err)
} else if n != 1 {
t.Fatal(n)
}
key = []byte("test_select_zset_key")
if _, err := db.ZAdd(key, ScorePair{1, []byte("a")}, ScorePair{2, []byte("b")}); err != nil {
t.Fatal(err)
}
if v, err := db.ZRangeByScore(key, 0, 100, 0, -1); err != nil {
t.Fatal(err)
} else if len(v) != 2 {
t.Fatal(len(v))
}
}
func TestFlush(t *testing.T) {

View File

@ -1,75 +1,75 @@
package ledis
import (
"errors"
"fmt"
)
// import (
// "errors"
// "fmt"
// )
var (
ErrNestMulti = errors.New("nest multi not supported")
ErrMultiDone = errors.New("multi has been closed")
)
// var (
// ErrNestMulti = errors.New("nest multi not supported")
// ErrMultiDone = errors.New("multi has been closed")
// )
type Multi struct {
*DB
}
// type Multi struct {
// *DB
// }
func (db *DB) IsInMulti() bool {
return db.status == DBInMulti
}
// func (db *DB) IsInMulti() bool {
// return db.status == DBInMulti
// }
// begin a mutli to execute commands,
// it will block any other write operations before you close the multi, unlike transaction, mutli can not rollback
func (db *DB) Multi() (*Multi, error) {
if db.IsInMulti() {
return nil, ErrNestMulti
}
// // begin a mutli to execute commands,
// // it will block any other write operations before you close the multi, unlike transaction, mutli can not rollback
// func (db *DB) Multi() (*Multi, error) {
// if db.IsInMulti() {
// return nil, ErrNestMulti
// }
m := new(Multi)
// m := new(Multi)
m.DB = new(DB)
m.DB.status = DBInMulti
// m.DB = new(DB)
// m.DB.status = DBInMulti
m.DB.l = db.l
// m.DB.l = db.l
m.l.wLock.Lock()
// m.l.wLock.Lock()
m.DB.sdb = db.sdb
// m.DB.sdb = db.sdb
m.DB.bucket = db.sdb
// m.DB.bucket = db.sdb
m.DB.index = db.index
// m.DB.index = db.index
m.DB.kvBatch = m.newBatch()
m.DB.listBatch = m.newBatch()
m.DB.hashBatch = m.newBatch()
m.DB.zsetBatch = m.newBatch()
// m.DB.binBatch = m.newBatch()
m.DB.setBatch = m.newBatch()
// m.DB.kvBatch = m.newBatch()
// m.DB.listBatch = m.newBatch()
// m.DB.hashBatch = m.newBatch()
// m.DB.zsetBatch = m.newBatch()
// // m.DB.binBatch = m.newBatch()
// m.DB.setBatch = m.newBatch()
m.DB.lbkeys = db.lbkeys
// m.DB.lbkeys = db.lbkeys
return m, nil
}
// return m, nil
// }
func (m *Multi) newBatch() *batch {
return m.l.newBatch(m.bucket.NewWriteBatch(), &multiBatchLocker{}, nil)
}
// func (m *Multi) newBatch() *batch {
// return m.l.newBatch(m.bucket.NewWriteBatch(), &multiBatchLocker{}, nil)
// }
func (m *Multi) Close() error {
if m.bucket == nil {
return ErrMultiDone
}
m.l.wLock.Unlock()
m.bucket = nil
return nil
}
// func (m *Multi) Close() error {
// if m.bucket == nil {
// return ErrMultiDone
// }
// m.l.wLock.Unlock()
// m.bucket = nil
// return nil
// }
func (m *Multi) Select(index int) error {
if index < 0 || index >= int(m.l.cfg.Databases) {
return fmt.Errorf("invalid db index %d", index)
}
// func (m *Multi) Select(index int) error {
// if index < 0 || index >= int(m.l.cfg.Databases) {
// return fmt.Errorf("invalid db index %d", index)
// }
m.DB.index = uint8(index)
return nil
}
// m.DB.index = uint8(index)
// return nil
// }

View File

@ -1,51 +1,51 @@
package ledis
import (
"sync"
"testing"
)
// import (
// "sync"
// "testing"
// )
func TestMulti(t *testing.T) {
db := getTestDB()
// func TestMulti(t *testing.T) {
// db := getTestDB()
key := []byte("test_multi_1")
v1 := []byte("v1")
v2 := []byte("v2")
// key := []byte("test_multi_1")
// v1 := []byte("v1")
// v2 := []byte("v2")
m, err := db.Multi()
if err != nil {
t.Fatal(err)
}
// m, err := db.Multi()
// if err != nil {
// t.Fatal(err)
// }
wg := sync.WaitGroup{}
// wg := sync.WaitGroup{}
wg.Add(1)
// wg.Add(1)
go func() {
if err := db.Set(key, v2); err != nil {
t.Fatal(err)
}
wg.Done()
}()
// go func() {
// if err := db.Set(key, v2); err != nil {
// t.Fatal(err)
// }
// wg.Done()
// }()
if err := m.Set(key, v1); err != nil {
t.Fatal(err)
}
// if err := m.Set(key, v1); err != nil {
// t.Fatal(err)
// }
if v, err := m.Get(key); err != nil {
t.Fatal(err)
} else if string(v) != string(v1) {
t.Fatal(string(v))
}
// if v, err := m.Get(key); err != nil {
// t.Fatal(err)
// } else if string(v) != string(v1) {
// t.Fatal(string(v))
// }
m.Close()
// m.Close()
wg.Wait()
// wg.Wait()
if v, err := db.Get(key); err != nil {
t.Fatal(err)
} else if string(v) != string(v2) {
t.Fatal(string(v))
}
// if v, err := db.Get(key); err != nil {
// t.Fatal(err)
// } else if string(v) != string(v2) {
// t.Fatal(string(v))
// }
}
// }

View File

@ -66,11 +66,11 @@ func TestReplication(t *testing.T) {
db.HSet([]byte("b"), []byte("2"), []byte("value"))
db.HSet([]byte("c"), []byte("3"), []byte("value"))
m, _ := db.Multi()
m.Set([]byte("a1"), []byte("value"))
m.Set([]byte("b1"), []byte("value"))
m.Set([]byte("c1"), []byte("value"))
m.Close()
// m, _ := db.Multi()
// m.Set([]byte("a1"), []byte("value"))
// m.Set([]byte("b1"), []byte("value"))
// m.Set([]byte("c1"), []byte("value"))
// m.Close()
slave.FlushAll()

View File

@ -178,11 +178,22 @@ func (db *DB) encodeScanKey(storeDataType byte, key []byte) ([]byte, error) {
}
}
func (db *DB) decodeScanKey(storeDataType byte, ek []byte) ([]byte, error) {
if len(ek) < 2 || ek[0] != db.index || ek[1] != storeDataType {
return nil, errMetaKey
func (db *DB) decodeScanKey(storeDataType byte, ek []byte) (key []byte, err error) {
switch storeDataType {
case KVType:
key, err = db.decodeKVKey(ek)
case LMetaType:
key, err = db.lDecodeMetaKey(ek)
case HSizeType:
key, err = db.hDecodeSizeKey(ek)
case ZSizeType:
key, err = db.zDecodeSizeKey(ek)
case SSizeType:
key, err = db.sDecodeSizeKey(ek)
default:
err = errDataType
}
return ek[2:], nil
return
}
// for specail data scan

View File

@ -31,29 +31,41 @@ func checkHashKFSize(key []byte, field []byte) error {
}
func (db *DB) hEncodeSizeKey(key []byte) []byte {
buf := make([]byte, len(key)+2)
buf := make([]byte, len(key)+1+len(db.indexVarBuf))
buf[0] = db.index
buf[1] = HSizeType
pos := 0
n := copy(buf, db.indexVarBuf)
pos += n
buf[pos] = HSizeType
pos++
copy(buf[pos:], key)
copy(buf[2:], key)
return buf
}
func (db *DB) hDecodeSizeKey(ek []byte) ([]byte, error) {
if len(ek) < 2 || ek[0] != db.index || ek[1] != HSizeType {
return nil, errHSizeKey
pos, err := db.checkKeyIndex(ek)
if err != nil {
return nil, err
}
return ek[2:], nil
if pos+1 > len(ek) || ek[pos] != HSizeType {
return nil, errHSizeKey
}
pos++
return ek[pos:], nil
}
func (db *DB) hEncodeHashKey(key []byte, field []byte) []byte {
buf := make([]byte, len(key)+len(field)+1+1+2+1)
buf := make([]byte, len(key)+len(field)+1+1+2+len(db.indexVarBuf))
pos := 0
buf[pos] = db.index
pos++
n := copy(buf, db.indexVarBuf)
pos += n
buf[pos] = HashType
pos++
@ -71,15 +83,24 @@ func (db *DB) hEncodeHashKey(key []byte, field []byte) []byte {
}
func (db *DB) hDecodeHashKey(ek []byte) ([]byte, []byte, error) {
if len(ek) < 5 || ek[0] != db.index || ek[1] != HashType {
pos, err := db.checkKeyIndex(ek)
if err != nil {
return nil, nil, err
}
if pos+1 > len(ek) || ek[pos] != HashType {
return nil, nil, errHashKey
}
pos++
if pos+2 > len(ek) {
return nil, nil, errHashKey
}
pos := 2
keyLen := int(binary.BigEndian.Uint16(ek[pos:]))
pos += 2
if keyLen+5 > len(ek) {
if keyLen+pos > len(ek) {
return nil, nil, errHashKey
}

View File

@ -33,19 +33,26 @@ func checkValueSize(value []byte) error {
}
func (db *DB) encodeKVKey(key []byte) []byte {
ek := make([]byte, len(key)+2)
ek[0] = db.index
ek[1] = KVType
copy(ek[2:], key)
ek := make([]byte, len(key)+1+len(db.indexVarBuf))
pos := copy(ek, db.indexVarBuf)
ek[pos] = KVType
pos++
copy(ek[pos:], key)
return ek
}
func (db *DB) decodeKVKey(ek []byte) ([]byte, error) {
if len(ek) < 2 || ek[0] != db.index || ek[1] != KVType {
pos, err := db.checkKeyIndex(ek)
if err != nil {
return nil, err
}
if pos+1 > len(ek) || ek[pos] != KVType {
return nil, errKVKey
}
return ek[2:], nil
pos++
return ek[pos:], nil
}
func (db *DB) encodeKVMinKey() []byte {

View File

@ -24,28 +24,34 @@ var errListKey = errors.New("invalid list key")
var errListSeq = errors.New("invalid list sequence, overflow")
func (db *DB) lEncodeMetaKey(key []byte) []byte {
buf := make([]byte, len(key)+2)
buf[0] = db.index
buf[1] = LMetaType
buf := make([]byte, len(key)+1+len(db.indexVarBuf))
pos := copy(buf, db.indexVarBuf)
buf[pos] = LMetaType
pos++
copy(buf[2:], key)
copy(buf[pos:], key)
return buf
}
func (db *DB) lDecodeMetaKey(ek []byte) ([]byte, error) {
if len(ek) < 2 || ek[0] != db.index || ek[1] != LMetaType {
pos, err := db.checkKeyIndex(ek)
if err != nil {
return nil, err
}
if pos+1 > len(ek) || ek[pos] != LMetaType {
return nil, errLMetaKey
}
return ek[2:], nil
pos++
return ek[pos:], nil
}
func (db *DB) lEncodeListKey(key []byte, seq int32) []byte {
buf := make([]byte, len(key)+8)
buf := make([]byte, len(key)+7+len(db.indexVarBuf))
pos := copy(buf, db.indexVarBuf)
pos := 0
buf[pos] = db.index
pos++
buf[pos] = ListType
pos++
@ -61,19 +67,33 @@ func (db *DB) lEncodeListKey(key []byte, seq int32) []byte {
}
func (db *DB) lDecodeListKey(ek []byte) (key []byte, seq int32, err error) {
if len(ek) < 8 || ek[0] != db.index || ek[1] != ListType {
pos := 0
pos, err = db.checkKeyIndex(ek)
if err != nil {
return
}
if pos+1 > len(ek) || ek[pos] != ListType {
err = errListKey
return
}
keyLen := int(binary.BigEndian.Uint16(ek[2:]))
if keyLen+8 != len(ek) {
pos++
if pos+2 > len(ek) {
err = errListKey
return
}
key = ek[4 : 4+keyLen]
seq = int32(binary.BigEndian.Uint32(ek[4+keyLen:]))
keyLen := int(binary.BigEndian.Uint16(ek[pos:]))
pos += 2
if keyLen+pos+4 != len(ek) {
err = errListKey
return
}
key = ek[pos : pos+keyLen]
seq = int32(binary.BigEndian.Uint32(ek[pos+keyLen:]))
return
}
@ -521,11 +541,8 @@ func (db *DB) lblockPop(keys [][]byte, whereSeq int32, timeout time.Duration) ([
} else if v != nil {
return []interface{}{key, v}, nil
} else {
if db.IsAutoCommit() {
//block wait can not be supported in transaction and multi
db.lbkeys.wait(key, ch)
bkeys = append(bkeys, key)
}
db.lbkeys.wait(key, ch)
bkeys = append(bkeys, key)
}
}
if len(bkeys) == 0 {
@ -575,12 +592,6 @@ func (db *DB) lblockPop(keys [][]byte, whereSeq int32, timeout time.Duration) ([
}
func (db *DB) lSignalAsReady(key []byte, num int) {
if db.status == DBInTransaction {
//for transaction, only data can be pushed after tx commit and it is hard to signal
//so we don't handle it now
return
}
db.lbkeys.signal(key, num)
}

View File

@ -29,29 +29,36 @@ func checkSetKMSize(key []byte, member []byte) error {
}
func (db *DB) sEncodeSizeKey(key []byte) []byte {
buf := make([]byte, len(key)+2)
buf := make([]byte, len(key)+1+len(db.indexVarBuf))
buf[0] = db.index
buf[1] = SSizeType
pos := copy(buf, db.indexVarBuf)
buf[pos] = SSizeType
copy(buf[2:], key)
pos++
copy(buf[pos:], key)
return buf
}
func (db *DB) sDecodeSizeKey(ek []byte) ([]byte, error) {
if len(ek) < 2 || ek[0] != db.index || ek[1] != SSizeType {
return nil, errSSizeKey
pos, err := db.checkKeyIndex(ek)
if err != nil {
return nil, err
}
return ek[2:], nil
if pos+1 > len(ek) || ek[pos] != SSizeType {
return nil, errSSizeKey
}
pos++
return ek[pos:], nil
}
func (db *DB) sEncodeSetKey(key []byte, member []byte) []byte {
buf := make([]byte, len(key)+len(member)+1+1+2+1)
buf := make([]byte, len(key)+len(member)+1+1+2+len(db.indexVarBuf))
pos := copy(buf, db.indexVarBuf)
pos := 0
buf[pos] = db.index
pos++
buf[pos] = SetType
pos++
@ -69,15 +76,25 @@ func (db *DB) sEncodeSetKey(key []byte, member []byte) []byte {
}
func (db *DB) sDecodeSetKey(ek []byte) ([]byte, []byte, error) {
if len(ek) < 5 || ek[0] != db.index || ek[1] != SetType {
pos, err := db.checkKeyIndex(ek)
if err != nil {
return nil, nil, err
}
if pos+1 > len(ek) || ek[pos] != SetType {
return nil, nil, errSetKey
}
pos++
if pos+2 > len(ek) {
return nil, nil, errSetKey
}
pos := 2
keyLen := int(binary.BigEndian.Uint16(ek[pos:]))
pos += 2
if keyLen+5 > len(ek) {
if keyLen+pos > len(ek) {
return nil, nil, errSetKey
}

View File

@ -28,11 +28,12 @@ type ttlChecker struct {
var errExpType = errors.New("invalid expire type")
func (db *DB) expEncodeTimeKey(dataType byte, key []byte, when int64) []byte {
buf := make([]byte, len(key)+11)
buf := make([]byte, len(key)+10+len(db.indexVarBuf))
buf[0] = db.index
buf[1] = ExpTimeType
pos := 2
pos := copy(buf, db.indexVarBuf)
buf[pos] = ExpTimeType
pos++
binary.BigEndian.PutUint64(buf[pos:], uint64(when))
pos += 8
@ -46,12 +47,13 @@ func (db *DB) expEncodeTimeKey(dataType byte, key []byte, when int64) []byte {
}
func (db *DB) expEncodeMetaKey(dataType byte, key []byte) []byte {
buf := make([]byte, len(key)+3)
buf := make([]byte, len(key)+2+len(db.indexVarBuf))
buf[0] = db.index
buf[1] = ExpMetaType
buf[2] = dataType
pos := 3
pos := copy(buf, db.indexVarBuf)
buf[pos] = ExpMetaType
pos++
buf[pos] = dataType
pos++
copy(buf[pos:], key)
@ -59,19 +61,29 @@ func (db *DB) expEncodeMetaKey(dataType byte, key []byte) []byte {
}
func (db *DB) expDecodeMetaKey(mk []byte) (byte, []byte, error) {
if len(mk) <= 3 || mk[0] != db.index || mk[1] != ExpMetaType {
pos, err := db.checkKeyIndex(mk)
if err != nil {
return 0, nil, err
}
if pos+2 > len(mk) || mk[pos] != ExpMetaType {
return 0, nil, errExpMetaKey
}
return mk[2], mk[3:], nil
return mk[pos+1], mk[pos+2:], nil
}
func (db *DB) expDecodeTimeKey(tk []byte) (byte, []byte, int64, error) {
if len(tk) < 11 || tk[0] != db.index || tk[1] != ExpTimeType {
pos, err := db.checkKeyIndex(tk)
if err != nil {
return 0, nil, 0, err
}
if pos+10 > len(tk) || tk[pos] != ExpTimeType {
return 0, nil, 0, errExpTimeKey
}
return tk[10], tk[11:], int64(binary.BigEndian.Uint64(tk[2:])), nil
return tk[pos+9], tk[pos+10:], int64(binary.BigEndian.Uint64(tk[pos+1:])), nil
}
func (db *DB) expire(t *batch, dataType byte, key []byte, duration int64) {
@ -85,8 +97,7 @@ func (db *DB) expireAt(t *batch, dataType byte, key []byte, when int64) {
t.Put(tk, mk)
t.Put(mk, PutInt64(when))
tc := db.l.tcs[db.index]
tc.setNextCheckTime(when, false)
db.ttlChecker.setNextCheckTime(when, false)
}
func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) {
@ -121,15 +132,6 @@ func (db *DB) rmExpire(t *batch, dataType byte, key []byte) (int64, error) {
}
}
func newTTLChecker(db *DB) *ttlChecker {
c := new(ttlChecker)
c.db = db
c.txs = make([]*batch, maxDataType)
c.cbs = make([]onExpired, maxDataType)
c.nc = 0
return c
}
func (c *ttlChecker) register(dataType byte, t *batch, f onExpired) {
c.txs[dataType] = t
c.cbs[dataType] = f

View File

@ -437,3 +437,31 @@ func TestExpCompose(t *testing.T) {
return
}
func TestTTLCodec(t *testing.T) {
db := getTestDB()
key := []byte("key")
ek := db.expEncodeTimeKey(KVType, key, 10)
if tp, k, when, err := db.expDecodeTimeKey(ek); err != nil {
t.Fatal(err)
} else if tp != KVType {
t.Fatal(tp, KVType)
} else if string(k) != "key" {
t.Fatal(string(k))
} else if when != 10 {
t.Fatal(when)
}
ek = db.expEncodeMetaKey(KVType, key)
if tp, k, err := db.expDecodeMetaKey(ek); err != nil {
t.Fatal(err)
} else if tp != KVType {
t.Fatal(tp, KVType)
} else if string(k) != "key" {
t.Fatal(string(k))
}
}

View File

@ -51,28 +51,31 @@ func checkZSetKMSize(key []byte, member []byte) error {
}
func (db *DB) zEncodeSizeKey(key []byte) []byte {
buf := make([]byte, len(key)+2)
buf[0] = db.index
buf[1] = ZSizeType
copy(buf[2:], key)
buf := make([]byte, len(key)+1+len(db.indexVarBuf))
pos := copy(buf, db.indexVarBuf)
buf[pos] = ZSizeType
pos++
copy(buf[pos:], key)
return buf
}
func (db *DB) zDecodeSizeKey(ek []byte) ([]byte, error) {
if len(ek) < 2 || ek[0] != db.index || ek[1] != ZSizeType {
return nil, errZSizeKey
pos, err := db.checkKeyIndex(ek)
if err != nil {
return nil, err
}
return ek[2:], nil
if pos+1 > len(ek) || ek[pos] != ZSizeType {
return nil, errZSizeKey
}
pos++
return ek[pos:], nil
}
func (db *DB) zEncodeSetKey(key []byte, member []byte) []byte {
buf := make([]byte, len(key)+len(member)+5)
buf := make([]byte, len(key)+len(member)+4+len(db.indexVarBuf))
pos := 0
buf[pos] = db.index
pos++
pos := copy(buf, db.indexVarBuf)
buf[pos] = ZSetType
pos++
@ -92,22 +95,35 @@ func (db *DB) zEncodeSetKey(key []byte, member []byte) []byte {
}
func (db *DB) zDecodeSetKey(ek []byte) ([]byte, []byte, error) {
if len(ek) < 5 || ek[0] != db.index || ek[1] != ZSetType {
pos, err := db.checkKeyIndex(ek)
if err != nil {
return nil, nil, err
}
if pos+1 > len(ek) || ek[pos] != ZSetType {
return nil, nil, errZSetKey
}
keyLen := int(binary.BigEndian.Uint16(ek[2:]))
if keyLen+5 > len(ek) {
pos++
if pos+2 > len(ek) {
return nil, nil, errZSetKey
}
key := ek[4 : 4+keyLen]
if ek[4+keyLen] != zsetStartMemSep {
keyLen := int(binary.BigEndian.Uint16(ek[pos:]))
if keyLen+pos > len(ek) {
return nil, nil, errZSetKey
}
member := ek[5+keyLen:]
pos += 2
key := ek[pos : pos+keyLen]
if ek[pos+keyLen] != zsetStartMemSep {
return nil, nil, errZSetKey
}
pos++
member := ek[pos+keyLen:]
return key, member, nil
}
@ -123,11 +139,9 @@ func (db *DB) zEncodeStopSetKey(key []byte) []byte {
}
func (db *DB) zEncodeScoreKey(key []byte, member []byte, score int64) []byte {
buf := make([]byte, len(key)+len(member)+14)
buf := make([]byte, len(key)+len(member)+13+len(db.indexVarBuf))
pos := 0
buf[pos] = db.index
pos++
pos := copy(buf, db.indexVarBuf)
buf[pos] = ZScoreType
pos++
@ -166,20 +180,38 @@ func (db *DB) zEncodeStopScoreKey(key []byte, score int64) []byte {
}
func (db *DB) zDecodeScoreKey(ek []byte) (key []byte, member []byte, score int64, err error) {
if len(ek) < 14 || ek[0] != db.index || ek[1] != ZScoreType {
pos := 0
pos, err = db.checkKeyIndex(ek)
if err != nil {
return
}
if pos+1 > len(ek) || ek[pos] != ZScoreType {
err = errZScoreKey
return
}
pos++
if pos+2 > len(ek) {
err = errZScoreKey
return
}
keyLen := int(binary.BigEndian.Uint16(ek[pos:]))
pos += 2
if keyLen+pos > len(ek) {
err = errZScoreKey
return
}
keyLen := int(binary.BigEndian.Uint16(ek[2:]))
if keyLen+14 > len(ek) {
key = ek[pos : pos+keyLen]
pos += keyLen
if pos+10 > len(ek) {
err = errZScoreKey
return
}
key = ek[4 : 4+keyLen]
pos := 4 + keyLen
if (ek[pos] != zsetNScoreSep) && (ek[pos] != zsetPScoreSep) {
err = errZScoreKey
return

View File

@ -1,114 +1,114 @@
package ledis
import (
"errors"
"fmt"
"github.com/siddontang/go/log"
"github.com/siddontang/ledisdb/store"
)
// import (
// "errors"
// "fmt"
// "github.com/siddontang/go/log"
// "github.com/siddontang/ledisdb/store"
// )
var (
ErrNestTx = errors.New("nest transaction not supported")
ErrTxDone = errors.New("Transaction has already been committed or rolled back")
)
// var (
// ErrNestTx = errors.New("nest transaction not supported")
// ErrTxDone = errors.New("Transaction has already been committed or rolled back")
// )
type Tx struct {
*DB
// type Tx struct {
// *DB
tx *store.Tx
// tx *store.Tx
data *store.BatchData
}
// data *store.BatchData
// }
func (db *DB) IsTransaction() bool {
return db.status == DBInTransaction
}
// func (db *DB) IsTransaction() bool {
// return db.status == DBInTransaction
// }
// Begin a transaction, it will block all other write operations before calling Commit or Rollback.
// You must be very careful to prevent long-time transaction.
func (db *DB) Begin() (*Tx, error) {
log.Warn("Transaction support will be removed later, use your own risk!!!")
// // Begin a transaction, it will block all other write operations before calling Commit or Rollback.
// // You must be very careful to prevent long-time transaction.
// func (db *DB) Begin() (*Tx, error) {
// log.Warn("Transaction support will be removed later, use your own risk!!!")
if db.IsTransaction() {
return nil, ErrNestTx
}
// if db.IsTransaction() {
// return nil, ErrNestTx
// }
tx := new(Tx)
// tx := new(Tx)
tx.data = &store.BatchData{}
// tx.data = &store.BatchData{}
tx.DB = new(DB)
tx.DB.l = db.l
// tx.DB = new(DB)
// tx.DB.l = db.l
tx.l.wLock.Lock()
// tx.l.wLock.Lock()
tx.DB.sdb = db.sdb
// tx.DB.sdb = db.sdb
var err error
tx.tx, err = db.sdb.Begin()
if err != nil {
tx.l.wLock.Unlock()
return nil, err
}
// var err error
// tx.tx, err = db.sdb.Begin()
// if err != nil {
// tx.l.wLock.Unlock()
// return nil, err
// }
tx.DB.bucket = tx.tx
// tx.DB.bucket = tx.tx
tx.DB.status = DBInTransaction
// tx.DB.status = DBInTransaction
tx.DB.index = db.index
// tx.DB.index = db.index
tx.DB.kvBatch = tx.newBatch()
tx.DB.listBatch = tx.newBatch()
tx.DB.hashBatch = tx.newBatch()
tx.DB.zsetBatch = tx.newBatch()
tx.DB.setBatch = tx.newBatch()
// tx.DB.kvBatch = tx.newBatch()
// tx.DB.listBatch = tx.newBatch()
// tx.DB.hashBatch = tx.newBatch()
// tx.DB.zsetBatch = tx.newBatch()
// tx.DB.setBatch = tx.newBatch()
tx.DB.lbkeys = db.lbkeys
// tx.DB.lbkeys = db.lbkeys
return tx, nil
}
// return tx, nil
// }
func (tx *Tx) Commit() error {
if tx.tx == nil {
return ErrTxDone
}
// func (tx *Tx) Commit() error {
// if tx.tx == nil {
// return ErrTxDone
// }
err := tx.l.handleCommit(tx.data, tx.tx)
tx.data.Reset()
// err := tx.l.handleCommit(tx.data, tx.tx)
// tx.data.Reset()
tx.tx = nil
// tx.tx = nil
tx.l.wLock.Unlock()
// tx.l.wLock.Unlock()
tx.DB.bucket = nil
// tx.DB.bucket = nil
return err
}
// return err
// }
func (tx *Tx) Rollback() error {
if tx.tx == nil {
return ErrTxDone
}
// func (tx *Tx) Rollback() error {
// if tx.tx == nil {
// return ErrTxDone
// }
err := tx.tx.Rollback()
tx.data.Reset()
tx.tx = nil
// err := tx.tx.Rollback()
// tx.data.Reset()
// tx.tx = nil
tx.l.wLock.Unlock()
tx.DB.bucket = nil
// tx.l.wLock.Unlock()
// tx.DB.bucket = nil
return err
}
// return err
// }
func (tx *Tx) newBatch() *batch {
return tx.l.newBatch(tx.tx.NewWriteBatch(), &txBatchLocker{}, tx)
}
// func (tx *Tx) newBatch() *batch {
// return tx.l.newBatch(tx.tx.NewWriteBatch(), &txBatchLocker{}, tx)
// }
func (tx *Tx) Select(index int) error {
if index < 0 || index >= int(tx.l.cfg.Databases) {
return fmt.Errorf("invalid db index %d", index)
}
// func (tx *Tx) Select(index int) error {
// if index < 0 || index >= int(tx.l.cfg.Databases) {
// return fmt.Errorf("invalid db index %d", index)
// }
tx.DB.index = uint8(index)
return nil
}
// tx.DB.index = uint8(index)
// return nil
// }

View File

@ -1,224 +1,224 @@
package ledis
import (
"github.com/siddontang/ledisdb/config"
"os"
"strings"
"testing"
)
// import (
// "github.com/siddontang/ledisdb/config"
// "os"
// "strings"
// "testing"
// )
func testTxRollback(t *testing.T, db *DB) {
var err error
key1 := []byte("tx_key1")
key2 := []byte("tx_key2")
field2 := []byte("tx_field2")
// func testTxRollback(t *testing.T, db *DB) {
// var err error
// key1 := []byte("tx_key1")
// key2 := []byte("tx_key2")
// field2 := []byte("tx_field2")
err = db.Set(key1, []byte("value"))
if err != nil {
t.Fatal(err)
}
// err = db.Set(key1, []byte("value"))
// if err != nil {
// t.Fatal(err)
// }
_, err = db.HSet(key2, field2, []byte("value"))
if err != nil {
t.Fatal(err)
}
// _, err = db.HSet(key2, field2, []byte("value"))
// if err != nil {
// t.Fatal(err)
// }
var tx *Tx
tx, err = db.Begin()
if err != nil {
t.Fatal(err)
}
// var tx *Tx
// tx, err = db.Begin()
// if err != nil {
// t.Fatal(err)
// }
defer tx.Rollback()
// defer tx.Rollback()
err = tx.Set(key1, []byte("1"))
// err = tx.Set(key1, []byte("1"))
if err != nil {
t.Fatal(err)
}
// if err != nil {
// t.Fatal(err)
// }
_, err = tx.HSet(key2, field2, []byte("2"))
// _, err = tx.HSet(key2, field2, []byte("2"))
if err != nil {
t.Fatal(err)
}
// if err != nil {
// t.Fatal(err)
// }
_, err = tx.HSet([]byte("no_key"), field2, []byte("2"))
// _, err = tx.HSet([]byte("no_key"), field2, []byte("2"))
if err != nil {
t.Fatal(err)
}
// if err != nil {
// t.Fatal(err)
// }
if v, err := tx.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "1" {
t.Fatal(string(v))
}
// if v, err := tx.Get(key1); err != nil {
// t.Fatal(err)
// } else if string(v) != "1" {
// t.Fatal(string(v))
// }
if v, err := tx.HGet(key2, field2); err != nil {
t.Fatal(err)
} else if string(v) != "2" {
t.Fatal(string(v))
}
// if v, err := tx.HGet(key2, field2); err != nil {
// t.Fatal(err)
// } else if string(v) != "2" {
// t.Fatal(string(v))
// }
err = tx.Rollback()
if err != nil {
t.Fatal(err)
}
// err = tx.Rollback()
// if err != nil {
// t.Fatal(err)
// }
if v, err := db.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "value" {
t.Fatal(string(v))
}
// if v, err := db.Get(key1); err != nil {
// t.Fatal(err)
// } else if string(v) != "value" {
// t.Fatal(string(v))
// }
if v, err := db.HGet(key2, field2); err != nil {
t.Fatal(err)
} else if string(v) != "value" {
t.Fatal(string(v))
}
}
// if v, err := db.HGet(key2, field2); err != nil {
// t.Fatal(err)
// } else if string(v) != "value" {
// t.Fatal(string(v))
// }
// }
func testTxCommit(t *testing.T, db *DB) {
var err error
key1 := []byte("tx_key1")
key2 := []byte("tx_key2")
field2 := []byte("tx_field2")
// func testTxCommit(t *testing.T, db *DB) {
// var err error
// key1 := []byte("tx_key1")
// key2 := []byte("tx_key2")
// field2 := []byte("tx_field2")
err = db.Set(key1, []byte("value"))
if err != nil {
t.Fatal(err)
}
// err = db.Set(key1, []byte("value"))
// if err != nil {
// t.Fatal(err)
// }
_, err = db.HSet(key2, field2, []byte("value"))
if err != nil {
t.Fatal(err)
}
// _, err = db.HSet(key2, field2, []byte("value"))
// if err != nil {
// t.Fatal(err)
// }
var tx *Tx
tx, err = db.Begin()
if err != nil {
t.Fatal(err)
}
// var tx *Tx
// tx, err = db.Begin()
// if err != nil {
// t.Fatal(err)
// }
defer tx.Rollback()
// defer tx.Rollback()
err = tx.Set(key1, []byte("1"))
// err = tx.Set(key1, []byte("1"))
if err != nil {
t.Fatal(err)
}
// if err != nil {
// t.Fatal(err)
// }
_, err = tx.HSet(key2, field2, []byte("2"))
// _, err = tx.HSet(key2, field2, []byte("2"))
if err != nil {
t.Fatal(err)
}
// if err != nil {
// t.Fatal(err)
// }
if v, err := tx.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "1" {
t.Fatal(string(v))
}
// if v, err := tx.Get(key1); err != nil {
// t.Fatal(err)
// } else if string(v) != "1" {
// t.Fatal(string(v))
// }
if v, err := tx.HGet(key2, field2); err != nil {
t.Fatal(err)
} else if string(v) != "2" {
t.Fatal(string(v))
}
// if v, err := tx.HGet(key2, field2); err != nil {
// t.Fatal(err)
// } else if string(v) != "2" {
// t.Fatal(string(v))
// }
err = tx.Commit()
if err != nil {
t.Fatal(err)
}
// err = tx.Commit()
// if err != nil {
// t.Fatal(err)
// }
if v, err := db.Get(key1); err != nil {
t.Fatal(err)
} else if string(v) != "1" {
t.Fatal(string(v))
}
// if v, err := db.Get(key1); err != nil {
// t.Fatal(err)
// } else if string(v) != "1" {
// t.Fatal(string(v))
// }
if v, err := db.HGet(key2, field2); err != nil {
t.Fatal(err)
} else if string(v) != "2" {
t.Fatal(string(v))
}
}
// if v, err := db.HGet(key2, field2); err != nil {
// t.Fatal(err)
// } else if string(v) != "2" {
// t.Fatal(string(v))
// }
// }
func testTxSelect(t *testing.T, db *DB) {
tx, err := db.Begin()
if err != nil {
t.Fatal(err)
}
// func testTxSelect(t *testing.T, db *DB) {
// tx, err := db.Begin()
// if err != nil {
// t.Fatal(err)
// }
defer tx.Rollback()
// defer tx.Rollback()
tx.Set([]byte("tx_select_1"), []byte("a"))
// tx.Set([]byte("tx_select_1"), []byte("a"))
tx.Select(1)
// tx.Select(1)
tx.Set([]byte("tx_select_2"), []byte("b"))
// tx.Set([]byte("tx_select_2"), []byte("b"))
if err = tx.Commit(); err != nil {
t.Fatal(err)
}
// if err = tx.Commit(); err != nil {
// t.Fatal(err)
// }
if v, err := db.Get([]byte("tx_select_1")); err != nil {
t.Fatal(err)
} else if string(v) != "a" {
t.Fatal(string(v))
}
// if v, err := db.Get([]byte("tx_select_1")); err != nil {
// t.Fatal(err)
// } else if string(v) != "a" {
// t.Fatal(string(v))
// }
if v, err := db.Get([]byte("tx_select_2")); err != nil {
t.Fatal(err)
} else if v != nil {
t.Fatal("must nil")
}
// if v, err := db.Get([]byte("tx_select_2")); err != nil {
// t.Fatal(err)
// } else if v != nil {
// t.Fatal("must nil")
// }
db, _ = db.l.Select(1)
// db, _ = db.l.Select(1)
if v, err := db.Get([]byte("tx_select_2")); err != nil {
t.Fatal(err)
} else if string(v) != "b" {
t.Fatal(string(v))
}
// if v, err := db.Get([]byte("tx_select_2")); err != nil {
// t.Fatal(err)
// } else if string(v) != "b" {
// t.Fatal(string(v))
// }
if v, err := db.Get([]byte("tx_select_1")); err != nil {
t.Fatal(err)
} else if v != nil {
t.Fatal("must nil")
}
}
// if v, err := db.Get([]byte("tx_select_1")); err != nil {
// t.Fatal(err)
// } else if v != nil {
// t.Fatal("must nil")
// }
// }
func testTx(t *testing.T, name string) {
cfg := config.NewConfigDefault()
cfg.DataDir = "/tmp/ledis_test_tx"
// func testTx(t *testing.T, name string) {
// cfg := config.NewConfigDefault()
// cfg.DataDir = "/tmp/ledis_test_tx"
cfg.DBName = name
cfg.LMDB.MapSize = 10 * 1024 * 1024
//cfg.UseReplication = true
// cfg.DBName = name
// cfg.LMDB.MapSize = 10 * 1024 * 1024
// //cfg.UseReplication = true
os.RemoveAll(cfg.DataDir)
// os.RemoveAll(cfg.DataDir)
l, err := Open(cfg)
if err != nil {
if strings.Contains(err.Error(), "not registered") {
return
}
t.Fatal(err)
}
// l, err := Open(cfg)
// if err != nil {
// if strings.Contains(err.Error(), "not registered") {
// return
// }
// t.Fatal(err)
// }
defer l.Close()
// defer l.Close()
db, _ := l.Select(0)
// db, _ := l.Select(0)
testTxRollback(t, db)
testTxCommit(t, db)
testTxSelect(t, db)
}
// testTxRollback(t, db)
// testTxCommit(t, db)
// testTxSelect(t, db)
// }
//only lmdb, boltdb support Transaction
func TestTx(t *testing.T) {
testTx(t, "lmdb")
testTx(t, "boltdb")
}
// //only lmdb, boltdb support Transaction
// func TestTx(t *testing.T) {
// testTx(t, "lmdb")
// testTx(t, "boltdb")
// }

View File

@ -30,7 +30,7 @@ type App struct {
info *info
s *script
script *script
// handle slaves
slock sync.Mutex

View File

@ -2,38 +2,38 @@ package server
import (
"bytes"
"fmt"
// "fmt"
"github.com/siddontang/go/sync2"
"github.com/siddontang/ledisdb/ledis"
"io"
"time"
)
var txUnsupportedCmds = map[string]struct{}{
"select": struct{}{},
"slaveof": struct{}{},
"fullsync": struct{}{},
"sync": struct{}{},
"begin": struct{}{},
"flushall": struct{}{},
"flushdb": struct{}{},
"eval": struct{}{},
"xmigrate": struct{}{},
"xmigratedb": struct{}{},
}
// var txUnsupportedCmds = map[string]struct{}{
// "select": struct{}{},
// "slaveof": struct{}{},
// "fullsync": struct{}{},
// "sync": struct{}{},
// "begin": struct{}{},
// "flushall": struct{}{},
// "flushdb": struct{}{},
// "eval": struct{}{},
// "xmigrate": struct{}{},
// "xmigratedb": struct{}{},
// }
var scriptUnsupportedCmds = map[string]struct{}{
"slaveof": struct{}{},
"fullsync": struct{}{},
"sync": struct{}{},
"begin": struct{}{},
"commit": struct{}{},
"rollback": struct{}{},
"flushall": struct{}{},
"flushdb": struct{}{},
"xmigrate": struct{}{},
"xmigratedb": struct{}{},
}
// var scriptUnsupportedCmds = map[string]struct{}{
// "slaveof": struct{}{},
// "fullsync": struct{}{},
// "sync": struct{}{},
// "begin": struct{}{},
// "commit": struct{}{},
// "rollback": struct{}{},
// "flushall": struct{}{},
// "flushdb": struct{}{},
// "xmigrate": struct{}{},
// "xmigratedb": struct{}{},
// }
type responseWriter interface {
writeError(error)
@ -73,9 +73,9 @@ type client struct {
buf bytes.Buffer
tx *ledis.Tx
// tx *ledis.Tx
script *ledis.Multi
// script *ledis.Multi
slaveListeningAddr string
}
@ -104,19 +104,22 @@ func (c *client) perform() {
} else if exeCmd, ok := regCmds[c.cmd]; !ok {
err = ErrNotFound
} else {
if c.db.IsTransaction() {
if _, ok := txUnsupportedCmds[c.cmd]; ok {
err = fmt.Errorf("%s not supported in transaction", c.cmd)
}
} else if c.db.IsInMulti() {
if _, ok := scriptUnsupportedCmds[c.cmd]; ok {
err = fmt.Errorf("%s not supported in multi", c.cmd)
}
}
// if c.db.IsTransaction() {
// if _, ok := txUnsupportedCmds[c.cmd]; ok {
// err = fmt.Errorf("%s not supported in transaction", c.cmd)
// }
// } else if c.db.IsInMulti() {
// if _, ok := scriptUnsupportedCmds[c.cmd]; ok {
// err = fmt.Errorf("%s not supported in multi", c.cmd)
// }
// }
// if err == nil {
// err = exeCmd(c)
// }
err = exeCmd(c)
if err == nil {
err = exeCmd(c)
}
}
if c.app.access != nil {

View File

@ -105,10 +105,10 @@ func (c *respClient) run() {
c.conn.Close()
if c.tx != nil {
c.tx.Rollback()
c.tx = nil
}
// if c.tx != nil {
// c.tx.Rollback()
// c.tx = nil
// }
c.app.removeSlave(c.client, c.activeQuit)

View File

@ -8,7 +8,7 @@ import (
"fmt"
"github.com/siddontang/go/hack"
"github.com/siddontang/ledisdb/lua"
"github.com/siddontang/ledisdb/vendor/lua"
"strconv"
"strings"
)
@ -37,12 +37,7 @@ func parseEvalArgs(l *lua.State, c *client) error {
}
func evalGenericCommand(c *client, evalSha1 bool) error {
m, err := c.db.Multi()
if err != nil {
return err
}
s := c.app.s
s := c.app.script
luaClient := s.c
l := s.l
@ -53,15 +48,13 @@ func evalGenericCommand(c *client, evalSha1 bool) error {
defer func() {
l.SetTop(base)
luaClient.db = nil
luaClient.script = nil
// luaClient.script = nil
s.Unlock()
m.Close()
}()
luaClient.db = m.DB
luaClient.script = m
luaClient.db = c.db
// luaClient.script = m
luaClient.remoteAddr = c.remoteAddr
if err := parseEvalArgs(l, c); err != nil {
@ -101,7 +94,6 @@ func evalGenericCommand(c *client, evalSha1 bool) error {
return err
} else {
r := luaReplyToLedisReply(l)
m.Close()
if v, ok := r.(error); ok {
return v
@ -122,7 +114,7 @@ func evalshaCommand(c *client) error {
}
func scriptCommand(c *client) error {
s := c.app.s
s := c.app.script
l := s.l
s.Lock()
@ -155,7 +147,7 @@ func scriptCommand(c *client) error {
}
func scriptLoadCommand(c *client) error {
s := c.app.s
s := c.app.script
l := s.l
if len(c.args) != 2 {
@ -181,7 +173,7 @@ func scriptLoadCommand(c *client) error {
}
func scriptExistsCommand(c *client) error {
s := c.app.s
s := c.app.script
if len(c.args) < 2 {
return ErrCmdParams
@ -201,7 +193,7 @@ func scriptExistsCommand(c *client) error {
}
func scriptFlushCommand(c *client) error {
s := c.app.s
s := c.app.script
l := s.l
if len(c.args) != 1 {

View File

@ -31,19 +31,26 @@ func selectCommand(c *client) error {
if index, err := strconv.Atoi(hack.String(c.args[0])); err != nil {
return err
} else {
if c.db.IsInMulti() {
if err := c.script.Select(index); err != nil {
return err
} else {
c.db = c.script.DB
}
// if c.db.IsInMulti() {
// if err := c.script.Select(index); err != nil {
// return err
// } else {
// c.db = c.script.DB
// }
// } else {
// if db, err := c.ldb.Select(index); err != nil {
// return err
// } else {
// c.db = db
// }
// }
if db, err := c.ldb.Select(index); err != nil {
return err
} else {
if db, err := c.ldb.Select(index); err != nil {
return err
} else {
c.db = db
}
c.db = db
}
c.resp.writeStatus(OK)
}

View File

@ -1,57 +1,57 @@
package server
import (
"errors"
)
// import (
// "errors"
// )
var errTxMiss = errors.New("transaction miss")
// var errTxMiss = errors.New("transaction miss")
func beginCommand(c *client) error {
tx, err := c.db.Begin()
if err == nil {
c.tx = tx
c.db = tx.DB
c.resp.writeStatus(OK)
}
// func beginCommand(c *client) error {
// tx, err := c.db.Begin()
// if err == nil {
// c.tx = tx
// c.db = tx.DB
// c.resp.writeStatus(OK)
// }
return err
}
// return err
// }
func commitCommand(c *client) error {
if c.tx == nil {
return errTxMiss
}
// func commitCommand(c *client) error {
// if c.tx == nil {
// return errTxMiss
// }
err := c.tx.Commit()
c.db, _ = c.ldb.Select(c.tx.Index())
c.tx = nil
// err := c.tx.Commit()
// c.db, _ = c.ldb.Select(c.tx.Index())
// c.tx = nil
if err == nil {
c.resp.writeStatus(OK)
}
// if err == nil {
// c.resp.writeStatus(OK)
// }
return err
}
// return err
// }
func rollbackCommand(c *client) error {
if c.tx == nil {
return errTxMiss
}
// func rollbackCommand(c *client) error {
// if c.tx == nil {
// return errTxMiss
// }
err := c.tx.Rollback()
// err := c.tx.Rollback()
c.db, _ = c.ldb.Select(c.tx.Index())
c.tx = nil
// c.db, _ = c.ldb.Select(c.tx.Index())
// c.tx = nil
if err == nil {
c.resp.writeStatus(OK)
}
// if err == nil {
// c.resp.writeStatus(OK)
// }
return err
}
// return err
// }
func init() {
register("begin", beginCommand)
register("commit", commitCommand)
register("rollback", rollbackCommand)
}
// func init() {
// register("begin", beginCommand)
// register("commit", commitCommand)
// register("rollback", rollbackCommand)
// }

View File

@ -8,7 +8,7 @@ import (
"github.com/siddontang/go/hack"
"github.com/siddontang/go/num"
"github.com/siddontang/ledisdb/ledis"
"github.com/siddontang/ledisdb/lua"
"github.com/siddontang/ledisdb/vendor/lua"
"io"
"sync"
)
@ -152,7 +152,7 @@ func (app *App) openScript() {
s.chunks = make(map[string]struct{})
app.s = s
app.script = s
l := lua.NewState()
@ -204,9 +204,9 @@ func (app *App) openScript() {
}
func (app *App) closeScript() {
app.s.l.Close()
delMapState(app.s.l)
app.s = nil
app.script.l.Close()
delMapState(app.script.l)
app.script = nil
}
var mapState = map[*lua.State]*script{}

View File

@ -5,7 +5,7 @@ package server
import (
"fmt"
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/lua"
"github.com/siddontang/ledisdb/vendor/lua"
"testing"
)
@ -115,16 +115,13 @@ func TestLuaCall(t *testing.T) {
defer app.Close()
db, _ := app.ldb.Select(0)
m, _ := db.Multi()
defer m.Close()
luaClient := app.s.c
luaClient.db = m.DB
luaClient.script = m
luaClient := app.script.c
luaClient.db = db
l := app.s.l
l := app.script.l
err := app.s.l.DoString(testScript1)
err := app.script.l.DoString(testScript1)
if err != nil {
t.Fatal(err)
}
@ -138,7 +135,7 @@ func TestLuaCall(t *testing.T) {
t.Fatal(fmt.Sprintf("%v %T", v, v))
}
err = app.s.l.DoString(testScript2)
err = app.script.l.DoString(testScript2)
if err != nil {
t.Fatal(err)
}
@ -148,7 +145,7 @@ func TestLuaCall(t *testing.T) {
t.Fatal(fmt.Sprintf("%v %T", v, v))
}
err = app.s.l.DoString(testScript3)
err = app.script.l.DoString(testScript3)
if err != nil {
t.Fatal(err)
}
@ -159,7 +156,7 @@ func TestLuaCall(t *testing.T) {
t.Fatal(string(v))
}
err = app.s.l.DoString(testScript4)
err = app.script.l.DoString(testScript4)
if err != nil {
t.Fatal(err)
}

View File

@ -5,7 +5,7 @@ package mdb
import (
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/store/driver"
mdb "github.com/szferi/gomdb"
mdb "github.com/siddontang/ledisdb/vendor/gomdb"
"os"
)

View File

@ -4,7 +4,7 @@ package mdb
import (
"github.com/siddontang/ledisdb/store/driver"
mdb "github.com/szferi/gomdb"
mdb "github.com/siddontang/ledisdb/vendor/gomdb"
)
type Snapshot struct {

View File

@ -4,7 +4,7 @@ package mdb
import (
"github.com/siddontang/ledisdb/store/driver"
mdb "github.com/szferi/gomdb"
mdb "github.com/siddontang/ledisdb/vendor/gomdb"
)
type Tx struct {

3
vendor/README.md vendored Normal file
View File

@ -0,0 +1,3 @@
[godep](https://github.com/tools/godep) can not save packages which have build tags.
So we put these packages here explicitly.

View File

@ -1,3 +1,5 @@
// +build lmdb
package mdb
import (

View File

@ -1,3 +1,5 @@
// +build lmdb
package mdb
/*

View File

@ -1,3 +1,5 @@
// +build lmdb
package mdb
/*

View File

@ -1,3 +1,5 @@
// +build lmdb
package mdb
import (

View File

@ -1,8 +1,10 @@
// +build lmdb
package mdb
import (
"testing"
"syscall"
"testing"
)
func TestErrno(t *testing.T) {

View File

@ -1,3 +1,5 @@
// +build lmdb
package mdb
import (

View File

@ -1,3 +1,5 @@
// +build lmdb
/** @file lmdb.h
* @brief Lightning memory-mapped database library
*

View File

@ -1,3 +1,5 @@
// +build lmdb
/** @file mdb.c
* @brief Lightning memory-mapped database library
*

View File

@ -1,3 +1,5 @@
// +build lmdb
/*
A thin wrapper for the lmdb C library. These are low-level bindings for the C
API. The C documentation should be used as a reference while developing

View File

@ -1,3 +1,5 @@
// +build lmdb
package mdb
import (

View File

@ -1,3 +1,5 @@
// +build lmdb
/** @file midl.c
* @brief ldap bdb back-end ID List functions */
/* $OpenLDAP$ */

View File

@ -1,3 +1,5 @@
// +build lmdb
/** @file midl.h
* @brief LMDB ID List header file.
*

View File

@ -1,3 +1,5 @@
// +build lmdb
package mdb
/*
@ -66,20 +68,20 @@ func (env *Env) BeginTxn(parent *Txn, flags uint) (*Txn, error) {
func (txn *Txn) Commit() error {
ret := C.mdb_txn_commit(txn._txn)
runtime.UnlockOSThread()
// The transaction handle is freed if there was no error
if ret == C.MDB_SUCCESS {
txn._txn = nil
}
// The transaction handle is freed if there was no error
if ret == C.MDB_SUCCESS {
txn._txn = nil
}
return errno(ret)
}
func (txn *Txn) Abort() {
if txn._txn == nil {
return
}
C.mdb_txn_abort(txn._txn)
return
}
C.mdb_txn_abort(txn._txn)
runtime.UnlockOSThread()
// The transaction handle is always freed.
// The transaction handle is always freed.
txn._txn = nil
}

View File

@ -1,3 +1,5 @@
// +build lmdb
package mdb
/*

View File

@ -1,3 +1,5 @@
// +build lmdb
package mdb
import (

22
vendor/lua/LICENSE vendored Normal file
View File

@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2015 siddontang
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

View File

View File

View File

View File