From e71c22cf76a98348dabc977c6668ec683bb5290a Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 8 May 2014 10:54:33 +0800 Subject: [PATCH] add zest support --- etc/ssdb.json | 10 + ssdb/app.go | 12 +- ssdb/app_test.go | 4 +- ssdb/cmd_list_test.go | 2 +- ssdb/cmd_zset.go | 464 +++++++++++++++++++++++++++++++++++++--- ssdb/cmd_zset_test.go | 477 ++++++++++++++++++++++++++++++++++++++++++ ssdb/const.go | 5 +- ssdb/t_hash.go | 44 ++-- ssdb/t_list.go | 9 +- ssdb/t_zset.go | 443 +++++++++++++++++++++++++++++++++++++++ 10 files changed, 1408 insertions(+), 62 deletions(-) create mode 100644 etc/ssdb.json create mode 100644 ssdb/cmd_zset_test.go create mode 100644 ssdb/t_zset.go diff --git a/etc/ssdb.json b/etc/ssdb.json new file mode 100644 index 0000000..d0bcd0f --- /dev/null +++ b/etc/ssdb.json @@ -0,0 +1,10 @@ +{ + "addr": "127.0.0.1:6380", + "leveldb": { + "path": "/tmp/ssdb", + "compression": true, + "block_size": 32768, + "write_buffer_size": 2097152, + "cache_size": 20971520 + } +} \ No newline at end of file diff --git a/ssdb/app.go b/ssdb/app.go index 8b3e9d5..00ee1d3 100644 --- a/ssdb/app.go +++ b/ssdb/app.go @@ -17,11 +17,15 @@ type App struct { listTx *tx hashTx *tx zsetTx *tx + + closed bool } func NewApp(cfg *Config) (*App, error) { app := new(App) + app.closed = false + app.cfg = cfg var err error @@ -50,13 +54,19 @@ func NewApp(cfg *Config) (*App, error) { } func (app *App) Close() { + if app.closed { + return + } + app.listener.Close() app.db.Close() + + app.closed = true } func (app *App) Run() { - for { + for !app.closed { conn, err := app.listener.Accept() if err != nil { continue diff --git a/ssdb/app_test.go b/ssdb/app_test.go index d633f18..47360ad 100644 --- a/ssdb/app_test.go +++ b/ssdb/app_test.go @@ -33,14 +33,14 @@ func startTestApp() { f := func() { newTestRedisPool() - os.RemoveAll("./testdb") + os.RemoveAll("/tmp/testdb") var d = []byte(` { "addr" : "127.0.0.1:6380", "leveldb" : { - "path" : "./testdb", + "path" : "/tmp/testdb", "compression":true, "block_size" : 32768, "write_buffer_size" : 2097152, diff --git a/ssdb/cmd_list_test.go b/ssdb/cmd_list_test.go index 8cfed80..a343b93 100644 --- a/ssdb/cmd_list_test.go +++ b/ssdb/cmd_list_test.go @@ -63,7 +63,7 @@ func testPrintList(key []byte) { println(headSeq, tailSeq, size) it := testApp.db.Iterator(encode_list_key(key, listMinSeq), - encode_list_key(key, listMaxSeq), 0) + encode_list_key(key, listMaxSeq), 0, 0, -1) for ; it.Valid(); it.Next() { k, seq, _ := decode_list_key(it.Key()) println(string(k), seq, string(it.Value())) diff --git a/ssdb/cmd_zset.go b/ssdb/cmd_zset.go index ae238fb..cd05958 100644 --- a/ssdb/cmd_zset.go +++ b/ssdb/cmd_zset.go @@ -1,5 +1,445 @@ package ssdb +import ( + "errors" + "github.com/siddontang/golib/hack" + "math" + "strconv" + "strings" +) + +//for simple implementation, we only support int64 score + +const ( + MinScore int64 = -1<<63 + 1 + MaxScore int64 = 1<<63 - 1 +) + +var errScoreOverflow = errors.New("zset score overflow") + +func zaddCommand(c *client) error { + args := c.args + if len(args) < 3 { + return ErrCmdParams + } + + key := args[0] + if len(args[1:])%2 != 0 { + return ErrCmdParams + } + + args = args[1:] + params := make([]interface{}, len(args)) + for i := 0; i < len(params); i += 2 { + score, err := strconv.ParseInt(hack.String(args[i]), 10, 64) + if err != nil { + return err + } + + params[i] = score + params[i+1] = args[i+1] + } + + if n, err := c.app.zset_add(key, params); err != nil { + return err + } else { + c.writeInteger(n) + } + + return nil +} + +func zcardCommand(c *client) error { + args := c.args + if len(args) != 1 { + return ErrCmdParams + } + + if n, err := c.app.zset_card(args[0]); err != nil { + return err + } else { + c.writeInteger(n) + } + + return nil +} + +func zscoreCommand(c *client) error { + args := c.args + if len(args) != 2 { + return ErrCmdParams + } + + if v, err := c.app.zset_score(args[0], args[1]); err != nil { + return err + } else { + c.writeBulk(v) + } + + return nil +} + +func zremCommand(c *client) error { + args := c.args + if len(args) < 2 { + return ErrCmdParams + } + + if n, err := c.app.zset_rem(args[0], args[1:]); err != nil { + return err + } else { + c.writeInteger(n) + } + + return nil +} + +func zincrbyCommand(c *client) error { + args := c.args + if len(args) != 3 { + return ErrCmdParams + } + + key := args[0] + + delta, err := strconv.ParseInt(hack.String(args[1]), 10, 64) + if err != nil { + return err + } + + if v, err := c.app.zset_incrby(key, delta, args[2]); err != nil { + return err + } else { + c.writeBulk(v) + } + + return nil +} + +func zparseScoreRange(minBuf []byte, maxBuf []byte) (min int64, max int64, err error) { + if strings.ToLower(hack.String(minBuf)) == "-inf" { + min = math.MinInt64 + } else { + var lopen bool = false + if minBuf[0] == '(' { + lopen = true + minBuf = minBuf[1:] + } + + if len(minBuf) == 0 { + err = ErrCmdParams + return + } + + min, err = strconv.ParseInt(hack.String(minBuf), 10, 64) + if err != nil { + return + } + + if min <= MinScore || min >= MaxScore { + err = errScoreOverflow + return + } + + if lopen { + min++ + } + } + + if strings.ToLower(hack.String(maxBuf)) == "+inf" { + max = math.MaxInt64 + } else { + var ropen = false + if maxBuf[0] == '(' { + ropen = true + maxBuf = maxBuf[1:] + } + + if len(maxBuf) == 0 { + err = ErrCmdParams + return + } + + max, err = strconv.ParseInt(hack.String(maxBuf), 10, 64) + if err != nil { + return + } + + if max <= MinScore || max >= MaxScore { + err = errScoreOverflow + return + } + + if ropen { + max-- + } + } + + return +} + +func zcountCommand(c *client) error { + args := c.args + if len(args) != 3 { + return ErrCmdParams + } + + min, max, err := zparseScoreRange(args[1], args[2]) + if err != nil { + return err + } + + if min > max { + c.writeInteger(0) + return nil + } + + if n, err := c.app.zset_count(args[0], min, max); err != nil { + return err + } else { + c.writeInteger(n) + } + + return nil +} + +func zrankCommand(c *client) error { + args := c.args + if len(args) != 2 { + return ErrCmdParams + } + + if n, err := c.app.zset_rank(args[0], args[1], false); err != nil { + return err + } else if n == -1 { + c.writeBulk(nil) + } else { + c.writeInteger(n) + } + + return nil +} + +func zrevrankCommand(c *client) error { + args := c.args + if len(args) != 2 { + return ErrCmdParams + } + + if n, err := c.app.zset_rank(args[0], args[1], true); err != nil { + return err + } else if n == -1 { + c.writeBulk(nil) + } else { + c.writeInteger(n) + } + + return nil +} + +func zremrangebyrankCommand(c *client) error { + args := c.args + if len(args) != 3 { + return ErrCmdParams + } + + key := args[0] + + offset, limit, err := zparseRange(c, key, args[1], args[2]) + if err != nil { + return err + } + + if offset < 0 { + c.writeInteger(0) + return nil + } + + if n, err := c.app.zset_remRange(key, MinScore, MaxScore, offset, limit); err != nil { + return err + } else { + c.writeInteger(n) + } + + return nil +} + +func zremrangebyscoreCommand(c *client) error { + args := c.args + if len(args) != 3 { + return ErrCmdParams + } + + key := args[0] + min, max, err := zparseScoreRange(args[1], args[2]) + if err != nil { + return err + } + + if n, err := c.app.zset_remRange(key, min, max, 0, -1); err != nil { + return err + } else { + c.writeInteger(n) + } + + return nil +} + +func zparseRange(c *client, key []byte, startBuf []byte, stopBuf []byte) (offset int, limit int, err error) { + var start int + var stop int + if start, err = strconv.Atoi(hack.String(startBuf)); err != nil { + return + } + + if stop, err = strconv.Atoi(hack.String(stopBuf)); err != nil { + return + } + + if start < 0 || stop < 0 { + //refer redis implementation + var size int64 + size, err = c.app.zset_card(key) + if err != nil { + return + } + + llen := int(size) + + if start < 0 { + start = llen + start + } + if stop < 0 { + stop = llen + stop + } + + if start < 0 { + start = 0 + } + + if start >= llen { + offset = -1 + return + } + } + + if start > stop { + offset = -1 + return + } + + offset = start + limit = (stop - start) + 1 + return +} + +func zrangeGeneric(c *client, reverse bool) error { + args := c.args + if len(args) < 3 { + return ErrCmdParams + } + + key := args[0] + + offset, limit, err := zparseRange(c, key, args[1], args[2]) + if err != nil { + return err + } + + if offset < 0 { + c.writeArray([]interface{}{}) + return nil + } + + args = args[3:] + var withScores bool = false + + if len(args) > 0 && strings.ToLower(hack.String(args[0])) == "withscores" { + withScores = true + } + + if v, err := c.app.zset_range(key, MinScore, MaxScore, withScores, offset, limit, reverse); err != nil { + return err + } else { + c.writeArray(v) + } + return nil +} + +func zrangeCommand(c *client) error { + return zrangeGeneric(c, false) +} + +func zrevrangeCommand(c *client) error { + return zrangeGeneric(c, true) +} + +func zrangebyscoreGeneric(c *client, reverse bool) error { + args := c.args + if len(args) < 3 { + return ErrCmdParams + } + + key := args[0] + min, max, err := zparseScoreRange(args[1], args[2]) + if err != nil { + return err + } + + args = args[3:] + + var withScores bool = false + + if len(args) > 0 && strings.ToLower(hack.String(args[0])) == "withscores" { + withScores = true + args = args[1:] + } + + var offset int = 0 + var limit int = -1 + + if len(args) > 0 { + if len(args) != 3 { + return ErrCmdParams + } + + if strings.ToLower(hack.String(args[0])) != "limit" { + return ErrCmdParams + } + + if offset, err = strconv.Atoi(hack.String(args[1])); err != nil { + return ErrCmdParams + } + + if limit, err = strconv.Atoi(hack.String(args[2])); err != nil { + return ErrCmdParams + } + } + + if offset < 0 { + //for redis, if offset < 0, a empty will return + //so here we directly return a empty array + c.writeArray([]interface{}{}) + return nil + } + + if v, err := c.app.zset_range(key, min, max, withScores, offset, limit, reverse); err != nil { + return err + } else { + c.writeArray(v) + } + + return nil +} + +func zrangebyscoreCommand(c *client) error { + return zrangebyscoreGeneric(c, false) +} + +func zrevrangebyscoreCommand(c *client) error { + return zrangebyscoreGeneric(c, true) +} + func init() { register("zadd", zaddCommand) register("zcard", zcardCommand) @@ -16,27 +456,3 @@ func init() { register("zrevrangebyscore", zrevrangebyscoreCommand) register("zscore", zscoreCommand) } - -func zcardCommand(c *client) error { - return nil -} - -func zscoreCommand(c *client) error { - return nil -} - -func zremCommand(c *client) error { - return nil -} - -func zrankCommand(c *client) error { return nil } -func zrevrankCommand(c *client) error { return nil } -func zcountCommand(c *client) error { return nil } -func zremrangebyrankCommand(c *client) error { return nil } -func zremrangebyscoreCommand(c *client) error { return nil } -func zrangeCommand(c *client) error { return nil } -func zrevrangeCommand(c *client) error { return nil } -func zaddCommand(c *client) error { return nil } -func zincrbyCommand(c *client) error { return nil } -func zrangebyscoreCommand(c *client) error { return nil } -func zrevrangebyscoreCommand(c *client) error { return nil } diff --git a/ssdb/cmd_zset_test.go b/ssdb/cmd_zset_test.go new file mode 100644 index 0000000..d2aeea6 --- /dev/null +++ b/ssdb/cmd_zset_test.go @@ -0,0 +1,477 @@ +package ssdb + +import ( + "bytes" + "fmt" + "github.com/garyburd/redigo/redis" + "strconv" + "testing" +) + +func TestCodecZSet(t *testing.T) { + key := []byte("a") + + ek := encode_zsize_key(key) + + if k, err := decode_zsize_key(ek); err != nil { + t.Fatal(err) + } else if !bytes.Equal(key, k) { + t.Fatal(string(k)) + } + + member := []byte("f") + + ek = encode_zset_key(key, member) + + if k, m, err := decode_zset_key(ek); err != nil { + t.Fatal(err) + } else if !bytes.Equal(key, k) { + t.Fatal(string(k)) + } else if !bytes.Equal(member, m) { + t.Fatal(string(m)) + } + + ek = encode_zscore_key(key, member, 3) + + if k, m, s, err := decode_zscore_key(ek); err != nil { + t.Fatal(err) + } else if !bytes.Equal(key, k) { + t.Fatal(string(k)) + } else if !bytes.Equal(member, m) { + t.Fatal(string(m)) + } else if s != 3 { + t.Fatal(s) + } + + ek = encode_zscore_key(key, member, -3) + + if k, m, s, err := decode_zscore_key(ek); err != nil { + t.Fatal(err) + } else if !bytes.Equal(key, k) { + t.Fatal(string(k)) + } else if !bytes.Equal(member, m) { + t.Fatal(string(m)) + } else if s != -3 { + t.Fatal(s) + } +} + +func TestZSet(t *testing.T) { + startTestApp() + + c := getTestConn() + defer c.Close() + + key := []byte("myzset") + if n, err := redis.Int(c.Do("zadd", key, 3, "a", 4, "b")); err != nil { + t.Fatal(err) + } else if n != 2 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zcard", key)); err != nil { + t.Fatal(n) + } else if n != 2 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zadd", key, 1, "a", 2, "b")); err != nil { + t.Fatal(err) + } else if n != 0 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zcard", key)); err != nil { + t.Fatal(n) + } else if n != 2 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zadd", key, 3, "c", 4, "d")); err != nil { + t.Fatal(err) + } else if n != 2 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zcard", key)); err != nil { + t.Fatal(err) + } else if n != 4 { + t.Fatal(n) + } + + if s, err := redis.Int(c.Do("zscore", key, "c")); err != nil { + t.Fatal(err) + } else if s != 3 { + t.Fatal(s) + } + + if n, err := redis.Int(c.Do("zrem", key, "d", "e")); err != nil { + t.Fatal(err) + } else if n != 1 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zcard", key)); err != nil { + t.Fatal(err) + } else if n != 3 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zincrby", key, 4, "c")); err != nil { + t.Fatal(err) + } else if n != 7 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zincrby", key, -4, "c")); err != nil { + t.Fatal(err) + } else if n != 3 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zincrby", key, 4, "d")); err != nil { + t.Fatal(err) + } else if n != 4 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zcard", key)); err != nil { + t.Fatal(err) + } else if n != 4 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zrem", key, "a", "b", "c", "d")); err != nil { + t.Fatal(err) + } else if n != 4 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zcard", key)); err != nil { + t.Fatal(err) + } else if n != 0 { + t.Fatal(n) + } + +} + +func TestZSetCount(t *testing.T) { + startTestApp() + + c := getTestConn() + defer c.Close() + + key := []byte("myzset") + if _, err := redis.Int(c.Do("zadd", key, 1, "a", 2, "b", 3, "c", 4, "d")); err != nil { + t.Fatal(err) + } + + if n, err := redis.Int(c.Do("zcount", key, 2, 4)); err != nil { + t.Fatal(err) + } else if n != 3 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zcount", key, 4, 4)); err != nil { + t.Fatal(err) + } else if n != 1 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zcount", key, 4, 3)); err != nil { + t.Fatal(err) + } else if n != 0 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zcount", key, "(2", 4)); err != nil { + t.Fatal(err) + } else if n != 2 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zcount", key, "2", "(4")); err != nil { + t.Fatal(err) + } else if n != 2 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zcount", key, "(2", "(4")); err != nil { + t.Fatal(err) + } else if n != 1 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zcount", key, "-inf", "+inf")); err != nil { + t.Fatal(err) + } else if n != 4 { + t.Fatal(n) + } + + c.Do("zadd", key, 3, "e") + + if n, err := redis.Int(c.Do("zcount", key, "(2", "(4")); err != nil { + t.Fatal(err) + } else if n != 2 { + t.Fatal(n) + } + + c.Do("zrem", key, "a", "b", "c", "d", "e") +} + +func TestZSetRank(t *testing.T) { + startTestApp() + + c := getTestConn() + defer c.Close() + + key := []byte("myzset") + if _, err := redis.Int(c.Do("zadd", key, 1, "a", 2, "b", 3, "c", 4, "d")); err != nil { + t.Fatal(err) + } + + if n, err := redis.Int(c.Do("zrank", key, "c")); err != nil { + t.Fatal(err) + } else if n != 2 { + t.Fatal(n) + } + + if _, err := redis.Int(c.Do("zrank", key, "e")); err != redis.ErrNil { + t.Fatal(err) + } + + if n, err := redis.Int(c.Do("zrevrank", key, "c")); err != nil { + t.Fatal(err) + } else if n != 1 { + t.Fatal(n) + } + + if _, err := redis.Int(c.Do("zrevrank", key, "e")); err != redis.ErrNil { + t.Fatal(err) + } +} + +func testZSetRange(ay []interface{}, checkValues ...interface{}) error { + if len(ay) != len(checkValues) { + return fmt.Errorf("invalid return number %d != %d", len(ay), len(checkValues)) + } + + for i := 0; i < len(ay); i++ { + v, ok := ay[i].([]byte) + if !ok { + return fmt.Errorf("invalid data %d %v %T", i, ay[i], ay[i]) + } + + switch cv := checkValues[i].(type) { + case string: + if string(v) != cv { + return fmt.Errorf("not equal %s != %s", v, checkValues[i]) + } + default: + if s, _ := strconv.Atoi(string(v)); s != checkValues[i] { + return fmt.Errorf("not equal %s != %v", v, checkValues[i]) + } + } + + } + + return nil +} + +func TestZSetRangeScore(t *testing.T) { + startTestApp() + + c := getTestConn() + defer c.Close() + + key := []byte("myzset_range") + if _, err := redis.Int(c.Do("zadd", key, 1, "a", 2, "b", 3, "c", 4, "d")); err != nil { + t.Fatal(err) + } + + if v, err := redis.MultiBulk(c.Do("zrangebyscore", key, 1, 4, "withscores")); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "a", 1, "b", 2, "c", 3, "d", 4); err != nil { + t.Fatal(err) + } + } + + if v, err := redis.MultiBulk(c.Do("zrangebyscore", key, 1, 4, "withscores", "limit", 1, 2)); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "b", 2, "c", 3); err != nil { + t.Fatal(err) + } + } + + if v, err := redis.MultiBulk(c.Do("zrangebyscore", key, "-inf", "+inf", "withscores")); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "a", 1, "b", 2, "c", 3, "d", 4); err != nil { + t.Fatal(err) + } + } + + if v, err := redis.MultiBulk(c.Do("zrangebyscore", key, "(1", "(4")); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "b", "c"); err != nil { + t.Fatal(err) + } + } + + if v, err := redis.MultiBulk(c.Do("zrevrangebyscore", key, 1, 4, "withscores")); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "d", 4, "c", 3, "b", 2, "a", 1); err != nil { + t.Fatal(err) + } + } + + if v, err := redis.MultiBulk(c.Do("zrevrangebyscore", key, 1, 4, "withscores", "limit", 1, 2)); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "c", 3, "b", 2); err != nil { + t.Fatal(err) + } + } + + if v, err := redis.MultiBulk(c.Do("zrevrangebyscore", key, "-inf", "+inf", "withscores")); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "d", 4, "c", 3, "b", 2, "a", 1); err != nil { + t.Fatal(err) + } + } + + if v, err := redis.MultiBulk(c.Do("zrevrangebyscore", key, "(1", "(4")); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "c", "b"); err != nil { + t.Fatal(err) + } + } + + if n, err := redis.Int(c.Do("zremrangebyscore", key, 2, 3)); err != nil { + t.Fatal(err) + } else if n != 2 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zcard", key)); err != nil { + t.Fatal(err) + } else if n != 2 { + t.Fatal(n) + } + + if v, err := redis.MultiBulk(c.Do("zrangebyscore", key, 1, 4)); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "a", "d"); err != nil { + t.Fatal(err) + } + } +} + +func TestZSetRange(t *testing.T) { + startTestApp() + + c := getTestConn() + defer c.Close() + + key := []byte("myzset_range_rank") + if _, err := redis.Int(c.Do("zadd", key, 1, "a", 2, "b", 3, "c", 4, "d")); err != nil { + t.Fatal(err) + } + + if v, err := redis.MultiBulk(c.Do("zrange", key, 0, 3, "withscores")); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "a", 1, "b", 2, "c", 3, "d", 4); err != nil { + t.Fatal(err) + } + } + + if v, err := redis.MultiBulk(c.Do("zrange", key, 1, 4, "withscores")); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "b", 2, "c", 3, "d", 4); err != nil { + t.Fatal(err) + } + } + + if v, err := redis.MultiBulk(c.Do("zrange", key, -2, -1, "withscores")); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "c", 3, "d", 4); err != nil { + t.Fatal(err) + } + } + + if v, err := redis.MultiBulk(c.Do("zrange", key, 0, -1, "withscores")); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "a", 1, "b", 2, "c", 3, "d", 4); err != nil { + t.Fatal(err) + } + } + + if v, err := redis.MultiBulk(c.Do("zrange", key, -1, -2, "withscores")); err != nil { + t.Fatal(err) + } else if len(v) != 0 { + t.Fatal(len(v)) + } + + if v, err := redis.MultiBulk(c.Do("zrevrange", key, 0, 4, "withscores")); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "d", 4, "c", 3, "b", 2, "a", 1); err != nil { + t.Fatal(err) + } + } + + if v, err := redis.MultiBulk(c.Do("zrevrange", key, 0, -1, "withscores")); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "d", 4, "c", 3, "b", 2, "a", 1); err != nil { + t.Fatal(err) + } + } + + if v, err := redis.MultiBulk(c.Do("zrevrange", key, 2, 3, "withscores")); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "b", 2, "a", 1); err != nil { + t.Fatal(err) + } + } + + if v, err := redis.MultiBulk(c.Do("zrevrange", key, -2, -1, "withscores")); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "b", 2, "a", 1); err != nil { + t.Fatal(err) + } + } + + if n, err := redis.Int(c.Do("zremrangebyrank", key, 2, 3)); err != nil { + t.Fatal(err) + } else if n != 2 { + t.Fatal(n) + } + + if n, err := redis.Int(c.Do("zcard", key)); err != nil { + t.Fatal(err) + } else if n != 2 { + t.Fatal(n) + } + + if v, err := redis.MultiBulk(c.Do("zrange", key, 0, 4)); err != nil { + t.Fatal(err) + } else { + if err := testZSetRange(v, "a", "b"); err != nil { + t.Fatal(err) + } + } +} diff --git a/ssdb/const.go b/ssdb/const.go index 00b9b08..93a830f 100644 --- a/ssdb/const.go +++ b/ssdb/const.go @@ -24,8 +24,9 @@ const ( KV_TYPE byte = iota + 1 HASH_TYPE HSIZE_TYPE - ZSET_TYPE - ZSIZE_TYPE LIST_TYPE LSIZE_TYPE + ZSET_TYPE + ZSIZE_TYPE + ZSCORE_TYPE ) diff --git a/ssdb/t_hash.go b/ssdb/t_hash.go index 2518d3c..5529fb6 100644 --- a/ssdb/t_hash.go +++ b/ssdb/t_hash.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "github.com/siddontang/golib/hack" + "github.com/siddontang/golib/leveldb" "strconv" ) @@ -12,7 +13,7 @@ var errHSizeKey = errors.New("invalid hsize key") const ( hashStartSep byte = ':' - hashStopSep byte = ';' + hashStopSep byte = hashStartSep + 1 ) func encode_hsize_key(key []byte) []byte { @@ -51,35 +52,16 @@ func encode_hash_key(key []byte, field []byte) []byte { } func encode_hash_start_key(key []byte) []byte { - buf := make([]byte, len(key)+1+4+1) - - pos := 0 - buf[pos] = HASH_TYPE - pos++ - binary.BigEndian.PutUint32(buf[pos:], uint32(len(key))) - pos += 4 - - copy(buf[pos:], key) - pos += len(key) - - buf[pos] = hashStartSep - return buf + k := encode_hash_key(key, nil) + return k } func encode_hash_stop_key(key []byte) []byte { - buf := make([]byte, len(key)+1+4+1) + k := encode_hash_key(key, nil) - pos := 0 - buf[pos] = HASH_TYPE - pos++ - binary.BigEndian.PutUint32(buf[pos:], uint32(len(key))) - pos += 4 + k[len(k)-1] = hashStopSep - copy(buf[pos:], key) - pos += len(key) - - buf[pos] = hashStopSep - return buf + return k } func decode_hash_key(ek []byte) ([]byte, []byte, error) { @@ -268,7 +250,7 @@ func (a *App) hash_getall(key []byte) ([]interface{}, error) { v := make([]interface{}, 0, 16) - it := a.db.Iterator(start, stop, 0) + it := a.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { _, k, err := decode_hash_key(it.Key()) if err != nil { @@ -278,6 +260,8 @@ func (a *App) hash_getall(key []byte) ([]interface{}, error) { v = append(v, it.Value()) } + it.Close() + return v, nil } @@ -287,7 +271,7 @@ func (a *App) hash_keys(key []byte) ([]interface{}, error) { v := make([]interface{}, 0, 16) - it := a.db.Iterator(start, stop, 0) + it := a.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { _, k, err := decode_hash_key(it.Key()) if err != nil { @@ -296,6 +280,8 @@ func (a *App) hash_keys(key []byte) ([]interface{}, error) { v = append(v, k) } + it.Close() + return v, nil } @@ -305,10 +291,12 @@ func (a *App) hash_values(key []byte) ([]interface{}, error) { v := make([]interface{}, 0, 16) - it := a.db.Iterator(start, stop, 0) + it := a.db.Iterator(start, stop, leveldb.RangeROpen, 0, -1) for ; it.Valid(); it.Next() { v = append(v, it.Value()) } + it.Close() + return v, nil } diff --git a/ssdb/t_list.go b/ssdb/t_list.go index 670f289..0fd0ad4 100644 --- a/ssdb/t_list.go +++ b/ssdb/t_list.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "github.com/siddontang/golib/hack" + "github.com/siddontang/golib/leveldb" "strconv" ) @@ -211,7 +212,7 @@ func (a *App) list_range(key []byte, start int64, stop int64) ([]interface{}, er } startSeq = seq + start - stopSeq = seq + stop + 1 + stopSeq = seq + stop } else if start < 0 && stop < 0 { seq, err := a.list_getSeq(key, listTailSeq) @@ -220,7 +221,7 @@ func (a *App) list_range(key []byte, start int64, stop int64) ([]interface{}, er } startSeq = seq + start + 1 - stopSeq = seq + stop + 2 + stopSeq = seq + stop + 1 } else { //start < 0 && stop > 0 var err error @@ -236,7 +237,7 @@ func (a *App) list_range(key []byte, start int64, stop int64) ([]interface{}, er return nil, err } - stopSeq += stop + 1 + stopSeq += stop } if startSeq < listMinSeq { @@ -246,7 +247,7 @@ func (a *App) list_range(key []byte, start int64, stop int64) ([]interface{}, er } it := a.db.Iterator(encode_list_key(key, startSeq), - encode_list_key(key, stopSeq), 0) + encode_list_key(key, stopSeq), leveldb.RangeClose, 0, -1) for ; it.Valid(); it.Next() { v = append(v, it.Value()) } diff --git a/ssdb/t_zset.go b/ssdb/t_zset.go new file mode 100644 index 0000000..7de1d9c --- /dev/null +++ b/ssdb/t_zset.go @@ -0,0 +1,443 @@ +package ssdb + +import ( + "bytes" + "encoding/binary" + "errors" + "github.com/siddontang/golib/hack" + "github.com/siddontang/golib/leveldb" + "strconv" +) + +var errZSizeKey = errors.New("invalid zsize key") +var errZSetKey = errors.New("invalid zset key") +var errZScoreKey = errors.New("invalid zscore key") + +const ( + zsetNScoreSep byte = '<' + zsetPScoreSep byte = zsetNScoreSep + 1 + zsetStopScoreSep byte = zsetPScoreSep + 1 + + zsetStartMemSep byte = ':' + zsetStopMemSep byte = zsetStartMemSep + 1 +) + +func encode_zsize_key(key []byte) []byte { + buf := make([]byte, len(key)+1) + buf[0] = ZSIZE_TYPE + + copy(buf[1:], key) + return buf +} + +func decode_zsize_key(ek []byte) ([]byte, error) { + if len(ek) == 0 || ek[0] != ZSIZE_TYPE { + return nil, errZSizeKey + } + + return ek[1:], nil +} + +func encode_zset_key(key []byte, member []byte) []byte { + buf := make([]byte, len(key)+len(member)+5) + + pos := 0 + buf[pos] = ZSET_TYPE + pos++ + + binary.BigEndian.PutUint32(buf[pos:], uint32(len(key))) + pos += 4 + + copy(buf[pos:], key) + pos += len(key) + + copy(buf[pos:], member) + + return buf +} + +func decode_zset_key(ek []byte) ([]byte, []byte, error) { + if len(ek) < 5 || ek[0] != ZSET_TYPE { + return nil, nil, errZSetKey + } + + keyLen := int(binary.BigEndian.Uint32(ek[1:])) + if keyLen+5 > len(ek) { + return nil, nil, errZSetKey + } + + key := ek[5 : 5+keyLen] + member := ek[5+keyLen:] + return key, member, nil +} + +func encode_zscore_key(key []byte, member []byte, score int64) []byte { + buf := make([]byte, len(key)+len(member)+15) + + pos := 0 + buf[pos] = ZSCORE_TYPE + pos++ + + binary.BigEndian.PutUint32(buf[pos:], uint32(len(key))) + pos += 4 + + copy(buf[pos:], key) + pos += len(key) + + if score < 0 { + buf[pos] = zsetNScoreSep + } else { + buf[pos] = zsetPScoreSep + } + + pos++ + binary.BigEndian.PutUint64(buf[pos:], uint64(score)) + pos += 8 + + buf[pos] = zsetStartMemSep + pos++ + + copy(buf[pos:], member) + return buf +} + +func encode_start_zscore_key(key []byte, score int64) []byte { + k := encode_zscore_key(key, nil, score) + return k +} + +func encode_stop_zscore_key(key []byte, score int64) []byte { + k := encode_zscore_key(key, nil, score) + k[len(k)-1] = zsetStopMemSep + return k +} + +func decode_zscore_key(ek []byte) (key []byte, member []byte, score int64, err error) { + if len(ek) < 15 || ek[0] != ZSCORE_TYPE { + err = errZScoreKey + return + } + + keyLen := int(binary.BigEndian.Uint32(ek[1:])) + if keyLen+14 > len(ek) { + err = errZScoreKey + return + } + + key = ek[5 : 5+keyLen] + pos := 5 + keyLen + + if (ek[pos] != zsetNScoreSep) && (ek[pos] != zsetPScoreSep) { + err = errZScoreKey + return + } + pos++ + + score = int64(binary.BigEndian.Uint64(ek[pos:])) + pos += 8 + + if ek[pos] != zsetStartMemSep { + err = errZScoreKey + return + } + + pos++ + + member = ek[pos:] + return +} + +func (a *App) zset_setItem(key []byte, score int64, member []byte) (int64, error) { + if score <= MinScore || score >= MaxScore { + return 0, errScoreOverflow + } + + t := a.zsetTx + + var exists int64 = 0 + ek := encode_zset_key(key, member) + if v, err := a.db.Get(ek); err != nil { + return 0, err + } else if v != nil { + exists = 1 + + if s, err := strconv.ParseInt(hack.String(v), 10, 64); err != nil { + return 0, err + } else { + sk := encode_zscore_key(key, member, s) + t.Delete(sk) + } + } + + t.Put(ek, hack.Slice(strconv.FormatInt(score, 10))) + + sk := encode_zscore_key(key, member, score) + t.Put(sk, []byte{}) + + return exists, nil +} + +func (a *App) zset_delItem(key []byte, member []byte, skipDelScore bool) (int64, error) { + t := a.zsetTx + + ek := encode_zset_key(key, member) + if v, err := a.db.Get(ek); err != nil { + return 0, err + } else if v == nil { + //not exists + return 0, nil + } else { + //exists + if !skipDelScore { + //we must del score + if s, err := strconv.ParseInt(hack.String(v), 10, 64); err != nil { + return 0, err + } else { + sk := encode_zscore_key(key, member, s) + t.Delete(sk) + } + } + } + + t.Delete(ek) + return 1, nil +} + +func (a *App) zset_add(key []byte, args []interface{}) (int64, error) { + t := a.zsetTx + t.Lock() + defer t.Unlock() + + var num int64 = 0 + for i := 0; i < len(args); i += 2 { + score := args[i].(int64) + member := args[i+1].([]byte) + + if n, err := a.zset_setItem(key, score, member); err != nil { + return 0, err + } else if n == 0 { + //add new + num++ + } + } + + if _, err := a.zset_incrSize(key, num); err != nil { + return 0, err + } + + //todo add binlog + err := t.Commit() + return num, err +} + +func (a *App) zset_incrSize(key []byte, delta int64) (int64, error) { + t := a.zsetTx + sk := encode_zsize_key(key) + size, err := a.db.GetInt(sk) + if err != nil { + return 0, err + } else { + size += delta + if size <= 0 { + size = 0 + t.Delete(sk) + } else { + t.Put(sk, hack.Slice(strconv.FormatInt(size, 10))) + } + } + + return size, nil +} + +func (a *App) zset_card(key []byte) (int64, error) { + sk := encode_zsize_key(key) + size, err := a.db.GetInt(sk) + return size, err +} + +func (a *App) zset_score(key []byte, member []byte) ([]byte, error) { + k := encode_zset_key(key, member) + return a.db.Get(k) +} + +func (a *App) zset_rem(key []byte, args [][]byte) (int64, error) { + t := a.zsetTx + t.Lock() + defer t.Unlock() + + var num int64 = 0 + for i := 0; i < len(args); i++ { + if n, err := a.zset_delItem(key, args[i], false); err != nil { + return 0, err + } else if n == 1 { + num++ + } + } + + if _, err := a.zset_incrSize(key, -num); err != nil { + return 0, err + } + + err := t.Commit() + return num, err +} + +func (a *App) zset_incrby(key []byte, delta int64, member []byte) ([]byte, error) { + t := a.zsetTx + t.Lock() + defer t.Unlock() + + ek := encode_zset_key(key, member) + var score int64 = delta + v, err := a.db.Get(ek) + if err != nil { + return nil, err + } else if v != nil { + if s, err := strconv.ParseInt(hack.String(v), 10, 64); err != nil { + return nil, err + } else { + sk := encode_zscore_key(key, member, s) + t.Delete(sk) + + score = s + delta + + if score >= MaxScore || score <= MinScore { + return nil, errScoreOverflow + } + } + } else { + a.zset_incrSize(key, 1) + } + + buf := hack.Slice(strconv.FormatInt(score, 10)) + t.Put(ek, buf) + + t.Put(encode_zscore_key(key, member, score), []byte{}) + + err = t.Commit() + return buf, err +} + +func (a *App) zset_count(key []byte, min int64, max int64) (int64, error) { + minKey := encode_start_zscore_key(key, min) + maxKey := encode_stop_zscore_key(key, max) + + rangeType := leveldb.RangeROpen + + it := a.db.Iterator(minKey, maxKey, rangeType, 0, -1) + var n int64 = 0 + for ; it.Valid(); it.Next() { + n++ + } + it.Close() + + return n, nil +} + +func (a *App) zset_rank(key []byte, member []byte, reverse bool) (int64, error) { + k := encode_zset_key(key, member) + + if v, err := a.db.Get(k); err != nil { + return 0, err + } else if v == nil { + return -1, nil + } else { + if s, err := strconv.ParseInt(hack.String(v), 10, 64); err != nil { + return 0, err + } else { + var it *leveldb.Iterator + + sk := encode_zscore_key(key, member, s) + + if !reverse { + minKey := encode_start_zscore_key(key, MinScore) + it = a.db.Iterator(minKey, sk, leveldb.RangeClose, 0, -1) + } else { + maxKey := encode_stop_zscore_key(key, MaxScore) + it = a.db.RevIterator(sk, maxKey, leveldb.RangeClose, 0, -1) + } + + var lastKey []byte = nil + var n int64 = 0 + + for ; it.Valid(); it.Next() { + n++ + + lastKey = it.Key() + } + + it.Close() + + if _, m, _, err := decode_zscore_key(lastKey); err == nil && bytes.Equal(m, member) { + n-- + return n, nil + } + } + } + + return -1, nil +} + +func (a *App) zset_iterator(key []byte, min int64, max int64, offset int, limit int, reverse bool) *leveldb.Iterator { + minKey := encode_start_zscore_key(key, min) + maxKey := encode_stop_zscore_key(key, max) + + if !reverse { + return a.db.Iterator(minKey, maxKey, leveldb.RangeClose, offset, limit) + } else { + return a.db.RevIterator(minKey, maxKey, leveldb.RangeClose, offset, limit) + } +} + +func (a *App) zset_remRange(key []byte, min int64, max int64, offset int, limit int) (int64, error) { + t := a.zsetTx + t.Lock() + defer t.Unlock() + + it := a.zset_iterator(key, min, max, offset, limit, false) + var num int64 = 0 + for ; it.Valid(); it.Next() { + k := it.Key() + _, m, _, err := decode_zscore_key(k) + if err != nil { + continue + } + + if n, err := a.zset_delItem(key, m, true); err != nil { + return 0, err + } else if n == 1 { + num++ + } + + t.Delete(k) + } + + if _, err := a.zset_incrSize(key, -num); err != nil { + return 0, err + } + + //todo add binlog + + err := t.Commit() + return num, err +} + +func (a *App) zset_range(key []byte, min int64, max int64, withScores bool, offset int, limit int, reverse bool) ([]interface{}, error) { + v := make([]interface{}, 0, 16) + it := a.zset_iterator(key, min, max, offset, limit, reverse) + for ; it.Valid(); it.Next() { + _, m, s, err := decode_zscore_key(it.Key()) + //may be we will check key equal? + if err != nil { + continue + } + + v = append(v, m) + + if withScores { + v = append(v, hack.Slice(strconv.FormatInt(s, 10))) + } + } + + return v, nil +}