From e860902fef571cda7b33850941f2bb30ee84dc48 Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 15 May 2014 14:19:48 +0800 Subject: [PATCH] refactor, support use embedded ledisdb now you can embed ledisdb as a lib to use. --- build_leveldb.sh | 2 +- dev.sh | 5 + ledis/app.go | 17 +--- ledis/client.go | 8 +- ledis/cmd_hash.go | 24 ++--- ledis/cmd_kv.go | 25 +++-- ledis/cmd_list.go | 16 +-- ledis/cmd_list_test.go | 10 -- ledis/cmd_zset.go | 91 ++++------------- ledis/db.go | 50 ++++++++++ ledis/lock.go | 27 ----- ledis/t_hash.go | 74 +++++++------- ledis/t_kv.go | 218 ++++++++++++++++++++++------------------- ledis/t_list.go | 172 ++++++++++++++++---------------- ledis/t_zset.go | 200 ++++++++++++++++++++++++++++--------- ledis/tx.go | 12 --- 16 files changed, 509 insertions(+), 442 deletions(-) create mode 100644 ledis/db.go delete mode 100644 ledis/lock.go diff --git a/build_leveldb.sh b/build_leveldb.sh index 1940e9a..df918d5 100644 --- a/build_leveldb.sh +++ b/build_leveldb.sh @@ -42,4 +42,4 @@ else echo "skip install leveldb" fi -cd $ROOT_DIR \ No newline at end of file +cd $ROOT_DIR diff --git a/dev.sh b/dev.sh index 3fee497..c1c59f7 100644 --- a/dev.sh +++ b/dev.sh @@ -31,6 +31,11 @@ export GOPATH=$(add_path $GOPATH $VTROOT) export CGO_CFLAGS="-I$LEVELDB_DIR/include -I$SNAPPY_DIR/include" export CGO_CXXFLAGS="-I$LEVELDB_DIR/include -I$SNAPPY_DIR/include" export CGO_LDFLAGS="-L$LEVELDB_DIR/lib -L$SNAPPY_DIR/lib -lsnappy" + +#for linux, use LD_LIBRARY_PATH export LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $SNAPPY_DIR/lib) export LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $LEVELDB_DIR/lib) +#for macos, use DYLD_LIBRARY_PATH +export DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $SNAPPY_DIR/lib) +export DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $LEVELDB_DIR/lib) diff --git a/ledis/app.go b/ledis/app.go index a0a6757..5cbdacd 100644 --- a/ledis/app.go +++ b/ledis/app.go @@ -1,7 +1,6 @@ package ledis import ( - "github.com/siddontang/go-leveldb/leveldb" "net" "strings" ) @@ -11,12 +10,7 @@ type App struct { listener net.Listener - db *leveldb.DB - - kvTx *tx - listTx *tx - hashTx *tx - zsetTx *tx + db *DB closed bool } @@ -40,16 +34,11 @@ func NewApp(cfg *Config) (*App, error) { return nil, err } - app.db, err = leveldb.OpenWithConfig(&cfg.DB) + app.db, err = OpenDBWithConfig(&cfg.DB) if err != nil { return nil, err } - app.kvTx = app.newTx() - app.listTx = app.newTx() - app.hashTx = app.newTx() - app.zsetTx = app.newTx() - return app, nil } @@ -72,6 +61,6 @@ func (app *App) Run() { continue } - newClient(conn, app) + newClient(conn, app.db) } } diff --git a/ledis/client.go b/ledis/client.go index c71d16a..2a57c53 100644 --- a/ledis/client.go +++ b/ledis/client.go @@ -14,8 +14,8 @@ import ( var errReadRequest = errors.New("invalid request protocol") type client struct { - app *App - c net.Conn + db *DB + c net.Conn rb *bufio.Reader wb *bufio.Writer @@ -26,9 +26,9 @@ type client struct { reqC chan error } -func newClient(c net.Conn, app *App) { +func newClient(c net.Conn, db *DB) { co := new(client) - co.app = app + co.db = db co.c = c co.rb = bufio.NewReaderSize(c, 256) diff --git a/ledis/cmd_hash.go b/ledis/cmd_hash.go index 86b424f..1e00854 100644 --- a/ledis/cmd_hash.go +++ b/ledis/cmd_hash.go @@ -8,7 +8,7 @@ func hsetCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.hash_set(args[0], args[1], args[2]); err != nil { + if n, err := c.db.HSet(args[0], args[1], args[2]); err != nil { return err } else { c.writeInteger(n) @@ -23,7 +23,7 @@ func hgetCommand(c *client) error { return ErrCmdParams } - if v, err := c.app.hash_get(args[0], args[1]); err != nil { + if v, err := c.db.HGet(args[0], args[1]); err != nil { return err } else { c.writeBulk(v) @@ -39,7 +39,7 @@ func hexistsCommand(c *client) error { } var n int64 = 1 - if v, err := c.app.hash_get(args[0], args[1]); err != nil { + if v, err := c.db.HGet(args[0], args[1]); err != nil { return err } else { if v == nil { @@ -57,7 +57,7 @@ func hdelCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.hash_del(args[0], args[1:]); err != nil { + if n, err := c.db.HDel(args[0], args[1:]); err != nil { return err } else { c.writeInteger(n) @@ -72,7 +72,7 @@ func hlenCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.hash_len(args[0]); err != nil { + if n, err := c.db.HLen(args[0]); err != nil { return err } else { c.writeInteger(n) @@ -93,7 +93,7 @@ func hincrbyCommand(c *client) error { } var n int64 - if n, err = c.app.hash_incrby(args[0], args[1], delta); err != nil { + if n, err = c.db.HIncrBy(args[0], args[1], delta); err != nil { return err } else { c.writeInteger(n) @@ -111,7 +111,7 @@ func hmsetCommand(c *client) error { return ErrCmdParams } - if err := c.app.hash_mset(args[0], args[1:]); err != nil { + if err := c.db.HMset(args[0], args[1:]); err != nil { return err } else { c.writeStatus(OK) @@ -126,7 +126,7 @@ func hmgetCommand(c *client) error { return ErrCmdParams } - if v, err := c.app.hash_mget(args[0], args[1:]); err != nil { + if v, err := c.db.HMget(args[0], args[1:]); err != nil { return err } else { c.writeArray(v) @@ -141,7 +141,7 @@ func hgetallCommand(c *client) error { return ErrCmdParams } - if v, err := c.app.hash_getall(args[0]); err != nil { + if v, err := c.db.HGetAll(args[0]); err != nil { return err } else { c.writeArray(v) @@ -156,7 +156,7 @@ func hkeysCommand(c *client) error { return ErrCmdParams } - if v, err := c.app.hash_keys(args[0]); err != nil { + if v, err := c.db.HKeys(args[0]); err != nil { return err } else { c.writeArray(v) @@ -171,7 +171,7 @@ func hvalsCommand(c *client) error { return ErrCmdParams } - if v, err := c.app.hash_values(args[0]); err != nil { + if v, err := c.db.HValues(args[0]); err != nil { return err } else { c.writeArray(v) @@ -186,7 +186,7 @@ func hclearCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.hash_clear(args[0]); err != nil { + if n, err := c.db.HClear(args[0]); err != nil { return err } else { c.writeInteger(n) diff --git a/ledis/cmd_kv.go b/ledis/cmd_kv.go index 381ec8c..754fb95 100644 --- a/ledis/cmd_kv.go +++ b/ledis/cmd_kv.go @@ -8,7 +8,7 @@ func getCommand(c *client) error { return ErrCmdParams } - if v, err := c.app.kv_get(args[0]); err != nil { + if v, err := c.db.Get(args[0]); err != nil { return err } else { c.writeBulk(v) @@ -22,7 +22,7 @@ func setCommand(c *client) error { return ErrCmdParams } - if err := c.app.kv_set(args[0], args[1]); err != nil { + if err := c.db.Set(args[0], args[1]); err != nil { return err } else { c.writeStatus(OK) @@ -37,7 +37,7 @@ func getsetCommand(c *client) error { return ErrCmdParams } - if v, err := c.app.kv_getset(args[0], args[1]); err != nil { + if v, err := c.db.GetSet(args[0], args[1]); err != nil { return err } else { c.writeBulk(v) @@ -52,7 +52,7 @@ func setnxCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.kv_setnx(args[0], args[1]); err != nil { + if n, err := c.db.SetNX(args[0], args[1]); err != nil { return err } else { c.writeInteger(n) @@ -67,7 +67,7 @@ func existsCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.kv_exists(args[0]); err != nil { + if n, err := c.db.Exists(args[0]); err != nil { return err } else { c.writeInteger(n) @@ -82,7 +82,7 @@ func incrCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.kv_incr(c.args[0], 1); err != nil { + if n, err := c.db.Incr(c.args[0]); err != nil { return err } else { c.writeInteger(n) @@ -97,7 +97,7 @@ func decrCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.kv_incr(c.args[0], -1); err != nil { + if n, err := c.db.Decr(c.args[0]); err != nil { return err } else { c.writeInteger(n) @@ -117,7 +117,7 @@ func incrbyCommand(c *client) error { return err } - if n, err := c.app.kv_incr(c.args[0], delta); err != nil { + if n, err := c.db.IncryBy(c.args[0], delta); err != nil { return err } else { c.writeInteger(n) @@ -137,7 +137,7 @@ func decrbyCommand(c *client) error { return err } - if n, err := c.app.kv_incr(c.args[0], -delta); err != nil { + if n, err := c.db.DecrBy(c.args[0], -delta); err != nil { return err } else { c.writeInteger(n) @@ -152,7 +152,7 @@ func delCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.tx_del(args); err != nil { + if n, err := c.db.Del(args); err != nil { return err } else { c.writeInteger(n) @@ -167,7 +167,7 @@ func msetCommand(c *client) error { return ErrCmdParams } - if err := c.app.tx_mset(args); err != nil { + if err := c.db.MSet(args); err != nil { return err } else { c.writeStatus(OK) @@ -186,7 +186,7 @@ func mgetCommand(c *client) error { return ErrCmdParams } - if v, err := c.app.kv_mget(args); err != nil { + if v, err := c.db.MGet(args); err != nil { return err } else { c.writeArray(v) @@ -207,6 +207,5 @@ func init() { register("mget", mgetCommand) register("mset", msetCommand) register("set", setCommand) - // register("setex", setexCommand) register("setnx", setnxCommand) } diff --git a/ledis/cmd_list.go b/ledis/cmd_list.go index e41f520..213d432 100644 --- a/ledis/cmd_list.go +++ b/ledis/cmd_list.go @@ -8,7 +8,7 @@ func lpushCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.list_lpush(args[0], args[1:]); err != nil { + if n, err := c.db.LPush(args[0], args[1:]); err != nil { return err } else { c.writeInteger(n) @@ -23,7 +23,7 @@ func rpushCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.list_rpush(args[0], args[1:]); err != nil { + if n, err := c.db.RPush(args[0], args[1:]); err != nil { return err } else { c.writeInteger(n) @@ -38,7 +38,7 @@ func lpopCommand(c *client) error { return ErrCmdParams } - if v, err := c.app.list_lpop(args[0]); err != nil { + if v, err := c.db.LPop(args[0]); err != nil { return err } else { c.writeBulk(v) @@ -53,7 +53,7 @@ func rpopCommand(c *client) error { return ErrCmdParams } - if v, err := c.app.list_rpop(args[0]); err != nil { + if v, err := c.db.RPop(args[0]); err != nil { return err } else { c.writeBulk(v) @@ -68,7 +68,7 @@ func llenCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.list_len(args[0]); err != nil { + if n, err := c.db.LLen(args[0]); err != nil { return err } else { c.writeInteger(n) @@ -88,7 +88,7 @@ func lindexCommand(c *client) error { return err } - if v, err := c.app.list_index(args[0], int32(index)); err != nil { + if v, err := c.db.LIndex(args[0], int32(index)); err != nil { return err } else { c.writeBulk(v) @@ -117,7 +117,7 @@ func lrangeCommand(c *client) error { return err } - if v, err := c.app.list_range(args[0], int32(start), int32(stop)); err != nil { + if v, err := c.db.LRange(args[0], int32(start), int32(stop)); err != nil { return err } else { c.writeArray(v) @@ -132,7 +132,7 @@ func lclearCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.list_clear(args[0]); err != nil { + if n, err := c.db.LClear(args[0]); err != nil { return err } else { c.writeInteger(n) diff --git a/ledis/cmd_list_test.go b/ledis/cmd_list_test.go index 33b7455..7670a4a 100644 --- a/ledis/cmd_list_test.go +++ b/ledis/cmd_list_test.go @@ -53,16 +53,6 @@ func testListRange(key []byte, start int64, stop int64, checkValues ...int) erro return nil } -func testPrintList(key []byte) { - it := testApp.db.Iterator(encode_list_key(key, listMinSeq), - encode_list_key(key, listMaxSeq), 0, 0, -1) - for ; it.Valid(); it.Next() { - k, seq, _ := decode_list_key(it.Key()) - println(string(k), "seq ", seq, "value:", string(it.Value())) - } - println("end ---------------------") -} - func TestList(t *testing.T) { startTestApp() diff --git a/ledis/cmd_zset.go b/ledis/cmd_zset.go index 4f84e24..9e21be1 100644 --- a/ledis/cmd_zset.go +++ b/ledis/cmd_zset.go @@ -9,11 +9,6 @@ import ( //for simple implementation, we only support int64 score -const ( - MinScore int64 = -1<<63 + 1 - MaxScore int64 = 1<<63 - 1 -) - var errScoreOverflow = errors.New("zset score overflow") func zaddCommand(c *client) error { @@ -39,7 +34,7 @@ func zaddCommand(c *client) error { params[i+1] = args[i+1] } - if n, err := c.app.zset_add(key, params); err != nil { + if n, err := c.db.ZAdd(key, params); err != nil { return err } else { c.writeInteger(n) @@ -54,7 +49,7 @@ func zcardCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.zset_card(args[0]); err != nil { + if n, err := c.db.ZCard(args[0]); err != nil { return err } else { c.writeInteger(n) @@ -69,7 +64,7 @@ func zscoreCommand(c *client) error { return ErrCmdParams } - if v, err := c.app.zset_score(args[0], args[1]); err != nil { + if v, err := c.db.ZScore(args[0], args[1]); err != nil { return err } else { c.writeBulk(v) @@ -84,7 +79,7 @@ func zremCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.zset_rem(args[0], args[1:]); err != nil { + if n, err := c.db.ZRem(args[0], args[1:]); err != nil { return err } else { c.writeInteger(n) @@ -106,7 +101,7 @@ func zincrbyCommand(c *client) error { return err } - if v, err := c.app.zset_incrby(key, delta, args[2]); err != nil { + if v, err := c.db.ZIncrBy(key, delta, args[2]); err != nil { return err } else { c.writeBulk(v) @@ -193,7 +188,7 @@ func zcountCommand(c *client) error { return nil } - if n, err := c.app.zset_count(args[0], min, max); err != nil { + if n, err := c.db.ZCount(args[0], min, max); err != nil { return err } else { c.writeInteger(n) @@ -208,7 +203,7 @@ func zrankCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.zset_rank(args[0], args[1], false); err != nil { + if n, err := c.db.ZRank(args[0], args[1]); err != nil { return err } else if n == -1 { c.writeBulk(nil) @@ -225,7 +220,7 @@ func zrevrankCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.zset_rank(args[0], args[1], true); err != nil { + if n, err := c.db.ZRevRank(args[0], args[1]); err != nil { return err } else if n == -1 { c.writeBulk(nil) @@ -244,17 +239,12 @@ func zremrangebyrankCommand(c *client) error { key := args[0] - offset, limit, err := zparseRange(c, key, args[1], args[2]) + start, stop, err := zparseRange(c, args[1], args[2]) if err != nil { return err } - if offset < 0 { - c.writeInteger(0) - return nil - } - - if n, err := c.app.zset_remRange(key, MinScore, MaxScore, offset, limit); err != nil { + if n, err := c.db.ZRemRangeByRank(key, start, stop); err != nil { return err } else { c.writeInteger(n) @@ -275,7 +265,7 @@ func zremrangebyscoreCommand(c *client) error { return err } - if n, err := c.app.zset_remRange(key, min, max, 0, -1); err != nil { + if n, err := c.db.ZRemRangeByScore(key, min, max); err != nil { return err } else { c.writeInteger(n) @@ -284,51 +274,15 @@ func zremrangebyscoreCommand(c *client) error { return nil } -func zparseRange(c *client, key []byte, startBuf []byte, stopBuf []byte) (offset int, limit int, err error) { - var start int - var stop int - if start, err = strconv.Atoi(String(startBuf)); err != nil { +func zparseRange(c *client, a1 []byte, a2 []byte) (start int, stop int, err error) { + if start, err = strconv.Atoi(String(a1)); err != nil { return } - if stop, err = strconv.Atoi(String(stopBuf)); err != nil { + if stop, err = strconv.Atoi(String(a2)); err != nil { return } - if start < 0 || stop < 0 { - //refer redis implementation - var size int64 - size, err = c.app.zset_card(key) - if err != nil { - return - } - - llen := int(size) - - if start < 0 { - start = llen + start - } - if stop < 0 { - stop = llen + stop - } - - if start < 0 { - start = 0 - } - - if start >= llen { - offset = -1 - return - } - } - - if start > stop { - offset = -1 - return - } - - offset = start - limit = (stop - start) + 1 return } @@ -340,16 +294,11 @@ func zrangeGeneric(c *client, reverse bool) error { key := args[0] - offset, limit, err := zparseRange(c, key, args[1], args[2]) + start, stop, err := zparseRange(c, args[1], args[2]) if err != nil { return err } - if offset < 0 { - c.writeArray([]interface{}{}) - return nil - } - args = args[3:] var withScores bool = false @@ -357,7 +306,7 @@ func zrangeGeneric(c *client, reverse bool) error { withScores = true } - if v, err := c.app.zset_range(key, MinScore, MaxScore, withScores, offset, limit, reverse); err != nil { + if v, err := c.db.ZRangeGeneric(key, start, stop, withScores, reverse); err != nil { return err } else { c.writeArray(v) @@ -395,7 +344,7 @@ func zrangebyscoreGeneric(c *client, reverse bool) error { } var offset int = 0 - var limit int = -1 + var count int = -1 if len(args) > 0 { if len(args) != 3 { @@ -410,7 +359,7 @@ func zrangebyscoreGeneric(c *client, reverse bool) error { return ErrCmdParams } - if limit, err = strconv.Atoi(String(args[2])); err != nil { + if count, err = strconv.Atoi(String(args[2])); err != nil { return ErrCmdParams } } @@ -422,7 +371,7 @@ func zrangebyscoreGeneric(c *client, reverse bool) error { return nil } - if v, err := c.app.zset_range(key, min, max, withScores, offset, limit, reverse); err != nil { + if v, err := c.db.ZRangeByScoreGeneric(key, min, max, withScores, offset, count, reverse); err != nil { return err } else { c.writeArray(v) @@ -445,7 +394,7 @@ func zclearCommand(c *client) error { return ErrCmdParams } - if n, err := c.app.zset_clear(args[0]); err != nil { + if n, err := c.db.ZClear(args[0]); err != nil { return err } else { c.writeInteger(n) diff --git a/ledis/db.go b/ledis/db.go new file mode 100644 index 0000000..4973722 --- /dev/null +++ b/ledis/db.go @@ -0,0 +1,50 @@ +package ledis + +import ( + "encoding/json" + "github.com/siddontang/go-leveldb/leveldb" +) + +type DB struct { + db *leveldb.DB + + kvTx *tx + listTx *tx + hashTx *tx + zsetTx *tx +} + +func OpenDB(configJson json.RawMessage) (*DB, error) { + db, err := leveldb.Open(configJson) + if err != nil { + return nil, err + } + + return newDB(db) +} + +func OpenDBWithConfig(cfg *leveldb.Config) (*DB, error) { + db, err := leveldb.OpenWithConfig(cfg) + if err != nil { + return nil, err + } + + return newDB(db) +} + +func newDB(db *leveldb.DB) (*DB, error) { + d := new(DB) + + d.db = db + + d.kvTx = &tx{wb: db.NewWriteBatch()} + d.listTx = &tx{wb: db.NewWriteBatch()} + d.hashTx = &tx{wb: db.NewWriteBatch()} + d.zsetTx = &tx{wb: db.NewWriteBatch()} + + return d, nil +} + +func (db *DB) Close() { + db.db.Close() +} diff --git a/ledis/lock.go b/ledis/lock.go deleted file mode 100644 index 8dfd4cd..0000000 --- a/ledis/lock.go +++ /dev/null @@ -1,27 +0,0 @@ -package ledis - -import ( - "hash/crc32" - "sync" -) - -type keyMutex struct { - mutexs []*sync.Mutex -} - -func newKeyMutex(size int) *keyMutex { - m := new(keyMutex) - - m.mutexs = make([]*sync.Mutex, size) - - for i := range m.mutexs { - m.mutexs[i] = &sync.Mutex{} - } - - return m -} - -func (k *keyMutex) Get(key []byte) *sync.Mutex { - h := int(crc32.ChecksumIEEE(key)) - return k.mutexs[h%len(k.mutexs)] -} diff --git a/ledis/t_hash.go b/ledis/t_hash.go index 55a2568..daedfc2 100644 --- a/ledis/t_hash.go +++ b/ledis/t_hash.go @@ -87,20 +87,20 @@ func decode_hash_key(ek []byte) ([]byte, []byte, error) { return key, field, nil } -func (a *App) hash_len(key []byte) (int64, error) { - return Int64(a.db.Get(encode_hsize_key(key))) +func (db *DB) HLen(key []byte) (int64, error) { + return Int64(db.db.Get(encode_hsize_key(key))) } -func (a *App) hash_setItem(key []byte, field []byte, value []byte) (int64, error) { - t := a.hashTx +func (db *DB) hSetItem(key []byte, field []byte, value []byte) (int64, error) { + t := db.hashTx ek := encode_hash_key(key, field) var n int64 = 1 - if v, _ := a.db.Get(ek); v != nil { + if v, _ := db.db.Get(ek); v != nil { n = 0 } else { - if _, err := a.hash_incrSize(key, 1); err != nil { + if _, err := db.hIncrSize(key, 1); err != nil { return 0, err } } @@ -109,12 +109,12 @@ func (a *App) hash_setItem(key []byte, field []byte, value []byte) (int64, error return n, nil } -func (a *App) hash_set(key []byte, field []byte, value []byte) (int64, error) { - t := a.hashTx +func (db *DB) HSet(key []byte, field []byte, value []byte) (int64, error) { + t := db.hashTx t.Lock() defer t.Unlock() - n, err := a.hash_setItem(key, field, value) + n, err := db.hSetItem(key, field, value) if err != nil { return 0, err } @@ -125,26 +125,26 @@ func (a *App) hash_set(key []byte, field []byte, value []byte) (int64, error) { return n, err } -func (a *App) hash_get(key []byte, field []byte) ([]byte, error) { - return a.db.Get(encode_hash_key(key, field)) +func (db *DB) HGet(key []byte, field []byte) ([]byte, error) { + return db.db.Get(encode_hash_key(key, field)) } -func (a *App) hash_mset(key []byte, args [][]byte) error { - t := a.hashTx +func (db *DB) HMset(key []byte, args [][]byte) error { + t := db.hashTx t.Lock() defer t.Unlock() var num int64 = 0 for i := 0; i < len(args); i += 2 { ek := encode_hash_key(key, args[i]) - if v, _ := a.db.Get(ek); v == nil { + if v, _ := db.db.Get(ek); v == nil { num++ } t.Put(ek, args[i+1]) } - if _, err := a.hash_incrSize(key, num); err != nil { + if _, err := db.hIncrSize(key, num); err != nil { return err } @@ -153,10 +153,10 @@ func (a *App) hash_mset(key []byte, args [][]byte) error { return err } -func (a *App) hash_mget(key []byte, args [][]byte) ([]interface{}, error) { +func (db *DB) HMget(key []byte, args [][]byte) ([]interface{}, error) { r := make([]interface{}, len(args)) for i := 0; i < len(args); i++ { - v, err := a.db.Get(encode_hash_key(key, args[i])) + v, err := db.db.Get(encode_hash_key(key, args[i])) if err != nil { return nil, err } @@ -167,15 +167,15 @@ func (a *App) hash_mget(key []byte, args [][]byte) ([]interface{}, error) { return r, nil } -func (a *App) hash_del(key []byte, args [][]byte) (int64, error) { - t := a.hashTx +func (db *DB) HDel(key []byte, args [][]byte) (int64, error) { + t := db.hashTx t.Lock() defer t.Unlock() var num int64 = 0 for i := 0; i < len(args); i++ { ek := encode_hash_key(key, args[i]) - if v, err := a.db.Get(ek); err != nil { + if v, err := db.db.Get(ek); err != nil { return 0, err } else if v == nil { continue @@ -185,7 +185,7 @@ func (a *App) hash_del(key []byte, args [][]byte) (int64, error) { } } - if _, err := a.hash_incrSize(key, -num); err != nil { + if _, err := db.hIncrSize(key, -num); err != nil { return 0, err } @@ -194,10 +194,10 @@ func (a *App) hash_del(key []byte, args [][]byte) (int64, error) { return num, err } -func (a *App) hash_incrSize(key []byte, delta int64) (int64, error) { - t := a.hashTx +func (db *DB) hIncrSize(key []byte, delta int64) (int64, error) { + t := db.hashTx sk := encode_hsize_key(key) - size, err := Int64(a.db.Get(sk)) + size, err := Int64(db.db.Get(sk)) if err != nil { return 0, err } else { @@ -213,22 +213,22 @@ func (a *App) hash_incrSize(key []byte, delta int64) (int64, error) { return size, nil } -func (a *App) hash_incrby(key []byte, field []byte, delta int64) (int64, error) { - t := a.hashTx +func (db *DB) HIncrBy(key []byte, field []byte, delta int64) (int64, error) { + t := db.hashTx t.Lock() defer t.Unlock() ek := encode_hash_key(key, field) var n int64 = 0 - n, err := StrInt64(a.db.Get(ek)) + n, err := StrInt64(db.db.Get(ek)) if err != nil { return 0, err } n += delta - _, err = a.hash_setItem(key, field, StrPutInt64(n)) + _, err = db.hSetItem(key, field, StrPutInt64(n)) if err != nil { return 0, err } @@ -238,13 +238,13 @@ func (a *App) hash_incrby(key []byte, field []byte, delta int64) (int64, error) return n, err } -func (a *App) hash_getall(key []byte) ([]interface{}, error) { +func (db *DB) HGetAll(key []byte) ([]interface{}, error) { start := encode_hash_start_key(key) stop := encode_hash_stop_key(key) v := make([]interface{}, 0, 16) - it := a.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { _, k, err := decode_hash_key(it.Key()) if err != nil { @@ -259,13 +259,13 @@ func (a *App) hash_getall(key []byte) ([]interface{}, error) { return v, nil } -func (a *App) hash_keys(key []byte) ([]interface{}, error) { +func (db *DB) HKeys(key []byte) ([]interface{}, error) { start := encode_hash_start_key(key) stop := encode_hash_stop_key(key) v := make([]interface{}, 0, 16) - it := a.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { _, k, err := decode_hash_key(it.Key()) if err != nil { @@ -279,13 +279,13 @@ func (a *App) hash_keys(key []byte) ([]interface{}, error) { return v, nil } -func (a *App) hash_values(key []byte) ([]interface{}, error) { +func (db *DB) HValues(key []byte) ([]interface{}, error) { start := encode_hash_start_key(key) stop := encode_hash_stop_key(key) v := make([]interface{}, 0, 16) - it := a.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { v = append(v, it.Value()) } @@ -295,10 +295,10 @@ func (a *App) hash_values(key []byte) ([]interface{}, error) { return v, nil } -func (a *App) hash_clear(key []byte) (int64, error) { +func (db *DB) HClear(key []byte) (int64, error) { sk := encode_hsize_key(key) - t := a.hashTx + t := db.hashTx t.Lock() defer t.Unlock() @@ -306,7 +306,7 @@ func (a *App) hash_clear(key []byte) (int64, error) { stop := encode_hash_stop_key(key) var num int64 = 0 - it := a.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) + it := db.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { t.Delete(it.Key()) num++ diff --git a/ledis/t_kv.go b/ledis/t_kv.go index 2fd146f..bf273fb 100644 --- a/ledis/t_kv.go +++ b/ledis/t_kv.go @@ -21,101 +21,17 @@ func decode_kv_key(ek []byte) ([]byte, error) { return ek[1:], nil } -func (a *App) kv_get(key []byte) ([]byte, error) { - key = encode_kv_key(key) - - return a.db.Get(key) -} - -func (a *App) kv_set(key []byte, value []byte) error { +func (db *DB) incr(key []byte, delta int64) (int64, error) { key = encode_kv_key(key) var err error - t := a.kvTx - - t.Lock() - defer t.Unlock() - - t.Put(key, value) - - //todo, binlog - - err = t.Commit() - - return err -} - -func (a *App) kv_getset(key []byte, value []byte) ([]byte, error) { - key = encode_kv_key(key) - - t := a.kvTx - - t.Lock() - defer t.Unlock() - - oldValue, err := a.db.Get(key) - if err != nil { - return nil, err - } - - t.Put(key, value) - //todo, binlog - - err = t.Commit() - - return oldValue, err -} - -func (a *App) kv_setnx(key []byte, value []byte) (int64, error) { - key = encode_kv_key(key) - var err error - - var n int64 = 1 - - t := a.kvTx - - t.Lock() - defer t.Unlock() - - if v, err := a.db.Get(key); err != nil { - return 0, err - } else if v != nil { - n = 0 - } else { - t.Put(key, value) - - //todo binlog - - err = t.Commit() - } - - return n, err -} - -func (a *App) kv_exists(key []byte) (int64, error) { - key = encode_kv_key(key) - var err error - - var v []byte - v, err = a.db.Get(key) - if v != nil && err == nil { - return 1, nil - } else { - return 0, err - } -} - -func (a *App) kv_incr(key []byte, delta int64) (int64, error) { - key = encode_kv_key(key) - var err error - - t := a.kvTx + t := db.kvTx t.Lock() defer t.Unlock() var n int64 - n, err = StrInt64(a.db.Get(key)) + n, err = StrInt64(db.db.Get(key)) if err != nil { return 0, err } @@ -130,12 +46,20 @@ func (a *App) kv_incr(key []byte, delta int64) (int64, error) { return n, err } -func (a *App) tx_del(keys [][]byte) (int64, error) { +func (db *DB) Decr(key []byte) (int64, error) { + return db.incr(key, -1) +} + +func (db *DB) DecrBy(key []byte, decrement int64) (int64, error) { + return db.incr(key, decrement) +} + +func (db *DB) Del(keys [][]byte) (int64, error) { for i := range keys { keys[i] = encode_kv_key(keys[i]) } - t := a.kvTx + t := db.kvTx t.Lock() defer t.Unlock() @@ -149,8 +73,72 @@ func (a *App) tx_del(keys [][]byte) (int64, error) { return int64(len(keys)), err } -func (a *App) tx_mset(args [][]byte) error { - t := a.kvTx +func (db *DB) Exists(key []byte) (int64, error) { + key = encode_kv_key(key) + var err error + + var v []byte + v, err = db.db.Get(key) + if v != nil && err == nil { + return 1, nil + } + + return 0, err +} + +func (db *DB) Get(key []byte) ([]byte, error) { + key = encode_kv_key(key) + + return db.db.Get(key) +} + +func (db *DB) GetSet(key []byte, value []byte) ([]byte, error) { + key = encode_kv_key(key) + + t := db.kvTx + + t.Lock() + defer t.Unlock() + + oldValue, err := db.db.Get(key) + if err != nil { + return nil, err + } + + t.Put(key, value) + //todo, binlog + + err = t.Commit() + + return oldValue, err +} + +func (db *DB) Incr(key []byte) (int64, error) { + return db.incr(key, 1) +} + +func (db *DB) IncryBy(key []byte, increment int64) (int64, error) { + return db.incr(key, increment) +} + +func (db *DB) MGet(keys [][]byte) ([]interface{}, error) { + values := make([]interface{}, len(keys)) + + for i := range keys { + key := encode_kv_key(keys[i]) + value, err := db.db.Get(key) + if err != nil { + return nil, err + } + + values[i] = value + } + + return values, nil +} + +func (db *DB) MSet(args [][]byte) error { + t := db.kvTx t.Lock() defer t.Unlock() @@ -168,18 +156,46 @@ func (a *App) tx_mset(args [][]byte) error { return err } -func (a *App) kv_mget(args [][]byte) ([]interface{}, error) { - values := make([]interface{}, len(args)) +func (db *DB) Set(key []byte, value []byte) error { + key = encode_kv_key(key) + var err error - for i := range args { - key := encode_kv_key(args[i]) - value, err := a.db.Get(key) - if err != nil { - return nil, err - } + t := db.kvTx - values[i] = value + t.Lock() + defer t.Unlock() + + t.Put(key, value) + + //todo, binlog + + err = t.Commit() + + return err +} + +func (db *DB) SetNX(key []byte, value []byte) (int64, error) { + key = encode_kv_key(key) + var err error + + var n int64 = 1 + + t := db.kvTx + + t.Lock() + defer t.Unlock() + + if v, err := db.db.Get(key); err != nil { + return 0, err + } else if v != nil { + n = 0 + } else { + t.Put(key, value) + + //todo binlog + + err = t.Commit() } - return values, nil + return n, err } diff --git a/ledis/t_list.go b/ledis/t_list.go index af6f484..270331f 100644 --- a/ledis/t_list.go +++ b/ledis/t_list.go @@ -70,69 +70,13 @@ func decode_list_key(ek []byte) (key []byte, seq int32, err error) { return } -func (a *App) list_lpush(key []byte, args [][]byte) (int64, error) { - return a.list_push(key, args, listHeadSeq) -} - -func (a *App) list_rpush(key []byte, args [][]byte) (int64, error) { - return a.list_push(key, args, listTailSeq) -} - -func (a *App) list_lpop(key []byte) ([]byte, error) { - return a.list_pop(key, listHeadSeq) -} - -func (a *App) list_rpop(key []byte) ([]byte, error) { - return a.list_pop(key, listTailSeq) -} - -func (a *App) list_getSeq(key []byte, whereSeq int32) (int64, error) { - ek := encode_list_key(key, whereSeq) - - return Int64(a.db.Get(ek)) -} - -func (a *App) list_getMeta(ek []byte) (headSeq int32, tailSeq int32, size int32, err error) { - var v []byte - v, err = a.db.Get(ek) - if err != nil { - return - } else if v == nil { - size = 0 - return - } else { - headSeq = int32(binary.LittleEndian.Uint32(v[0:4])) - tailSeq = int32(binary.LittleEndian.Uint32(v[4:8])) - size = int32(binary.LittleEndian.Uint32(v[8:])) - } - return -} - -func (a *App) list_setMeta(ek []byte, headSeq int32, tailSeq int32, size int32) { - t := a.listTx - - buf := make([]byte, 12) - - binary.LittleEndian.PutUint32(buf[0:4], uint32(headSeq)) - binary.LittleEndian.PutUint32(buf[4:8], uint32(tailSeq)) - binary.LittleEndian.PutUint32(buf[8:], uint32(size)) - - t.Put(ek, buf) -} - -func (a *App) list_len(key []byte) (int64, error) { - ek := encode_lmeta_key(key) - _, _, size, err := a.list_getMeta(ek) - return int64(size), err -} - -func (a *App) list_push(key []byte, args [][]byte, whereSeq int32) (int64, error) { - t := a.listTx +func (db *DB) lpush(key []byte, args [][]byte, whereSeq int32) (int64, error) { + t := db.listTx t.Lock() defer t.Unlock() metaKey := encode_lmeta_key(key) - headSeq, tailSeq, size, err := a.list_getMeta(metaKey) + headSeq, tailSeq, size, err := db.lGetMeta(metaKey) if err != nil { return 0, err @@ -174,20 +118,20 @@ func (a *App) list_push(key []byte, args [][]byte, whereSeq int32) (int64, error tailSeq = seq } - a.list_setMeta(metaKey, headSeq, tailSeq, size) + db.lSetMeta(metaKey, headSeq, tailSeq, size) err = t.Commit() return int64(size), err } -func (a *App) list_pop(key []byte, whereSeq int32) ([]byte, error) { - t := a.listTx +func (db *DB) lpop(key []byte, whereSeq int32) ([]byte, error) { + t := db.listTx t.Lock() defer t.Unlock() metaKey := encode_lmeta_key(key) - headSeq, tailSeq, size, err := a.list_getMeta(metaKey) + headSeq, tailSeq, size, err := db.lGetMeta(metaKey) if err != nil { return nil, err @@ -204,7 +148,7 @@ func (a *App) list_pop(key []byte, whereSeq int32) ([]byte, error) { itemKey := encode_list_key(key, seq) var value []byte - value, err = a.db.Get(itemKey) + value, err = db.db.Get(itemKey) if err != nil { return nil, err } @@ -222,7 +166,7 @@ func (a *App) list_pop(key []byte, whereSeq int32) ([]byte, error) { tailSeq = seq } - a.list_setMeta(metaKey, headSeq, tailSeq, size) + db.lSetMeta(metaKey, headSeq, tailSeq, size) } //todo add binlog @@ -230,7 +174,71 @@ func (a *App) list_pop(key []byte, whereSeq int32) ([]byte, error) { return value, err } -func (a *App) list_range(key []byte, start int32, stop int32) ([]interface{}, error) { +func (db *DB) lGetSeq(key []byte, whereSeq int32) (int64, error) { + ek := encode_list_key(key, whereSeq) + + return Int64(db.db.Get(ek)) +} + +func (db *DB) lGetMeta(ek []byte) (headSeq int32, tailSeq int32, size int32, err error) { + var v []byte + v, err = db.db.Get(ek) + if err != nil { + return + } else if v == nil { + size = 0 + return + } else { + headSeq = int32(binary.LittleEndian.Uint32(v[0:4])) + tailSeq = int32(binary.LittleEndian.Uint32(v[4:8])) + size = int32(binary.LittleEndian.Uint32(v[8:])) + } + return +} + +func (db *DB) lSetMeta(ek []byte, headSeq int32, tailSeq int32, size int32) { + t := db.listTx + + buf := make([]byte, 12) + + binary.LittleEndian.PutUint32(buf[0:4], uint32(headSeq)) + binary.LittleEndian.PutUint32(buf[4:8], uint32(tailSeq)) + binary.LittleEndian.PutUint32(buf[8:], uint32(size)) + + t.Put(ek, buf) +} + +func (db *DB) LIndex(key []byte, index int32) ([]byte, error) { + var seq int32 + headSeq, tailSeq, _, err := db.lGetMeta(encode_lmeta_key(key)) + if err != nil { + return nil, err + } + + if index >= 0 { + seq = headSeq + index + } else { + seq = tailSeq + index + 1 + } + + return db.db.Get(encode_list_key(key, seq)) +} + +func (db *DB) LLen(key []byte) (int64, error) { + ek := encode_lmeta_key(key) + _, _, size, err := db.lGetMeta(ek) + return int64(size), err +} + +func (db *DB) LPop(key []byte) ([]byte, error) { + return db.lpop(key, listHeadSeq) +} + +func (db *DB) LPush(key []byte, args [][]byte) (int64, error) { + return db.lpush(key, args, listHeadSeq) +} + +func (db *DB) LRange(key []byte, start int32, stop int32) ([]interface{}, error) { v := make([]interface{}, 0, 16) var startSeq int32 @@ -240,7 +248,7 @@ func (a *App) list_range(key []byte, start int32, stop int32) ([]interface{}, er return []interface{}{}, nil } - headSeq, tailSeq, _, err := a.list_getMeta(encode_lmeta_key(key)) + headSeq, tailSeq, _, err := db.lGetMeta(encode_lmeta_key(key)) if err != nil { return nil, err } @@ -263,7 +271,7 @@ func (a *App) list_range(key []byte, start int32, stop int32) ([]interface{}, er stopSeq = listMaxSeq } - it := a.db.Iterator(encode_list_key(key, startSeq), + it := db.db.Iterator(encode_list_key(key, startSeq), encode_list_key(key, stopSeq), leveldb.RangeClose, 0, -1) for ; it.Valid(); it.Next() { v = append(v, it.Value()) @@ -274,38 +282,30 @@ func (a *App) list_range(key []byte, start int32, stop int32) ([]interface{}, er return v, nil } -func (a *App) list_index(key []byte, index int32) ([]byte, error) { - var seq int32 - headSeq, tailSeq, _, err := a.list_getMeta(encode_lmeta_key(key)) - if err != nil { - return nil, err - } - - if index >= 0 { - seq = headSeq + index - } else { - seq = tailSeq + index + 1 - } - - return a.db.Get(encode_list_key(key, seq)) +func (db *DB) RPop(key []byte) ([]byte, error) { + return db.lpop(key, listTailSeq) } -func (a *App) list_clear(key []byte) (int64, error) { +func (db *DB) RPush(key []byte, args [][]byte) (int64, error) { + return db.lpush(key, args, listTailSeq) +} + +func (db *DB) LClear(key []byte) (int64, error) { mk := encode_lmeta_key(key) - t := a.listTx + t := db.listTx t.Lock() defer t.Unlock() metaKey := encode_lmeta_key(key) - headSeq, tailSeq, _, err := a.list_getMeta(metaKey) + headSeq, tailSeq, _, err := db.lGetMeta(metaKey) if err != nil { return 0, err } var num int64 = 0 - it := a.db.Iterator(encode_list_key(key, headSeq), + it := db.db.Iterator(encode_list_key(key, headSeq), encode_list_key(key, tailSeq), leveldb.RangeClose, 0, -1) for ; it.Valid(); it.Next() { t.Delete(it.Key()) diff --git a/ledis/t_zset.go b/ledis/t_zset.go index ea0fae0..b46d397 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -8,6 +8,11 @@ import ( "strconv" ) +const ( + MinScore int64 = -1<<63 + 1 + MaxScore int64 = 1<<63 - 1 +) + var errZSizeKey = errors.New("invalid zsize key") var errZSetKey = errors.New("invalid zset key") var errZScoreKey = errors.New("invalid zscore key") @@ -146,16 +151,16 @@ func decode_zscore_key(ek []byte) (key []byte, member []byte, score int64, err e return } -func (a *App) zset_setItem(key []byte, score int64, member []byte) (int64, error) { +func (db *DB) zSetItem(key []byte, score int64, member []byte) (int64, error) { if score <= MinScore || score >= MaxScore { return 0, errScoreOverflow } - t := a.zsetTx + t := db.zsetTx var exists int64 = 0 ek := encode_zset_key(key, member) - if v, err := a.db.Get(ek); err != nil { + if v, err := db.db.Get(ek); err != nil { return 0, err } else if v != nil { exists = 1 @@ -176,11 +181,11 @@ func (a *App) zset_setItem(key []byte, score int64, member []byte) (int64, error return exists, nil } -func (a *App) zset_delItem(key []byte, member []byte, skipDelScore bool) (int64, error) { - t := a.zsetTx +func (db *DB) zDelItem(key []byte, member []byte, skipDelScore bool) (int64, error) { + t := db.zsetTx ek := encode_zset_key(key, member) - if v, err := a.db.Get(ek); err != nil { + if v, err := db.db.Get(ek); err != nil { return 0, err } else if v == nil { //not exists @@ -202,8 +207,8 @@ func (a *App) zset_delItem(key []byte, member []byte, skipDelScore bool) (int64, return 1, nil } -func (a *App) zset_add(key []byte, args []interface{}) (int64, error) { - t := a.zsetTx +func (db *DB) ZAdd(key []byte, args []interface{}) (int64, error) { + t := db.zsetTx t.Lock() defer t.Unlock() @@ -212,7 +217,7 @@ func (a *App) zset_add(key []byte, args []interface{}) (int64, error) { score := args[i].(int64) member := args[i+1].([]byte) - if n, err := a.zset_setItem(key, score, member); err != nil { + if n, err := db.zSetItem(key, score, member); err != nil { return 0, err } else if n == 0 { //add new @@ -220,7 +225,7 @@ func (a *App) zset_add(key []byte, args []interface{}) (int64, error) { } } - if _, err := a.zset_incrSize(key, num); err != nil { + if _, err := db.zIncrSize(key, num); err != nil { return 0, err } @@ -229,10 +234,10 @@ func (a *App) zset_add(key []byte, args []interface{}) (int64, error) { return num, err } -func (a *App) zset_incrSize(key []byte, delta int64) (int64, error) { - t := a.zsetTx +func (db *DB) zIncrSize(key []byte, delta int64) (int64, error) { + t := db.zsetTx sk := encode_zsize_key(key) - size, err := Int64(a.db.Get(sk)) + size, err := Int64(db.db.Get(sk)) if err != nil { return 0, err } else { @@ -248,15 +253,15 @@ func (a *App) zset_incrSize(key []byte, delta int64) (int64, error) { return size, nil } -func (a *App) zset_card(key []byte) (int64, error) { +func (db *DB) ZCard(key []byte) (int64, error) { sk := encode_zsize_key(key) - size, err := Int64(a.db.Get(sk)) + size, err := Int64(db.db.Get(sk)) return size, err } -func (a *App) zset_score(key []byte, member []byte) ([]byte, error) { +func (db *DB) ZScore(key []byte, member []byte) ([]byte, error) { k := encode_zset_key(key, member) - score, err := Int64(a.db.Get(k)) + score, err := Int64(db.db.Get(k)) if err != nil { return nil, err } @@ -264,21 +269,21 @@ func (a *App) zset_score(key []byte, member []byte) ([]byte, error) { return Slice(strconv.FormatInt(score, 10)), nil } -func (a *App) zset_rem(key []byte, args [][]byte) (int64, error) { - t := a.zsetTx +func (db *DB) ZRem(key []byte, args [][]byte) (int64, error) { + t := db.zsetTx t.Lock() defer t.Unlock() var num int64 = 0 for i := 0; i < len(args); i++ { - if n, err := a.zset_delItem(key, args[i], false); err != nil { + if n, err := db.zDelItem(key, args[i], false); err != nil { return 0, err } else if n == 1 { num++ } } - if _, err := a.zset_incrSize(key, -num); err != nil { + if _, err := db.zIncrSize(key, -num); err != nil { return 0, err } @@ -286,14 +291,14 @@ func (a *App) zset_rem(key []byte, args [][]byte) (int64, error) { return num, err } -func (a *App) zset_incrby(key []byte, delta int64, member []byte) ([]byte, error) { - t := a.zsetTx +func (db *DB) ZIncrBy(key []byte, delta int64, member []byte) ([]byte, error) { + t := db.zsetTx t.Lock() defer t.Unlock() ek := encode_zset_key(key, member) var score int64 = delta - v, err := a.db.Get(ek) + v, err := db.db.Get(ek) if err != nil { return nil, err } else if v != nil { @@ -310,7 +315,7 @@ func (a *App) zset_incrby(key []byte, delta int64, member []byte) ([]byte, error } } } else { - a.zset_incrSize(key, 1) + db.zIncrSize(key, 1) } t.Put(ek, PutInt64(score)) @@ -321,13 +326,13 @@ func (a *App) zset_incrby(key []byte, delta int64, member []byte) ([]byte, error return Slice(strconv.FormatInt(score, 10)), err } -func (a *App) zset_count(key []byte, min int64, max int64) (int64, error) { +func (db *DB) ZCount(key []byte, min int64, max int64) (int64, error) { minKey := encode_start_zscore_key(key, min) maxKey := encode_stop_zscore_key(key, max) rangeType := leveldb.RangeROpen - it := a.db.Iterator(minKey, maxKey, rangeType, 0, -1) + it := db.db.Iterator(minKey, maxKey, rangeType, 0, -1) var n int64 = 0 for ; it.Valid(); it.Next() { n++ @@ -337,10 +342,10 @@ func (a *App) zset_count(key []byte, min int64, max int64) (int64, error) { return n, nil } -func (a *App) zset_rank(key []byte, member []byte, reverse bool) (int64, error) { +func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) { k := encode_zset_key(key, member) - if v, err := a.db.Get(k); err != nil { + if v, err := db.db.Get(k); err != nil { return 0, err } else if v == nil { return -1, nil @@ -354,10 +359,10 @@ func (a *App) zset_rank(key []byte, member []byte, reverse bool) (int64, error) if !reverse { minKey := encode_start_zscore_key(key, MinScore) - it = a.db.Iterator(minKey, sk, leveldb.RangeClose, 0, -1) + it = db.db.Iterator(minKey, sk, leveldb.RangeClose, 0, -1) } else { maxKey := encode_stop_zscore_key(key, MaxScore) - it = a.db.RevIterator(sk, maxKey, leveldb.RangeClose, 0, -1) + it = db.db.RevIterator(sk, maxKey, leveldb.RangeClose, 0, -1) } var lastKey []byte = nil @@ -381,23 +386,23 @@ func (a *App) zset_rank(key []byte, member []byte, reverse bool) (int64, error) return -1, nil } -func (a *App) zset_iterator(key []byte, min int64, max int64, offset int, limit int, reverse bool) *leveldb.Iterator { +func (db *DB) zIterator(key []byte, min int64, max int64, offset int, limit int, reverse bool) *leveldb.Iterator { minKey := encode_start_zscore_key(key, min) maxKey := encode_stop_zscore_key(key, max) if !reverse { - return a.db.Iterator(minKey, maxKey, leveldb.RangeClose, offset, limit) + return db.db.Iterator(minKey, maxKey, leveldb.RangeClose, offset, limit) } else { - return a.db.RevIterator(minKey, maxKey, leveldb.RangeClose, offset, limit) + return db.db.RevIterator(minKey, maxKey, leveldb.RangeClose, offset, limit) } } -func (a *App) zset_remRange(key []byte, min int64, max int64, offset int, limit int) (int64, error) { - t := a.zsetTx +func (db *DB) zRemRange(key []byte, min int64, max int64, offset int, limit int) (int64, error) { + t := db.zsetTx t.Lock() defer t.Unlock() - it := a.zset_iterator(key, min, max, offset, limit, false) + it := db.zIterator(key, min, max, offset, limit, false) var num int64 = 0 for ; it.Valid(); it.Next() { k := it.Key() @@ -406,7 +411,7 @@ func (a *App) zset_remRange(key []byte, min int64, max int64, offset int, limit continue } - if n, err := a.zset_delItem(key, m, true); err != nil { + if n, err := db.zDelItem(key, m, true); err != nil { return 0, err } else if n == 1 { num++ @@ -415,7 +420,7 @@ func (a *App) zset_remRange(key []byte, min int64, max int64, offset int, limit t.Delete(k) } - if _, err := a.zset_incrSize(key, -num); err != nil { + if _, err := db.zIncrSize(key, -num); err != nil { return 0, err } @@ -425,7 +430,7 @@ func (a *App) zset_remRange(key []byte, min int64, max int64, offset int, limit return num, err } -func (a *App) zset_reverse(s []interface{}, withScores bool) []interface{} { +func (db *DB) zReverse(s []interface{}, withScores bool) []interface{} { if withScores { for i, j := 0, len(s)-2; i < j; i, j = i+2, j-2 { s[i], s[j] = s[j], s[i] @@ -440,7 +445,11 @@ func (a *App) zset_reverse(s []interface{}, withScores bool) []interface{} { return s } -func (a *App) zset_range(key []byte, min int64, max int64, withScores bool, offset int, limit int, reverse bool) ([]interface{}, error) { +func (db *DB) zRange(key []byte, min int64, max int64, withScores bool, offset int, limit int, reverse bool) ([]interface{}, error) { + if offset < 0 { + return []interface{}{}, nil + } + nv := 64 if limit > 0 { nv = limit @@ -455,9 +464,9 @@ func (a *App) zset_range(key []byte, min int64, max int64, withScores bool, offs //if reverse and offset is 0, limit < 0, we may use forward iterator then reverse //because leveldb iterator prev is slower than next if !reverse || (offset == 0 && limit < 0) { - it = a.zset_iterator(key, min, max, offset, limit, false) + it = db.zIterator(key, min, max, offset, limit, false) } else { - it = a.zset_iterator(key, min, max, offset, limit, true) + it = db.zIterator(key, min, max, offset, limit, true) } for ; it.Valid(); it.Next() { @@ -475,12 +484,111 @@ func (a *App) zset_range(key []byte, min int64, max int64, withScores bool, offs } if reverse && (offset == 0 && limit < 0) { - v = a.zset_reverse(v, withScores) + v = db.zReverse(v, withScores) } return v, nil } -func (a *App) zset_clear(key []byte) (int64, error) { - return a.zset_remRange(key, MinScore, MaxScore, 0, -1) +func (db *DB) zParseLimit(key []byte, start int, stop int) (offset int, limit int, err error) { + if start < 0 || stop < 0 { + //refer redis implementation + var size int64 + size, err = db.ZCard(key) + if err != nil { + return + } + + llen := int(size) + + if start < 0 { + start = llen + start + } + if stop < 0 { + stop = llen + stop + } + + if start < 0 { + start = 0 + } + + if start >= llen { + offset = -1 + return + } + } + + if start > stop { + offset = -1 + return + } + + offset = start + limit = (stop - start) + 1 + return +} + +func (db *DB) ZClear(key []byte) (int64, error) { + return db.zRemRange(key, MinScore, MaxScore, 0, -1) +} + +func (db *DB) ZRange(key []byte, start int, stop int, withScores bool) ([]interface{}, error) { + return db.ZRangeGeneric(key, start, stop, withScores, false) +} + +//min and max must be inclusive +//if no limit, set offset = 0 and count = -1 +func (db *DB) ZRangeByScore(key []byte, min int64, max int64, + withScores bool, offset int, count int) ([]interface{}, error) { + return db.ZRangeByScoreGeneric(key, min, max, withScores, offset, count, false) +} + +func (db *DB) ZRank(key []byte, member []byte) (int64, error) { + return db.zrank(key, member, false) +} + +func (db *DB) ZRemRangeByRank(key []byte, start int, stop int) (int64, error) { + offset, limit, err := db.zParseLimit(key, start, stop) + if err != nil { + return 0, err + } + return db.zRemRange(key, MinScore, MaxScore, offset, limit) +} + +//min and max must be inclusive +func (db *DB) ZRemRangeByScore(key []byte, min int64, max int64) (int64, error) { + return db.zRemRange(key, min, max, 0, -1) +} + +func (db *DB) ZRevRange(key []byte, start int, stop int, withScores bool) ([]interface{}, error) { + return db.ZRangeGeneric(key, start, stop, withScores, true) +} + +func (db *DB) ZRevRank(key []byte, member []byte) (int64, error) { + return db.zrank(key, member, true) +} + +//min and max must be inclusive +//if no limit, set offset = 0 and count = -1 +func (db *DB) ZRevRangeByScore(key []byte, min int64, max int64, + withScores bool, offset int, count int) ([]interface{}, error) { + return db.ZRangeByScoreGeneric(key, min, max, withScores, offset, count, true) +} + +func (db *DB) ZRangeGeneric(key []byte, start int, stop int, + withScores bool, reverse bool) ([]interface{}, error) { + offset, limit, err := db.zParseLimit(key, start, stop) + if err != nil { + return nil, err + } + + return db.zRange(key, MinScore, MaxScore, withScores, offset, limit, reverse) +} + +//min and max must be inclusive +//if no limit, set offset = 0 and count = -1 +func (db *DB) ZRangeByScoreGeneric(key []byte, min int64, max int64, + withScores bool, offset int, count int, reverse bool) ([]interface{}, error) { + + return db.zRange(key, min, max, withScores, offset, count, reverse) } diff --git a/ledis/tx.go b/ledis/tx.go index 41d0133..38b6e01 100644 --- a/ledis/tx.go +++ b/ledis/tx.go @@ -8,21 +8,9 @@ import ( type tx struct { m sync.Mutex - app *App - wb *leveldb.WriteBatch } -func (app *App) newTx() *tx { - t := new(tx) - - t.app = app - - t.wb = app.db.NewWriteBatch() - - return t -} - func (t *tx) Close() { t.wb.Close() }