diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 4c1bf08..5aae124 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -11,8 +11,12 @@ }, { "ImportPath": "github.com/boltdb/bolt", - "Comment": "data/v1-254-gd285804", - "Rev": "d285804df1760edf4c602ecd901be5d5e67bf982" + "Comment": "data/v1-256-ge65c902", + "Rev": "e65c9027c35b7ef1014db9e02686889e51aadb2e" + }, + { + "ImportPath": "github.com/cupcake/rdb", + "Rev": "3454dcabd33cb8ea8261ffd6a45f4d836eb504cc" }, { "ImportPath": "github.com/edsrzf/mmap-go", diff --git a/README.md b/README.md index 2426c57..c81d86c 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ LedisDB now supports multiple different databases as backends. + HTTP API support, JSON/BSON/msgpack output. + Replication to guarantee data safety. + Supplies tools to load, dump, and repair database. ++ Supports cluster, use [xcodis](https://github.com/siddontang/xcodis) ## Build and Install @@ -147,6 +148,10 @@ Set slaveof in config or dynamiclly ledis 127.0.0.1:6381> slaveof 127.0.0.1 6380 OK +## Cluster support + +LedisDB uses a proxy named [xcodis](https://github.com/siddontang/xcodis) to support cluster. + ## Benchmark See [benchmark](https://github.com/siddontang/ledisdb/wiki/Benchmark) for more. @@ -178,5 +183,5 @@ See [Clients](https://github.com/siddontang/ledisdb/wiki/Clients) to find or con ## Feedback -Gmail: siddontang@gmail.com -Skype: live:siddontang_1 ++ Gmail: siddontang@gmail.com ++ Skype: live:siddontang_1 diff --git a/client/go/ledis/conn.go b/client/go/ledis/conn.go index 1c988b6..12f5f00 100644 --- a/client/go/ledis/conn.go +++ b/client/go/ledis/conn.go @@ -8,7 +8,9 @@ import ( "io" "net" "strconv" + "strings" "sync" + "time" ) // Error represents an error returned in a command reply. @@ -40,6 +42,8 @@ type Conn struct { // Scratch space for formatting integers and floats. numScratch [40]byte + + connectTimeout time.Duration } func NewConn(addr string) *Conn { @@ -69,6 +73,28 @@ func (c *Conn) Close() { } } +func (c *Conn) SetConnectTimeout(t time.Duration) { + c.cm.Lock() + c.connectTimeout = t + c.cm.Unlock() +} + +func (c *Conn) SetReadDeadline(t time.Time) { + c.cm.Lock() + if c.c != nil { + c.c.SetReadDeadline(t) + } + c.cm.Unlock() +} + +func (c *Conn) SetWriteDeadline(t time.Time) { + c.cm.Lock() + if c.c != nil { + c.c.SetWriteDeadline(t) + } + c.cm.Unlock() +} + func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { if err := c.Send(cmd, args...); err != nil { return nil, err @@ -78,6 +104,21 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { } func (c *Conn) Send(cmd string, args ...interface{}) error { + var err error + for i := 0; i < 2; i++ { + if err = c.send(cmd, args...); err != nil { + if e, ok := err.(*net.OpError); ok && strings.Contains(e.Error(), "use of closed network connection") { + //send to a closed connection, try again + continue + } + } else { + return nil + } + } + return err +} + +func (c *Conn) send(cmd string, args ...interface{}) error { if err := c.connect(); err != nil { return err } @@ -129,7 +170,9 @@ func (c *Conn) ReceiveBulkTo(w io.Writer) error { func (c *Conn) finalize() { c.cm.Lock() if !c.closed { - c.c.Close() + if c.c != nil { + c.c.Close() + } c.closed = true } c.cm.Unlock() @@ -144,7 +187,7 @@ func (c *Conn) connect() error { } var err error - c.c, err = net.Dial(getProto(c.addr), c.addr) + c.c, err = net.DialTimeout(getProto(c.addr), c.addr, c.connectTimeout) if err != nil { c.c = nil return err diff --git a/cmd/ledis-cli/const.go b/cmd/ledis-cli/const.go index 5586f1a..9c8886b 100644 --- a/cmd/ledis-cli/const.go +++ b/cmd/ledis-cli/const.go @@ -1,4 +1,4 @@ -//This file was generated by .tools/generate_commands.py on Sun Oct 26 2014 15:14:39 +0800 +//This file was generated by .tools/generate_commands.py on Thu Nov 27 2014 13:42:44 +0800 package main var helpCommands = [][]string{ @@ -23,6 +23,7 @@ var helpCommands = [][]string{ {"DECR", "key", "KV"}, {"DECRBY", "key decrement", "KV"}, {"DEL", "key [key ...]", "KV"}, + {"DUMP", "key", "KV"}, {"ECHO", "message", "Server"}, {"EVAL", "script numkeys key [key ...] arg [arg ...]", "Script"}, {"EVALSHA", "sha1 numkeys key [key ...] arg [arg ...]", "Script"}, @@ -36,6 +37,7 @@ var helpCommands = [][]string{ {"GETSET", " key value", "KV"}, {"HCLEAR", "key", "Hash"}, {"HDEL", "key field [field ...]", "Hash"}, + {"HDUMP", "key", "Hash"}, {"HEXISTS", "key field", "Hash"}, {"HEXPIRE", "key seconds", "Hash"}, {"HEXPIREAT", "key timestamp", "Hash"}, @@ -57,6 +59,7 @@ var helpCommands = [][]string{ {"INCRBY", "key increment", "KV"}, {"INFO", "[section]", "Server"}, {"LCLEAR", "key", "List"}, + {"LDUMP", "key", "List"}, {"LEXPIRE", "key seconds", "List"}, {"LEXPIREAT", "key timestamp", "List"}, {"LINDEX", "key index", "List"}, @@ -73,6 +76,7 @@ var helpCommands = [][]string{ {"MSET", "key value [key value ...]", "KV"}, {"PERSIST", "key", "KV"}, {"PING", "-", "Server"}, + {"RESTORE", "key ttl value", "Server"}, {"ROLLBACK", "-", "Transaction"}, {"RPOP", "key", "List"}, {"RPUSH", "key value [value ...]", "List"}, @@ -84,6 +88,7 @@ var helpCommands = [][]string{ {"SCRIPT LOAD", "script", "Script"}, {"SDIFF", "key [key ...]", "Set"}, {"SDIFFSTORE", "destination key [key ...]", "Set"}, + {"SDUMP", "key", "Set"}, {"SELECT", "index", "Server"}, {"SET", "key value", "KV"}, {"SETEX", "key seconds value", "KV"}, @@ -112,6 +117,7 @@ var helpCommands = [][]string{ {"ZCARD", "key", "ZSet"}, {"ZCLEAR", "key", "ZSet"}, {"ZCOUNT", "key min max", "ZSet"}, + {"ZDUMP", "key", "ZSet"}, {"ZEXPIRE", "key seconds", "ZSet"}, {"ZEXPIREAT", "key timestamp", "ZSet"}, {"ZINCRBY", "key increment member", "ZSet"}, diff --git a/doc/DiffRedis.md b/doc/DiffRedis.md index cdcb89b..4460652 100644 --- a/doc/DiffRedis.md +++ b/doc/DiffRedis.md @@ -14,11 +14,11 @@ LedisDB has no Strings data type but KV and Bitmap, any some Keys and Strings co In Redis, `del` can delete all type data, like String, Hash, List, etc, but in LedisDB, `del` can only delete KV data. To delete other type data, you will use "clear" commands. + KV: `del`, `mdel` -+ Hash: `hclear`, `mhclear` -+ List: `lclear`, `mlclear` -+ Set: `sclear`, `msclear` -+ Zset: `zclear`, `mzclear` -+ Bitmap: `bclear`, `mbclear` ++ Hash: `hclear`, `hmclear` ++ List: `lclear`, `lmclear` ++ Set: `sclear`, `smclear` ++ Zset: `zclear`, `zmclear` ++ Bitmap: `bclear`, `bmclear` ## Expire, Persist, and TTL @@ -56,5 +56,14 @@ LedisDB supplies `xscan`, `xrevscan`, etc, to fetch data iteratively and reverse + Zset: `zxscan`, `zxrevscan` + Bitmap: `bxscan`, `bxrevscan` +## DUMP + ++ KV: `dump` ++ Hash: `hdump` ++ List: `ldump` ++ Set: `sdump` ++ ZSet: `zdump` + +LedisDB supports `dump` to serialize the value with key, the data format is the same as Redis, so you can use it in Redis and vice versa. Of course, LedisDB has not implemented all APIs in Redis, you can see full commands in commands.json, commands.doc or [wiki](https://github.com/siddontang/ledisdb/wiki/Commands). \ No newline at end of file diff --git a/doc/commands.json b/doc/commands.json index d435bc3..ee48713 100644 --- a/doc/commands.json +++ b/doc/commands.json @@ -691,5 +691,42 @@ "arguments" : "-", "group": "Server", "readonly": false + }, + + "DUMP": { + "arguments" : "key", + "group": "KV", + "readonly": true + }, + + "LDUMP": { + "arguments" : "key", + "group": "List", + "readonly": true + }, + + "HDUMP": { + "arguments" : "key", + "group": "Hash", + "readonly": true + }, + + + "SDUMP": { + "arguments" : "key", + "group": "Set", + "readonly": true + }, + + "ZDUMP": { + "arguments" : "key", + "group": "ZSet", + "readonly": true + }, + + "RESTORE": { + "arguments" : "key ttl value", + "group" : "Server", + "readonly" : false } } diff --git a/doc/commands.md b/doc/commands.md index a5c0527..6c8a235 100644 --- a/doc/commands.md +++ b/doc/commands.md @@ -29,6 +29,7 @@ Table of Contents - [PERSIST key](#persist-key) - [XSCAN key [MATCH match] [COUNT count]](#xscan-key-match-match-count-count) - [XREVSCAN key [MATCH match] [COUNT count]](#xrevscan-key-match-match-count-count) + - [DUMP key](#dump-key) - [Hash](#hash) - [HDEL key field [field ...]](#hdel-key-field-field-) - [HEXISTS key field](#hexists-key-field) @@ -49,6 +50,7 @@ Table of Contents - [HPERSIST key](#hpersist-key) - [HXSCAN key [MATCH match] [COUNT count]](#hxscan-key-match-match-count-count) - [HXREVSCAN key [MATCH match] [COUNT count]](#hxrevscan-key-match-match-count-count) + - [HDUMP key](#hdump-key) - [List](#list) - [BLPOP key [key ...] timeout](#blpop-key-key--timeout) - [BRPOP key [key ...] timeout](#brpop-key-key--timeout) @@ -67,6 +69,7 @@ Table of Contents - [LPERSIST key](#lpersist-key) - [LXSCAN key [MATCH match] [COUNT count]](#lxscan-key-match-match-count-count) - [LXREVSCAN key [MATCH match] [COUNT count]](#lxrevscan-key-match-match-count-count) + - [LDUMP key](#ldump-key) - [Set](#set) - [SADD key member [member ...]](#sadd-key-member-member-) - [SCARD key](#scard-key) @@ -87,6 +90,7 @@ Table of Contents - [SPERSIST key](#spersist-key) - [SXSCAN key [MATCH match] [COUNT count]](#sxscan-key-match-match-count-count) - [SXREVSCAN key [MATCH match] [COUNT count]](#sxrevscan-key-match-match-count-count) + - [SDUMP key](#sdump-key) - [ZSet](#zset) - [ZADD key score member [score member ...]](#zadd-key-score-member-score-member-) - [ZCARD key](#zcard-key) @@ -117,6 +121,7 @@ Table of Contents - [ZRANGEBYLEX key min max [LIMIT offset count]](#zrangebylex-key-min-max-limit-offset-count) - [ZREMRANGEBYLEX key min max](#zremrangebylex-key-min-max) - [ZLEXCOUNT key min max](#zlexcount-key-min-max) + - [ZDUMP key](#zdump-key) - [Bitmap](#bitmap) - [BGET key](#bget-key) - [BGETBIT key offset](#bgetbit-key-offset) @@ -143,6 +148,7 @@ Table of Contents - [INFO [section]](#info-section) - [TIME](#time) - [CONFIG REWRITE](#config-rewrite) + - [RESTORE key ttl value](#restore-key-ttl-value) - [Transaction](#transaction) - [BEGIN](#begin) - [ROLLBACK](#rollback) @@ -583,6 +589,22 @@ ledis>xrevscan "a" count 1 2) [] ``` +### DUMP key + +Serialize the value stored at key with KV type in a Redis-specific format like RDB and return it to the user. The returned value can be synthesized back into a key using the RESTORE command. + +**Return value** + +bulk: the serialized value + +**Examples** + +``` +ledis> set mykey 10 +OK +ledis>DUMP mykey +"\x00\xc0\n\x06\x00\xf8r?\xc5\xfb\xfb_(" +``` ## Hash @@ -959,6 +981,10 @@ Reverse iterate Hash keys incrementally. See [XREVSCAN](#xrevscan-key-match-match-count-count) for more information. +### HDUMP key + +See [DUMP](#dump-key) for more information. + ## List ### BLPOP key [key ...] timeout @@ -1293,6 +1319,10 @@ Reverse iterate list keys incrementally. See [XREVSCAN](#xrevscan-key-match-match-count-count) for more information. +### LDUMP key + +See [DUMP](#dump-key) for more information. + ## Set @@ -1728,6 +1758,10 @@ Reverse iterate Set keys incrementally. See [XREVSCAN](#xrevscan-key-match-match-count-count) for more information. +### SDUMP key + +See [DUMP](#dump-key) for more information. + ## ZSet ### ZADD key score member [score member ...] @@ -2425,6 +2459,9 @@ ledis> ZLEXCOUNT myzset - [c (integer) 3 ``` +### ZDUMP key + +See [DUMP](#dump-key) for more information. ## Bitmap @@ -2719,6 +2756,14 @@ Rewrites the config file the server was started with. String: OK or error msg. +### RESTORE key ttl value + +Create a key associated with a value that is obtained by deserializing the provided serialized value (obtained via DUMP, LDUMP, HDUMP, SDUMP, ZDUMP). + +If ttl is 0 the key is created without any expire, otherwise the specified expire time (in milliseconds) is set. But you must know that now the checking ttl accuracy is second. + +RESTORE checks the RDB version and data checksum. If they don't match an error is returned. + ## Transaction ### BEGIN diff --git a/ledis/migrate.go b/ledis/migrate.go new file mode 100644 index 0000000..b6df026 --- /dev/null +++ b/ledis/migrate.go @@ -0,0 +1,188 @@ +package ledis + +import ( + "fmt" + "github.com/siddontang/ledisdb/ledis/rdb" +) + +/* + To support redis <-> ledisdb, the dump value format is the same as redis. + We will not support bitmap, and may add bit operations for kv later. + + But you must know that we use int64 for zset score, not double. + Only support rdb version 6. +*/ + +func (db *DB) Dump(key []byte) ([]byte, error) { + v, err := db.Get(key) + if err != nil { + return nil, err + } else if v == nil { + return nil, err + } + + return rdb.Dump(rdb.String(v)) +} + +func (db *DB) LDump(key []byte) ([]byte, error) { + v, err := db.LRange(key, 0, -1) + if err != nil { + return nil, err + } else if len(v) == 0 { + return nil, err + } + + return rdb.Dump(rdb.List(v)) +} + +func (db *DB) HDump(key []byte) ([]byte, error) { + v, err := db.HGetAll(key) + if err != nil { + return nil, err + } else if len(v) == 0 { + return nil, err + } + + o := make(rdb.HashMap, len(v)) + for i := 0; i < len(v); i++ { + o[i].Field = v[i].Field + o[i].Value = v[i].Value + } + + return rdb.Dump(o) +} + +func (db *DB) SDump(key []byte) ([]byte, error) { + v, err := db.SMembers(key) + if err != nil { + return nil, err + } else if len(v) == 0 { + return nil, err + } + + return rdb.Dump(rdb.Set(v)) +} + +func (db *DB) ZDump(key []byte) ([]byte, error) { + v, err := db.ZRangeByScore(key, MinScore, MaxScore, 0, -1) + if err != nil { + return nil, err + } else if len(v) == 0 { + return nil, err + } + + o := make(rdb.ZSet, len(v)) + for i := 0; i < len(v); i++ { + o[i].Member = v[i].Member + o[i].Score = float64(v[i].Score) + } + + return rdb.Dump(o) +} + +func (db *DB) Restore(key []byte, ttl int64, data []byte) error { + d, err := rdb.DecodeDump(data) + if err != nil { + return err + } + + //ttl is milliseconds, but we only support seconds + //later may support milliseconds + if ttl > 0 { + ttl = ttl / 1e3 + if ttl == 0 { + ttl = 1 + } + } + + switch value := d.(type) { + case rdb.String: + if _, err = db.Del(key); err != nil { + return err + } + + if err = db.Set(key, value); err != nil { + return err + } + + if ttl > 0 { + if _, err = db.Expire(key, ttl); err != nil { + return err + } + } + case rdb.HashMap: + //first clear old key + if _, err = db.HClear(key); err != nil { + return err + } + + fv := make([]FVPair, len(value)) + for i := 0; i < len(value); i++ { + fv[i] = FVPair{Field: value[i].Field, Value: value[i].Value} + } + + if err = db.HMset(key, fv...); err != nil { + return err + } + + if ttl > 0 { + if _, err = db.HExpire(key, ttl); err != nil { + return err + } + } + case rdb.List: + //first clear old key + if _, err = db.LClear(key); err != nil { + return err + } + + if _, err = db.RPush(key, value...); err != nil { + return err + } + + if ttl > 0 { + if _, err = db.LExpire(key, ttl); err != nil { + return err + } + } + case rdb.ZSet: + //first clear old key + if _, err = db.ZClear(key); err != nil { + return err + } + + sp := make([]ScorePair, len(value)) + for i := 0; i < len(value); i++ { + sp[i] = ScorePair{int64(value[i].Score), value[i].Member} + } + + if _, err = db.ZAdd(key, sp...); err != nil { + return err + } + + if ttl > 0 { + if _, err = db.ZExpire(key, ttl); err != nil { + return err + } + } + case rdb.Set: + //first clear old key + if _, err = db.SClear(key); err != nil { + return err + } + + if _, err = db.SAdd(key, value...); err != nil { + return err + } + + if ttl > 0 { + if _, err = db.SExpire(key, ttl); err != nil { + return err + } + } + default: + return fmt.Errorf("invalid data type %T", d) + } + + return nil +} diff --git a/ledis/migrate_test.go b/ledis/migrate_test.go new file mode 100644 index 0000000..697fe36 --- /dev/null +++ b/ledis/migrate_test.go @@ -0,0 +1,81 @@ +package ledis + +import ( + "github.com/siddontang/ledisdb/config" + "os" + "testing" +) + +func TestMigrate(t *testing.T) { + cfg1 := config.NewConfigDefault() + cfg1.DataDir = "/tmp/test_ledisdb_migrate1" + os.RemoveAll(cfg1.DataDir) + + defer os.RemoveAll(cfg1.DataDir) + + l1, _ := Open(cfg1) + defer l1.Close() + + cfg2 := config.NewConfigDefault() + cfg2.DataDir = "/tmp/test_ledisdb_migrate2" + os.RemoveAll(cfg2.DataDir) + + defer os.RemoveAll(cfg2.DataDir) + + l2, _ := Open(cfg2) + defer l2.Close() + + db1, _ := l1.Select(0) + db2, _ := l2.Select(0) + + key := []byte("a") + lkey := []byte("a") + hkey := []byte("a") + skey := []byte("a") + zkey := []byte("a") + value := []byte("1") + + db1.Set(key, value) + + if data, err := db1.Dump(key); err != nil { + t.Fatal(err) + } else if err := db2.Restore(key, 0, data); err != nil { + t.Fatal(err) + } + + db1.RPush(lkey, []byte("1"), []byte("2"), []byte("3")) + + if data, err := db1.LDump(lkey); err != nil { + t.Fatal(err) + } else if err := db2.Restore(lkey, 0, data); err != nil { + t.Fatal(err) + } + + db1.SAdd(skey, []byte("1"), []byte("2"), []byte("3")) + + if data, err := db1.SDump(skey); err != nil { + t.Fatal(err) + } else if err := db2.Restore(skey, 0, data); err != nil { + t.Fatal(err) + } + + db1.HMset(hkey, FVPair{[]byte("a"), []byte("1")}, FVPair{[]byte("b"), []byte("2")}, FVPair{[]byte("c"), []byte("3")}) + + if data, err := db1.HDump(hkey); err != nil { + t.Fatal(err) + } else if err := db2.Restore(hkey, 0, data); err != nil { + t.Fatal(err) + } + + db1.ZAdd(zkey, ScorePair{1, []byte("a")}, ScorePair{2, []byte("b")}, ScorePair{3, []byte("c")}) + + if data, err := db1.ZDump(zkey); err != nil { + t.Fatal(err) + } else if err := db2.Restore(zkey, 0, data); err != nil { + t.Fatal(err) + } + + if err := checkLedisEqual(l1, l2); err != nil { + t.Fatal(err) + } +} diff --git a/ledis/rdb/decode.go b/ledis/rdb/decode.go new file mode 100644 index 0000000..06cd392 --- /dev/null +++ b/ledis/rdb/decode.go @@ -0,0 +1,128 @@ +package rdb + +// Copyright 2014 Wandoujia Inc. All Rights Reserved. +// Licensed under the MIT (MIT-LICENSE.txt) license. + +import "fmt" + +import ( + "github.com/cupcake/rdb" + "github.com/cupcake/rdb/nopdecoder" +) + +func DecodeDump(p []byte) (interface{}, error) { + d := &decoder{} + if err := rdb.DecodeDump(p, 0, nil, 0, d); err != nil { + return nil, err + } + return d.obj, d.err +} + +type decoder struct { + nopdecoder.NopDecoder + obj interface{} + err error +} + +func (d *decoder) initObject(obj interface{}) { + if d.err != nil { + return + } + if d.obj != nil { + d.err = fmt.Errorf("invalid object, init again") + } else { + d.obj = obj + } +} + +func (d *decoder) Set(key, value []byte, expiry int64) { + d.initObject(String(value)) +} + +func (d *decoder) StartHash(key []byte, length, expiry int64) { + d.initObject(HashMap(nil)) +} + +func (d *decoder) Hset(key, field, value []byte) { + if d.err != nil { + return + } + switch h := d.obj.(type) { + default: + d.err = fmt.Errorf("invalid object, not a hashmap") + case HashMap: + v := struct { + Field, Value []byte + }{ + field, + value, + } + d.obj = append(h, v) + } +} + +func (d *decoder) StartSet(key []byte, cardinality, expiry int64) { + d.initObject(Set(nil)) +} + +func (d *decoder) Sadd(key, member []byte) { + if d.err != nil { + return + } + switch s := d.obj.(type) { + default: + d.err = fmt.Errorf("invalid object, not a set") + case Set: + d.obj = append(s, member) + } +} + +func (d *decoder) StartList(key []byte, length, expiry int64) { + d.initObject(List(nil)) +} + +func (d *decoder) Rpush(key, value []byte) { + if d.err != nil { + return + } + switch l := d.obj.(type) { + default: + d.err = fmt.Errorf("invalid object, not a list") + case List: + d.obj = append(l, value) + } +} + +func (d *decoder) StartZSet(key []byte, cardinality, expiry int64) { + d.initObject(ZSet(nil)) +} + +func (d *decoder) Zadd(key []byte, score float64, member []byte) { + if d.err != nil { + return + } + switch z := d.obj.(type) { + default: + d.err = fmt.Errorf("invalid object, not a zset") + case ZSet: + v := struct { + Member []byte + Score float64 + }{ + member, + score, + } + d.obj = append(z, v) + } +} + +type String []byte +type List [][]byte +type HashMap []struct { + Field, Value []byte +} +type Set [][]byte +type ZSet []struct { + Member []byte + Score float64 +} diff --git a/ledis/rdb/encode.go b/ledis/rdb/encode.go new file mode 100644 index 0000000..1c33cb0 --- /dev/null +++ b/ledis/rdb/encode.go @@ -0,0 +1,52 @@ +package rdb + +import ( + "bytes" + "fmt" + "github.com/cupcake/rdb" +) + +func Dump(obj interface{}) ([]byte, error) { + var buf bytes.Buffer + + e := rdb.NewEncoder(&buf) + + switch v := obj.(type) { + case String: + e.EncodeType(rdb.TypeString) + e.EncodeString(v) + case HashMap: + e.EncodeType(rdb.TypeHash) + e.EncodeLength(uint32(len(v))) + + for i := 0; i < len(v); i++ { + e.EncodeString(v[i].Field) + e.EncodeString(v[i].Value) + } + case List: + e.EncodeType(rdb.TypeList) + e.EncodeLength(uint32(len(v))) + for i := 0; i < len(v); i++ { + e.EncodeString(v[i]) + } + case Set: + e.EncodeType(rdb.TypeSet) + e.EncodeLength(uint32(len(v))) + for i := 0; i < len(v); i++ { + e.EncodeString(v[i]) + } + case ZSet: + e.EncodeType(rdb.TypeZSet) + e.EncodeLength(uint32(len(v))) + for i := 0; i < len(v); i++ { + e.EncodeString(v[i].Member) + e.EncodeFloat(v[i].Score) + } + default: + return nil, fmt.Errorf("invalid dump type %T", obj) + } + + e.EncodeDumpFooter() + + return buf.Bytes(), nil +} diff --git a/ledis/rdb/rdb_test.go b/ledis/rdb/rdb_test.go new file mode 100644 index 0000000..88095d2 --- /dev/null +++ b/ledis/rdb/rdb_test.go @@ -0,0 +1,23 @@ +package rdb + +import ( + "reflect" + "testing" +) + +func TestCodec(t *testing.T) { + testCodec(String("abc"), t) +} + +func testCodec(obj interface{}, t *testing.T) { + b, err := Dump(obj) + if err != nil { + t.Fatal(err) + } + + if o, err := DecodeDump(b); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(obj, o) { + t.Fatal("must equal") + } +} diff --git a/ledis/rdb/wandoujia-license b/ledis/rdb/wandoujia-license new file mode 100644 index 0000000..23320dc --- /dev/null +++ b/ledis/rdb/wandoujia-license @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Wandoujia Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/ledis/replication_test.go b/ledis/replication_test.go index 287480b..d29aa08 100644 --- a/ledis/replication_test.go +++ b/ledis/replication_test.go @@ -18,7 +18,7 @@ func checkLedisEqual(master *Ledis, slave *Ledis) error { if v, err := slave.ldb.Get(key); err != nil { return err } else if !bytes.Equal(v, value) { - return fmt.Errorf("replication error %d != %d", len(v), len(value)) + return fmt.Errorf("equal error at %q, %d != %d", key, len(v), len(value)) } } diff --git a/ledis/t_bit.go b/ledis/t_bit.go index 771ef90..501ea1b 100644 --- a/ledis/t_bit.go +++ b/ledis/t_bit.go @@ -9,6 +9,11 @@ import ( "time" ) +/* + We will not maintain bitmap anymore, and will add bit operations for kv type later. + Use your own risk. +*/ + const ( OPand uint8 = iota + 1 OPor diff --git a/ledis/t_set.go b/ledis/t_set.go index d164752..b81496d 100644 --- a/ledis/t_set.go +++ b/ledis/t_set.go @@ -374,6 +374,8 @@ func (db *DB) SMembers(key []byte) ([][]byte, error) { v := make([][]byte, 0, 16) it := db.bucket.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) + defer it.Close() + for ; it.Valid(); it.Next() { _, m, err := db.sDecodeSetKey(it.Key()) if err != nil { @@ -383,8 +385,6 @@ func (db *DB) SMembers(key []byte) ([][]byte, error) { v = append(v, m) } - it.Close() - return v, nil } diff --git a/ledis/tx_test.go b/ledis/tx_test.go index 26888b5..ba15a1c 100644 --- a/ledis/tx_test.go +++ b/ledis/tx_test.go @@ -3,6 +3,7 @@ package ledis import ( "github.com/siddontang/ledisdb/config" "os" + "strings" "testing" ) @@ -201,6 +202,9 @@ func testTx(t *testing.T, name string) { l, err := Open(cfg) if err != nil { + if strings.Contains(err.Error(), "not registered") { + return + } t.Fatal(err) } diff --git a/server/app.go b/server/app.go index 393da09..08cd5d0 100644 --- a/server/app.go +++ b/server/app.go @@ -1,6 +1,7 @@ package server import ( + goledis "github.com/siddontang/ledisdb/client/go/ledis" "github.com/siddontang/ledisdb/config" "github.com/siddontang/ledisdb/ledis" "net" @@ -42,6 +43,9 @@ type App struct { rcm sync.Mutex rcs map[*respClient]struct{} + + migrateM sync.Mutex + migrateClients map[string]*goledis.Client } func netType(s string) string { @@ -71,6 +75,8 @@ func NewApp(cfg *config.Config) (*App, error) { app.rcs = make(map[*respClient]struct{}) + app.migrateClients = make(map[string]*goledis.Client) + var err error if app.info, err = newInfo(app); err != nil { @@ -132,6 +138,14 @@ func (app *App) Close() { app.listener.Close() + //close all migrate connections + app.migrateM.Lock() + for k, c := range app.migrateClients { + c.Close() + delete(app.migrateClients, k) + } + app.migrateM.Unlock() + if app.httpListener != nil { app.httpListener.Close() } diff --git a/server/client.go b/server/client.go index 57abc8c..5c1a9d8 100644 --- a/server/client.go +++ b/server/client.go @@ -11,25 +11,29 @@ import ( ) var txUnsupportedCmds = map[string]struct{}{ - "select": struct{}{}, - "slaveof": struct{}{}, - "fullsync": struct{}{}, - "sync": struct{}{}, - "begin": struct{}{}, - "flushall": struct{}{}, - "flushdb": struct{}{}, - "eval": struct{}{}, + "select": struct{}{}, + "slaveof": struct{}{}, + "fullsync": struct{}{}, + "sync": struct{}{}, + "begin": struct{}{}, + "flushall": struct{}{}, + "flushdb": struct{}{}, + "eval": struct{}{}, + "xmigrate": struct{}{}, + "xmigratedb": struct{}{}, } var scriptUnsupportedCmds = map[string]struct{}{ - "slaveof": struct{}{}, - "fullsync": struct{}{}, - "sync": struct{}{}, - "begin": struct{}{}, - "commit": struct{}{}, - "rollback": struct{}{}, - "flushall": struct{}{}, - "flushdb": struct{}{}, + "slaveof": struct{}{}, + "fullsync": struct{}{}, + "sync": struct{}{}, + "begin": struct{}{}, + "commit": struct{}{}, + "rollback": struct{}{}, + "flushall": struct{}{}, + "flushdb": struct{}{}, + "xmigrate": struct{}{}, + "xmigratedb": struct{}{}, } type responseWriter interface { diff --git a/server/cmd_migrate.go b/server/cmd_migrate.go new file mode 100644 index 0000000..3153821 --- /dev/null +++ b/server/cmd_migrate.go @@ -0,0 +1,396 @@ +package server + +import ( + "fmt" + goledis "github.com/siddontang/ledisdb/client/go/ledis" + "github.com/siddontang/ledisdb/ledis" + "strings" + "time" +) + +func dumpCommand(c *client) error { + if len(c.args) != 1 { + return ErrCmdParams + } + + key := c.args[0] + if data, err := c.db.Dump(key); err != nil { + return err + } else { + c.resp.writeBulk(data) + } + + return nil +} + +func ldumpCommand(c *client) error { + if len(c.args) != 1 { + return ErrCmdParams + } + + key := c.args[0] + if data, err := c.db.LDump(key); err != nil { + return err + } else { + c.resp.writeBulk(data) + } + + return nil +} + +func hdumpCommand(c *client) error { + if len(c.args) != 1 { + return ErrCmdParams + } + + key := c.args[0] + if data, err := c.db.HDump(key); err != nil { + return err + } else { + c.resp.writeBulk(data) + } + + return nil +} + +func sdumpCommand(c *client) error { + if len(c.args) != 1 { + return ErrCmdParams + } + + key := c.args[0] + if data, err := c.db.SDump(key); err != nil { + return err + } else { + c.resp.writeBulk(data) + } + + return nil +} + +func zdumpCommand(c *client) error { + if len(c.args) != 1 { + return ErrCmdParams + } + + key := c.args[0] + if data, err := c.db.ZDump(key); err != nil { + return err + } else { + c.resp.writeBulk(data) + } + + return nil +} + +// unlike redis, restore will try to delete old key first +func restoreCommand(c *client) error { + args := c.args + if len(args) != 3 { + return ErrCmdParams + } + + key := args[0] + ttl, err := ledis.StrInt64(args[1], nil) + if err != nil { + return err + } + data := args[2] + + if err = c.db.Restore(key, ttl, data); err != nil { + return err + } else { + c.resp.writeStatus(OK) + } + + return nil +} + +func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) { + var err error + var data []byte + switch strings.ToUpper(tp) { + case "KV": + data, err = db.Dump(key) + case "HASH": + data, err = db.HDump(key) + case "LIST": + data, err = db.LDump(key) + case "SET": + data, err = db.SDump(key) + case "ZSET": + data, err = db.ZDump(key) + default: + err = fmt.Errorf("invalid key type %s", tp) + } + return data, err +} + +func xdel(db *ledis.DB, tp string, key []byte) error { + var err error + switch strings.ToUpper(tp) { + case "KV": + _, err = db.Del(key) + case "HASH": + _, err = db.HClear(key) + case "LIST": + _, err = db.LClear(key) + case "SET": + _, err = db.SClear(key) + case "ZSET": + _, err = db.ZClear(key) + default: + err = fmt.Errorf("invalid key type %s", tp) + } + return err +} + +func xttl(db *ledis.DB, tp string, key []byte) (int64, error) { + switch strings.ToUpper(tp) { + case "KV": + return db.TTL(key) + case "HASH": + return db.HTTL(key) + case "LIST": + return db.LTTL(key) + case "SET": + return db.STTL(key) + case "ZSET": + return db.ZTTL(key) + default: + return 0, fmt.Errorf("invalid key type %s", tp) + } +} + +func xscan(db *ledis.DB, tp string, count int) ([][]byte, error) { + switch strings.ToUpper(tp) { + case "KV": + return db.Scan(nil, count, false, "") + case "HASH": + return db.HScan(nil, count, false, "") + case "LIST": + return db.LScan(nil, count, false, "") + case "SET": + return db.SScan(nil, count, false, "") + case "ZSET": + return db.ZScan(nil, count, false, "") + default: + return nil, fmt.Errorf("invalid key type %s", tp) + } +} + +func xdumpCommand(c *client) error { + args := c.args + if len(args) != 2 { + return ErrCmdParams + } + + tp := string(args[0]) + key := args[1] + + if data, err := xdump(c.db, tp, key); err != nil { + return err + } else { + c.resp.writeBulk(data) + } + return nil +} + +func (app *App) getMigrateClient(addr string) *goledis.Client { + app.migrateM.Lock() + + mc, ok := app.migrateClients[addr] + if !ok { + mc = goledis.NewClient(&goledis.Config{addr, 4, 0, 0}) + app.migrateClients[addr] = mc + + } + + app.migrateM.Unlock() + + return mc +} + +//XMIGRATEDB host port tp count db timeout +//select count tp type keys and migrate +//will block any other write operations +//maybe only for xcodis +func xmigratedbCommand(c *client) error { + args := c.args + if len(args) != 6 { + return ErrCmdParams + } + + addr := fmt.Sprintf("%s:%s", string(args[0]), string(args[1])) + if addr == c.app.cfg.Addr { + //same server, can not migrate + return fmt.Errorf("migrate in same server is not allowed") + } + + tp := string(args[2]) + + count, err := ledis.StrInt64(args[3], nil) + if err != nil { + return err + } else if count <= 0 { + count = 10 + } + + db, err := ledis.StrUint64(args[4], nil) + if err != nil { + return err + } else if db >= uint64(ledis.MaxDBNumber) { + return fmt.Errorf("invalid db index %d, must < %d", db, ledis.MaxDBNumber) + } + + timeout, err := ledis.StrInt64(args[5], nil) + if err != nil { + return err + } else if timeout < 0 { + return fmt.Errorf("invalid timeout %d", timeout) + } + + m, err := c.db.Multi() + if err != nil { + return err + } + defer m.Close() + + keys, err := xscan(m.DB, tp, int(count)) + if err != nil { + return err + } else if len(keys) == 0 { + c.resp.writeInteger(0) + return nil + } + + mc := c.app.getMigrateClient(addr) + + conn := mc.Get() + + //timeout is milliseconds + t := time.Duration(timeout) * time.Millisecond + conn.SetConnectTimeout(t) + + if _, err = conn.Do("select", db); err != nil { + return err + } + + for _, key := range keys { + data, err := xdump(m.DB, tp, key) + if err != nil { + return err + } + + ttl, err := xttl(m.DB, tp, key) + if err != nil { + return err + } + + conn.SetReadDeadline(time.Now().Add(t)) + + //ttl is second, but restore need millisecond + if _, err = conn.Do("restore", key, ttl*1e3, data); err != nil { + return err + } + + if err = xdel(m.DB, tp, key); err != nil { + return err + } + + } + + c.resp.writeInteger(int64(len(keys))) + + return nil +} + +//XMIGRATE host port type key destination-db timeout +//will block any other write operations +//maybe only for xcodis +func xmigrateCommand(c *client) error { + args := c.args + + if len(args) != 6 { + return ErrCmdParams + } + + addr := fmt.Sprintf("%s:%s", string(args[0]), string(args[1])) + if addr == c.app.cfg.Addr { + //same server, can not migrate + return fmt.Errorf("migrate in same server is not allowed") + } + + tp := string(args[2]) + key := args[3] + db, err := ledis.StrUint64(args[4], nil) + if err != nil { + return err + } else if db >= uint64(ledis.MaxDBNumber) { + return fmt.Errorf("invalid db index %d, must < %d", db, ledis.MaxDBNumber) + } + + timeout, err := ledis.StrInt64(args[5], nil) + if err != nil { + return err + } else if timeout < 0 { + return fmt.Errorf("invalid timeout %d", timeout) + } + + m, err := c.db.Multi() + if err != nil { + return err + } + defer m.Close() + + data, err := xdump(m.DB, tp, key) + if err != nil { + return err + } else if data == nil { + c.resp.writeStatus(NOKEY) + return nil + } + + ttl, err := xttl(m.DB, tp, key) + if err != nil { + return err + } + + mc := c.app.getMigrateClient(addr) + + conn := mc.Get() + + //timeout is milliseconds + t := time.Duration(timeout) * time.Millisecond + conn.SetConnectTimeout(t) + + if _, err = conn.Do("select", db); err != nil { + return err + } + + conn.SetReadDeadline(time.Now().Add(t)) + + //ttl is second, but restore need millisecond + if _, err = conn.Do("restore", key, ttl*1e3, data); err != nil { + return err + } + + if err = xdel(m.DB, tp, key); err != nil { + return err + } + + c.resp.writeStatus(OK) + return nil +} + +func init() { + register("dump", dumpCommand) + register("ldump", ldumpCommand) + register("hdump", hdumpCommand) + register("sdump", sdumpCommand) + register("zdump", zdumpCommand) + register("restore", restoreCommand) + register("xdump", xdumpCommand) + register("xmigrate", xmigrateCommand) + register("xmigratedb", xmigratedbCommand) +} diff --git a/server/cmd_migrate_test.go b/server/cmd_migrate_test.go new file mode 100644 index 0000000..1dfb2c0 --- /dev/null +++ b/server/cmd_migrate_test.go @@ -0,0 +1,128 @@ +package server + +import ( + "fmt" + "github.com/siddontang/ledisdb/client/go/ledis" + "github.com/siddontang/ledisdb/config" + "os" + "testing" + "time" +) + +func TestDumpRestore(t *testing.T) { + c := getTestConn() + defer c.Close() + + var err error + _, err = c.Do("set", "mtest_a", "1") + if err != nil { + t.Fatal(err) + } + _, err = c.Do("rpush", "mtest_la", "1", "2", "3") + if err != nil { + t.Fatal(err) + } + _, err = c.Do("hmset", "mtest_ha", "a", "1", "b", "2") + if err != nil { + t.Fatal(err) + } + _, err = c.Do("sadd", "mtest_sa", "1", "2", "3") + if err != nil { + t.Fatal(err) + } + _, err = c.Do("zadd", "mtest_za", 1, "a", 2, "b", 3, "c") + if err != nil { + t.Fatal(err) + } + + testDumpRestore(c, "dump", "mtest_a", t) + testDumpRestore(c, "ldump", "mtest_la", t) + testDumpRestore(c, "hdump", "mtest_ha", t) + testDumpRestore(c, "sdump", "mtest_sa", t) + testDumpRestore(c, "zdump", "mtest_za", t) +} + +func testDumpRestore(c *ledis.Conn, dump string, key string, t *testing.T) { + if data, err := ledis.Bytes(c.Do(dump, key)); err != nil { + t.Fatal(err) + } else if _, err := c.Do("restore", key, 0, data); err != nil { + t.Fatal(err) + } +} + +func TestMigrate(t *testing.T) { + data_dir := "/tmp/test_migrate" + os.RemoveAll(data_dir) + + s1Cfg := config.NewConfigDefault() + s1Cfg.DataDir = fmt.Sprintf("%s/s1", data_dir) + s1Cfg.Addr = "127.0.0.1:11185" + + s2Cfg := config.NewConfigDefault() + s2Cfg.DataDir = fmt.Sprintf("%s/s2", data_dir) + s2Cfg.Addr = "127.0.0.1:11186" + + s1, err := NewApp(s1Cfg) + if err != nil { + t.Fatal(err) + } + defer s1.Close() + + s2, err := NewApp(s2Cfg) + if err != nil { + t.Fatal(err) + } + defer s2.Close() + + go s1.Run() + + go s2.Run() + + time.Sleep(1 * time.Second) + + c1 := ledis.NewConn(s1Cfg.Addr) + defer c1.Close() + + c2 := ledis.NewConn(s2Cfg.Addr) + defer c2.Close() + + if _, err = c1.Do("set", "a", "1"); err != nil { + t.Fatal(err) + } + + timeout := 30000 + if _, err = c1.Do("xmigrate", "127.0.0.1", 11186, "KV", "a", 0, timeout); err != nil { + t.Fatal(err) + } + + if s, err := ledis.String(c2.Do("get", "a")); err != nil { + t.Fatal(err) + } else if s != "1" { + t.Fatal(s, "must 1") + } + + if s, err := ledis.String(c1.Do("get", "a")); err != nil && err != ledis.ErrNil { + t.Fatal(err) + } else if s != "" { + t.Fatal(s, "must empty") + } + + if num, err := ledis.Int(c2.Do("xmigratedb", "127.0.0.1", 11185, "KV", 10, 0, timeout)); err != nil { + t.Fatal(err) + } else if num != 1 { + t.Fatal(num, "must number 1") + } + + if s, err := ledis.String(c1.Do("get", "a")); err != nil { + t.Fatal(err) + } else if s != "1" { + t.Fatal(s, "must 1") + } + + if s, err := ledis.String(c2.Do("get", "a")); err != nil && err != ledis.ErrNil { + t.Fatal(err) + } else if s != "" { + t.Fatal(s, "must empty") + } + +} diff --git a/server/const.go b/server/const.go index f1123e3..dc55e24 100644 --- a/server/const.go +++ b/server/const.go @@ -20,8 +20,9 @@ var ( NullBulk = []byte("-1") NullArray = []byte("-1") - PONG = "PONG" - OK = "OK" + PONG = "PONG" + OK = "OK" + NOKEY = "NOKEY" ) const (