From a1555c60962b7b31f9de2592db4342b433d9576b Mon Sep 17 00:00:00 2001 From: siddontang Date: Tue, 25 Nov 2014 14:52:50 +0800 Subject: [PATCH 01/11] fix lmdb test error on windows --- ledis/tx_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ledis/tx_test.go b/ledis/tx_test.go index 26888b5..ba15a1c 100644 --- a/ledis/tx_test.go +++ b/ledis/tx_test.go @@ -3,6 +3,7 @@ package ledis import ( "github.com/siddontang/ledisdb/config" "os" + "strings" "testing" ) @@ -201,6 +202,9 @@ func testTx(t *testing.T, name string) { l, err := Open(cfg) if err != nil { + if strings.Contains(err.Error(), "not registered") { + return + } t.Fatal(err) } From f094ed65e3001687aba998f91d8636ce46b2a47c Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 27 Nov 2014 09:20:49 +0800 Subject: [PATCH 02/11] bug fix set close iterator --- ledis/t_set.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ledis/t_set.go b/ledis/t_set.go index d164752..b81496d 100644 --- a/ledis/t_set.go +++ b/ledis/t_set.go @@ -374,6 +374,8 @@ func (db *DB) SMembers(key []byte) ([][]byte, error) { v := make([][]byte, 0, 16) it := db.bucket.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1) + defer it.Close() + for ; it.Valid(); it.Next() { _, m, err := db.sDecodeSetKey(it.Key()) if err != nil { @@ -383,8 +385,6 @@ func (db *DB) SMembers(key []byte) ([][]byte, error) { v = append(v, m) } - it.Close() - return v, nil } From 4bdbdb76979c3cc36cef5b627145d35b8a8d91d0 Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 27 Nov 2014 14:03:44 +0800 Subject: [PATCH 03/11] add dump and restore support --- Godeps/Godeps.json | 8 +- cmd/ledis-cli/const.go | 8 +- doc/DiffRedis.md | 9 ++ doc/commands.json | 37 ++++++++ doc/commands.md | 45 ++++++++++ ledis/migrate.go | 164 ++++++++++++++++++++++++++++++++++++ ledis/migrate_test.go | 81 ++++++++++++++++++ ledis/rdb/decode.go | 128 ++++++++++++++++++++++++++++ ledis/rdb/encode.go | 52 ++++++++++++ ledis/rdb/rdb_test.go | 23 +++++ ledis/rdb/wandoujia-license | 21 +++++ ledis/replication_test.go | 2 +- ledis/t_bit.go | 5 ++ server/cmd_migrate.go | 111 ++++++++++++++++++++++++ server/cmd_migrate_test.go | 47 +++++++++++ 15 files changed, 737 insertions(+), 4 deletions(-) create mode 100644 ledis/migrate.go create mode 100644 ledis/migrate_test.go create mode 100644 ledis/rdb/decode.go create mode 100644 ledis/rdb/encode.go create mode 100644 ledis/rdb/rdb_test.go create mode 100644 ledis/rdb/wandoujia-license create mode 100644 server/cmd_migrate.go create mode 100644 server/cmd_migrate_test.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 4c1bf08..5aae124 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -11,8 +11,12 @@ }, { "ImportPath": "github.com/boltdb/bolt", - "Comment": "data/v1-254-gd285804", - "Rev": "d285804df1760edf4c602ecd901be5d5e67bf982" + "Comment": "data/v1-256-ge65c902", + "Rev": "e65c9027c35b7ef1014db9e02686889e51aadb2e" + }, + { + "ImportPath": "github.com/cupcake/rdb", + "Rev": "3454dcabd33cb8ea8261ffd6a45f4d836eb504cc" }, { "ImportPath": "github.com/edsrzf/mmap-go", diff --git a/cmd/ledis-cli/const.go b/cmd/ledis-cli/const.go index 5586f1a..9c8886b 100644 --- a/cmd/ledis-cli/const.go +++ b/cmd/ledis-cli/const.go @@ -1,4 +1,4 @@ -//This file was generated by .tools/generate_commands.py on Sun Oct 26 2014 15:14:39 +0800 +//This file was generated by .tools/generate_commands.py on Thu Nov 27 2014 13:42:44 +0800 package main var helpCommands = [][]string{ @@ -23,6 +23,7 @@ var helpCommands = [][]string{ {"DECR", "key", "KV"}, {"DECRBY", "key decrement", "KV"}, {"DEL", "key [key ...]", "KV"}, + {"DUMP", "key", "KV"}, {"ECHO", "message", "Server"}, {"EVAL", "script numkeys key [key ...] arg [arg ...]", "Script"}, {"EVALSHA", "sha1 numkeys key [key ...] arg [arg ...]", "Script"}, @@ -36,6 +37,7 @@ var helpCommands = [][]string{ {"GETSET", " key value", "KV"}, {"HCLEAR", "key", "Hash"}, {"HDEL", "key field [field ...]", "Hash"}, + {"HDUMP", "key", "Hash"}, {"HEXISTS", "key field", "Hash"}, {"HEXPIRE", "key seconds", "Hash"}, {"HEXPIREAT", "key timestamp", "Hash"}, @@ -57,6 +59,7 @@ var helpCommands = [][]string{ {"INCRBY", "key increment", "KV"}, {"INFO", "[section]", "Server"}, {"LCLEAR", "key", "List"}, + {"LDUMP", "key", "List"}, {"LEXPIRE", "key seconds", "List"}, {"LEXPIREAT", "key timestamp", "List"}, {"LINDEX", "key index", "List"}, @@ -73,6 +76,7 @@ var helpCommands = [][]string{ {"MSET", "key value [key value ...]", "KV"}, {"PERSIST", "key", "KV"}, {"PING", "-", "Server"}, + {"RESTORE", "key ttl value", "Server"}, {"ROLLBACK", "-", "Transaction"}, {"RPOP", "key", "List"}, {"RPUSH", "key value [value ...]", "List"}, @@ -84,6 +88,7 @@ var helpCommands = [][]string{ {"SCRIPT LOAD", "script", "Script"}, {"SDIFF", "key [key ...]", "Set"}, {"SDIFFSTORE", "destination key [key ...]", "Set"}, + {"SDUMP", "key", "Set"}, {"SELECT", "index", "Server"}, {"SET", "key value", "KV"}, {"SETEX", "key seconds value", "KV"}, @@ -112,6 +117,7 @@ var helpCommands = [][]string{ {"ZCARD", "key", "ZSet"}, {"ZCLEAR", "key", "ZSet"}, {"ZCOUNT", "key min max", "ZSet"}, + {"ZDUMP", "key", "ZSet"}, {"ZEXPIRE", "key seconds", "ZSet"}, {"ZEXPIREAT", "key timestamp", "ZSet"}, {"ZINCRBY", "key increment member", "ZSet"}, diff --git a/doc/DiffRedis.md b/doc/DiffRedis.md index cdcb89b..a42c1ad 100644 --- a/doc/DiffRedis.md +++ b/doc/DiffRedis.md @@ -56,5 +56,14 @@ LedisDB supplies `xscan`, `xrevscan`, etc, to fetch data iteratively and reverse + Zset: `zxscan`, `zxrevscan` + Bitmap: `bxscan`, `bxrevscan` +## DUMP + ++ KV: `dump` ++ Hash: `hdump` ++ List: `ldump` ++ Set: `sdump` ++ ZSet: `zdump` + +LedisDB supports `dump` to serialize the value with key, the data format is the same as Redis, so you can use it in Redis and vice versa. Of course, LedisDB has not implemented all APIs in Redis, you can see full commands in commands.json, commands.doc or [wiki](https://github.com/siddontang/ledisdb/wiki/Commands). \ No newline at end of file diff --git a/doc/commands.json b/doc/commands.json index d435bc3..ee48713 100644 --- a/doc/commands.json +++ b/doc/commands.json @@ -691,5 +691,42 @@ "arguments" : "-", "group": "Server", "readonly": false + }, + + "DUMP": { + "arguments" : "key", + "group": "KV", + "readonly": true + }, + + "LDUMP": { + "arguments" : "key", + "group": "List", + "readonly": true + }, + + "HDUMP": { + "arguments" : "key", + "group": "Hash", + "readonly": true + }, + + + "SDUMP": { + "arguments" : "key", + "group": "Set", + "readonly": true + }, + + "ZDUMP": { + "arguments" : "key", + "group": "ZSet", + "readonly": true + }, + + "RESTORE": { + "arguments" : "key ttl value", + "group" : "Server", + "readonly" : false } } diff --git a/doc/commands.md b/doc/commands.md index a5c0527..e1e9637 100644 --- a/doc/commands.md +++ b/doc/commands.md @@ -29,6 +29,7 @@ Table of Contents - [PERSIST key](#persist-key) - [XSCAN key [MATCH match] [COUNT count]](#xscan-key-match-match-count-count) - [XREVSCAN key [MATCH match] [COUNT count]](#xrevscan-key-match-match-count-count) + - [DUMP key](#dump-key) - [Hash](#hash) - [HDEL key field [field ...]](#hdel-key-field-field-) - [HEXISTS key field](#hexists-key-field) @@ -49,6 +50,7 @@ Table of Contents - [HPERSIST key](#hpersist-key) - [HXSCAN key [MATCH match] [COUNT count]](#hxscan-key-match-match-count-count) - [HXREVSCAN key [MATCH match] [COUNT count]](#hxrevscan-key-match-match-count-count) + - [HDUMP key](#hdump-key) - [List](#list) - [BLPOP key [key ...] timeout](#blpop-key-key--timeout) - [BRPOP key [key ...] timeout](#brpop-key-key--timeout) @@ -67,6 +69,7 @@ Table of Contents - [LPERSIST key](#lpersist-key) - [LXSCAN key [MATCH match] [COUNT count]](#lxscan-key-match-match-count-count) - [LXREVSCAN key [MATCH match] [COUNT count]](#lxrevscan-key-match-match-count-count) + - [LDUMP key](#ldump-key) - [Set](#set) - [SADD key member [member ...]](#sadd-key-member-member-) - [SCARD key](#scard-key) @@ -87,6 +90,7 @@ Table of Contents - [SPERSIST key](#spersist-key) - [SXSCAN key [MATCH match] [COUNT count]](#sxscan-key-match-match-count-count) - [SXREVSCAN key [MATCH match] [COUNT count]](#sxrevscan-key-match-match-count-count) + - [SDUMP key](#sdump-key) - [ZSet](#zset) - [ZADD key score member [score member ...]](#zadd-key-score-member-score-member-) - [ZCARD key](#zcard-key) @@ -117,6 +121,7 @@ Table of Contents - [ZRANGEBYLEX key min max [LIMIT offset count]](#zrangebylex-key-min-max-limit-offset-count) - [ZREMRANGEBYLEX key min max](#zremrangebylex-key-min-max) - [ZLEXCOUNT key min max](#zlexcount-key-min-max) + - [ZDUMP key](#zdump-key) - [Bitmap](#bitmap) - [BGET key](#bget-key) - [BGETBIT key offset](#bgetbit-key-offset) @@ -143,6 +148,7 @@ Table of Contents - [INFO [section]](#info-section) - [TIME](#time) - [CONFIG REWRITE](#config-rewrite) + - [RESTORE](#restore-key-ttl-value) - [Transaction](#transaction) - [BEGIN](#begin) - [ROLLBACK](#rollback) @@ -583,6 +589,22 @@ ledis>xrevscan "a" count 1 2) [] ``` +### DUMP key + +Serialize the value stored at key with KV type in a Redis-specific format like RDB and return it to the user. The returned value can be synthesized back into a key using the RESTORE command. + +**Return value** + +bulk: the serialized value + +**Examples** + +``` +ledis> set mykey 10 +OK +ledis>DUMP mykey +"\x00\xc0\n\x06\x00\xf8r?\xc5\xfb\xfb_(" +``` ## Hash @@ -959,6 +981,10 @@ Reverse iterate Hash keys incrementally. See [XREVSCAN](#xrevscan-key-match-match-count-count) for more information. +### HDUMP key + +See [DUMP](#dump-key) for more information. + ## List ### BLPOP key [key ...] timeout @@ -1293,6 +1319,10 @@ Reverse iterate list keys incrementally. See [XREVSCAN](#xrevscan-key-match-match-count-count) for more information. +### LDUMP key + +See [DUMP](#dump-key) for more information. + ## Set @@ -1728,6 +1758,10 @@ Reverse iterate Set keys incrementally. See [XREVSCAN](#xrevscan-key-match-match-count-count) for more information. +### SDUMP key + +See [DUMP](#dump-key) for more information. + ## ZSet ### ZADD key score member [score member ...] @@ -2425,6 +2459,9 @@ ledis> ZLEXCOUNT myzset - [c (integer) 3 ``` +### ZDUMP key + +See [DUMP](#dump-key) for more information. ## Bitmap @@ -2719,6 +2756,14 @@ Rewrites the config file the server was started with. String: OK or error msg. +### RESTORE key ttl value + +Create a key associated with a value that is obtained by deserializing the provided serialized value (obtained via DUMP, LDUMP, HDUMP, SDUMP, ZDUMP). + +If ttl is 0 the key is created without any expire, otherwise the specified expire time (in milliseconds) is set. But you must know that now the checking ttl accuracy is second. + +RESTORE checks the RDB version and data checksum. If they don't match an error is returned. + ## Transaction ### BEGIN diff --git a/ledis/migrate.go b/ledis/migrate.go new file mode 100644 index 0000000..16d500f --- /dev/null +++ b/ledis/migrate.go @@ -0,0 +1,164 @@ +package ledis + +import ( + "fmt" + "github.com/siddontang/ledisdb/ledis/rdb" +) + +/* + To support redis <-> ledisdb, the dump value format is the same as redis. + We will not support bitmap, and may add bit operations for kv later. + + But you must know that we use int64 for zset score, not double. + Only support rdb version 6. +*/ + +func (db *DB) Dump(key []byte) ([]byte, error) { + v, err := db.Get(key) + if err != nil { + return nil, err + } else if v == nil { + return nil, err + } + + return rdb.Dump(rdb.String(v)) +} + +func (db *DB) LDump(key []byte) ([]byte, error) { + v, err := db.LRange(key, 0, -1) + if err != nil { + return nil, err + } else if len(v) == 0 { + return nil, err + } + + return rdb.Dump(rdb.List(v)) +} + +func (db *DB) HDump(key []byte) ([]byte, error) { + v, err := db.HGetAll(key) + if err != nil { + return nil, err + } else if len(v) == 0 { + return nil, err + } + + o := make(rdb.HashMap, len(v)) + for i := 0; i < len(v); i++ { + o[i].Field = v[i].Field + o[i].Value = v[i].Value + } + + return rdb.Dump(o) +} + +func (db *DB) SDump(key []byte) ([]byte, error) { + v, err := db.SMembers(key) + if err != nil { + return nil, err + } else if len(v) == 0 { + return nil, err + } + + return rdb.Dump(rdb.Set(v)) +} + +func (db *DB) ZDump(key []byte) ([]byte, error) { + v, err := db.ZRangeByScore(key, MinScore, MaxScore, 0, -1) + if err != nil { + return nil, err + } else if len(v) == 0 { + return nil, err + } + + o := make(rdb.ZSet, len(v)) + for i := 0; i < len(v); i++ { + o[i].Member = v[i].Member + o[i].Score = float64(v[i].Score) + } + + return rdb.Dump(o) +} + +func (db *DB) Restore(key []byte, ttl int64, data []byte) error { + d, err := rdb.DecodeDump(data) + if err != nil { + return err + } + + //ttl is milliseconds, but we only support seconds + //later may support milliseconds + if ttl > 0 { + ttl = ttl / 1e3 + if ttl == 0 { + ttl = 1 + } + } + + switch value := d.(type) { + case rdb.String: + if err = db.Set(key, value); err != nil { + return err + } + + if ttl > 0 { + if _, err = db.Expire(key, ttl); err != nil { + return err + } + } + case rdb.HashMap: + fv := make([]FVPair, len(value)) + for i := 0; i < len(value); i++ { + fv[i] = FVPair{Field: value[i].Field, Value: value[i].Value} + } + + if err = db.HMset(key, fv...); err != nil { + return err + } + + if ttl > 0 { + if _, err = db.HExpire(key, ttl); err != nil { + return err + } + } + case rdb.List: + if _, err = db.RPush(key, value...); err != nil { + return err + } + + if ttl > 0 { + if _, err = db.LExpire(key, ttl); err != nil { + return err + } + } + case rdb.ZSet: + sp := make([]ScorePair, len(value)) + for i := 0; i < len(value); i++ { + sp[i] = ScorePair{int64(value[i].Score), value[i].Member} + } + + if _, err = db.ZAdd(key, sp...); err != nil { + return err + } + + if ttl > 0 { + if _, err = db.ZExpire(key, ttl); err != nil { + return err + } + } + case rdb.Set: + if _, err = db.SAdd(key, value...); err != nil { + return err + } + + if ttl > 0 { + if _, err = db.SExpire(key, ttl); err != nil { + return err + } + } + default: + return fmt.Errorf("invalid data type %T", d) + } + + return nil +} diff --git a/ledis/migrate_test.go b/ledis/migrate_test.go new file mode 100644 index 0000000..697fe36 --- /dev/null +++ b/ledis/migrate_test.go @@ -0,0 +1,81 @@ +package ledis + +import ( + "github.com/siddontang/ledisdb/config" + "os" + "testing" +) + +func TestMigrate(t *testing.T) { + cfg1 := config.NewConfigDefault() + cfg1.DataDir = "/tmp/test_ledisdb_migrate1" + os.RemoveAll(cfg1.DataDir) + + defer os.RemoveAll(cfg1.DataDir) + + l1, _ := Open(cfg1) + defer l1.Close() + + cfg2 := config.NewConfigDefault() + cfg2.DataDir = "/tmp/test_ledisdb_migrate2" + os.RemoveAll(cfg2.DataDir) + + defer os.RemoveAll(cfg2.DataDir) + + l2, _ := Open(cfg2) + defer l2.Close() + + db1, _ := l1.Select(0) + db2, _ := l2.Select(0) + + key := []byte("a") + lkey := []byte("a") + hkey := []byte("a") + skey := []byte("a") + zkey := []byte("a") + value := []byte("1") + + db1.Set(key, value) + + if data, err := db1.Dump(key); err != nil { + t.Fatal(err) + } else if err := db2.Restore(key, 0, data); err != nil { + t.Fatal(err) + } + + db1.RPush(lkey, []byte("1"), []byte("2"), []byte("3")) + + if data, err := db1.LDump(lkey); err != nil { + t.Fatal(err) + } else if err := db2.Restore(lkey, 0, data); err != nil { + t.Fatal(err) + } + + db1.SAdd(skey, []byte("1"), []byte("2"), []byte("3")) + + if data, err := db1.SDump(skey); err != nil { + t.Fatal(err) + } else if err := db2.Restore(skey, 0, data); err != nil { + t.Fatal(err) + } + + db1.HMset(hkey, FVPair{[]byte("a"), []byte("1")}, FVPair{[]byte("b"), []byte("2")}, FVPair{[]byte("c"), []byte("3")}) + + if data, err := db1.HDump(hkey); err != nil { + t.Fatal(err) + } else if err := db2.Restore(hkey, 0, data); err != nil { + t.Fatal(err) + } + + db1.ZAdd(zkey, ScorePair{1, []byte("a")}, ScorePair{2, []byte("b")}, ScorePair{3, []byte("c")}) + + if data, err := db1.ZDump(zkey); err != nil { + t.Fatal(err) + } else if err := db2.Restore(zkey, 0, data); err != nil { + t.Fatal(err) + } + + if err := checkLedisEqual(l1, l2); err != nil { + t.Fatal(err) + } +} diff --git a/ledis/rdb/decode.go b/ledis/rdb/decode.go new file mode 100644 index 0000000..06cd392 --- /dev/null +++ b/ledis/rdb/decode.go @@ -0,0 +1,128 @@ +package rdb + +// Copyright 2014 Wandoujia Inc. All Rights Reserved. +// Licensed under the MIT (MIT-LICENSE.txt) license. + +import "fmt" + +import ( + "github.com/cupcake/rdb" + "github.com/cupcake/rdb/nopdecoder" +) + +func DecodeDump(p []byte) (interface{}, error) { + d := &decoder{} + if err := rdb.DecodeDump(p, 0, nil, 0, d); err != nil { + return nil, err + } + return d.obj, d.err +} + +type decoder struct { + nopdecoder.NopDecoder + obj interface{} + err error +} + +func (d *decoder) initObject(obj interface{}) { + if d.err != nil { + return + } + if d.obj != nil { + d.err = fmt.Errorf("invalid object, init again") + } else { + d.obj = obj + } +} + +func (d *decoder) Set(key, value []byte, expiry int64) { + d.initObject(String(value)) +} + +func (d *decoder) StartHash(key []byte, length, expiry int64) { + d.initObject(HashMap(nil)) +} + +func (d *decoder) Hset(key, field, value []byte) { + if d.err != nil { + return + } + switch h := d.obj.(type) { + default: + d.err = fmt.Errorf("invalid object, not a hashmap") + case HashMap: + v := struct { + Field, Value []byte + }{ + field, + value, + } + d.obj = append(h, v) + } +} + +func (d *decoder) StartSet(key []byte, cardinality, expiry int64) { + d.initObject(Set(nil)) +} + +func (d *decoder) Sadd(key, member []byte) { + if d.err != nil { + return + } + switch s := d.obj.(type) { + default: + d.err = fmt.Errorf("invalid object, not a set") + case Set: + d.obj = append(s, member) + } +} + +func (d *decoder) StartList(key []byte, length, expiry int64) { + d.initObject(List(nil)) +} + +func (d *decoder) Rpush(key, value []byte) { + if d.err != nil { + return + } + switch l := d.obj.(type) { + default: + d.err = fmt.Errorf("invalid object, not a list") + case List: + d.obj = append(l, value) + } +} + +func (d *decoder) StartZSet(key []byte, cardinality, expiry int64) { + d.initObject(ZSet(nil)) +} + +func (d *decoder) Zadd(key []byte, score float64, member []byte) { + if d.err != nil { + return + } + switch z := d.obj.(type) { + default: + d.err = fmt.Errorf("invalid object, not a zset") + case ZSet: + v := struct { + Member []byte + Score float64 + }{ + member, + score, + } + d.obj = append(z, v) + } +} + +type String []byte +type List [][]byte +type HashMap []struct { + Field, Value []byte +} +type Set [][]byte +type ZSet []struct { + Member []byte + Score float64 +} diff --git a/ledis/rdb/encode.go b/ledis/rdb/encode.go new file mode 100644 index 0000000..1c33cb0 --- /dev/null +++ b/ledis/rdb/encode.go @@ -0,0 +1,52 @@ +package rdb + +import ( + "bytes" + "fmt" + "github.com/cupcake/rdb" +) + +func Dump(obj interface{}) ([]byte, error) { + var buf bytes.Buffer + + e := rdb.NewEncoder(&buf) + + switch v := obj.(type) { + case String: + e.EncodeType(rdb.TypeString) + e.EncodeString(v) + case HashMap: + e.EncodeType(rdb.TypeHash) + e.EncodeLength(uint32(len(v))) + + for i := 0; i < len(v); i++ { + e.EncodeString(v[i].Field) + e.EncodeString(v[i].Value) + } + case List: + e.EncodeType(rdb.TypeList) + e.EncodeLength(uint32(len(v))) + for i := 0; i < len(v); i++ { + e.EncodeString(v[i]) + } + case Set: + e.EncodeType(rdb.TypeSet) + e.EncodeLength(uint32(len(v))) + for i := 0; i < len(v); i++ { + e.EncodeString(v[i]) + } + case ZSet: + e.EncodeType(rdb.TypeZSet) + e.EncodeLength(uint32(len(v))) + for i := 0; i < len(v); i++ { + e.EncodeString(v[i].Member) + e.EncodeFloat(v[i].Score) + } + default: + return nil, fmt.Errorf("invalid dump type %T", obj) + } + + e.EncodeDumpFooter() + + return buf.Bytes(), nil +} diff --git a/ledis/rdb/rdb_test.go b/ledis/rdb/rdb_test.go new file mode 100644 index 0000000..88095d2 --- /dev/null +++ b/ledis/rdb/rdb_test.go @@ -0,0 +1,23 @@ +package rdb + +import ( + "reflect" + "testing" +) + +func TestCodec(t *testing.T) { + testCodec(String("abc"), t) +} + +func testCodec(obj interface{}, t *testing.T) { + b, err := Dump(obj) + if err != nil { + t.Fatal(err) + } + + if o, err := DecodeDump(b); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(obj, o) { + t.Fatal("must equal") + } +} diff --git a/ledis/rdb/wandoujia-license b/ledis/rdb/wandoujia-license new file mode 100644 index 0000000..23320dc --- /dev/null +++ b/ledis/rdb/wandoujia-license @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Wandoujia Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/ledis/replication_test.go b/ledis/replication_test.go index 287480b..d29aa08 100644 --- a/ledis/replication_test.go +++ b/ledis/replication_test.go @@ -18,7 +18,7 @@ func checkLedisEqual(master *Ledis, slave *Ledis) error { if v, err := slave.ldb.Get(key); err != nil { return err } else if !bytes.Equal(v, value) { - return fmt.Errorf("replication error %d != %d", len(v), len(value)) + return fmt.Errorf("equal error at %q, %d != %d", key, len(v), len(value)) } } diff --git a/ledis/t_bit.go b/ledis/t_bit.go index 771ef90..501ea1b 100644 --- a/ledis/t_bit.go +++ b/ledis/t_bit.go @@ -9,6 +9,11 @@ import ( "time" ) +/* + We will not maintain bitmap anymore, and will add bit operations for kv type later. + Use your own risk. +*/ + const ( OPand uint8 = iota + 1 OPor diff --git a/server/cmd_migrate.go b/server/cmd_migrate.go new file mode 100644 index 0000000..c18f6f0 --- /dev/null +++ b/server/cmd_migrate.go @@ -0,0 +1,111 @@ +package server + +import ( + "github.com/siddontang/ledisdb/ledis" +) + +func dumpCommand(c *client) error { + if len(c.args) != 1 { + return ErrCmdParams + } + + key := c.args[0] + if data, err := c.db.Dump(key); err != nil { + return err + } else { + c.resp.writeBulk(data) + } + + return nil +} + +func ldumpCommand(c *client) error { + if len(c.args) != 1 { + return ErrCmdParams + } + + key := c.args[0] + if data, err := c.db.LDump(key); err != nil { + return err + } else { + c.resp.writeBulk(data) + } + + return nil +} + +func hdumpCommand(c *client) error { + if len(c.args) != 1 { + return ErrCmdParams + } + + key := c.args[0] + if data, err := c.db.HDump(key); err != nil { + return err + } else { + c.resp.writeBulk(data) + } + + return nil +} + +func sdumpCommand(c *client) error { + if len(c.args) != 1 { + return ErrCmdParams + } + + key := c.args[0] + if data, err := c.db.SDump(key); err != nil { + return err + } else { + c.resp.writeBulk(data) + } + + return nil +} + +func zdumpCommand(c *client) error { + if len(c.args) != 1 { + return ErrCmdParams + } + + key := c.args[0] + if data, err := c.db.ZDump(key); err != nil { + return err + } else { + c.resp.writeBulk(data) + } + + return nil +} + +func restoreCommand(c *client) error { + args := c.args + if len(args) != 3 { + return ErrCmdParams + } + + key := args[0] + ttl, err := ledis.StrInt64(args[1], nil) + if err != nil { + return err + } + data := args[2] + + if err = c.db.Restore(key, ttl, data); err != nil { + return err + } else { + c.resp.writeStatus(OK) + } + + return nil +} + +func init() { + register("dump", dumpCommand) + register("ldump", ldumpCommand) + register("hdump", hdumpCommand) + register("sdump", sdumpCommand) + register("zdump", zdumpCommand) + register("restore", restoreCommand) +} diff --git a/server/cmd_migrate_test.go b/server/cmd_migrate_test.go new file mode 100644 index 0000000..cc71c18 --- /dev/null +++ b/server/cmd_migrate_test.go @@ -0,0 +1,47 @@ +package server + +import ( + "github.com/siddontang/ledisdb/client/go/ledis" + "testing" +) + +func TestMigrate(t *testing.T) { + c := getTestConn() + defer c.Close() + + var err error + _, err = c.Do("set", "mtest_a", "1") + if err != nil { + t.Fatal(err) + } + _, err = c.Do("rpush", "mtest_la", "1", "2", "3") + if err != nil { + t.Fatal(err) + } + _, err = c.Do("hmset", "mtest_ha", "a", "1", "b", "2") + if err != nil { + t.Fatal(err) + } + _, err = c.Do("sadd", "mtest_sa", "1", "2", "3") + if err != nil { + t.Fatal(err) + } + _, err = c.Do("zadd", "mtest_za", 1, "a", 2, "b", 3, "c") + if err != nil { + t.Fatal(err) + } + + testMigrate(c, "dump", "mtest_a", t) + testMigrate(c, "ldump", "mtest_la", t) + testMigrate(c, "hdump", "mtest_ha", t) + testMigrate(c, "sdump", "mtest_sa", t) + testMigrate(c, "zdump", "mtest_za", t) +} + +func testMigrate(c *ledis.Conn, dump string, key string, t *testing.T) { + if data, err := ledis.Bytes(c.Do(dump, key)); err != nil { + t.Fatal(err) + } else if _, err := c.Do("restore", key, 0, data); err != nil { + t.Fatal(err) + } +} From 69aaba85093a1640c53ba15528e6a6e954bbaf46 Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 27 Nov 2014 14:06:00 +0800 Subject: [PATCH 04/11] update doc --- doc/commands.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/commands.md b/doc/commands.md index e1e9637..6c8a235 100644 --- a/doc/commands.md +++ b/doc/commands.md @@ -148,7 +148,7 @@ Table of Contents - [INFO [section]](#info-section) - [TIME](#time) - [CONFIG REWRITE](#config-rewrite) - - [RESTORE](#restore-key-ttl-value) + - [RESTORE key ttl value](#restore-key-ttl-value) - [Transaction](#transaction) - [BEGIN](#begin) - [ROLLBACK](#rollback) From 026a72b58e591aad927c4b77ac81886423080f0f Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 27 Nov 2014 20:51:21 +0800 Subject: [PATCH 05/11] Merge branch 'copy-bugfix' into develop --- cmd/ledis-cli/linenoise.go | 2 +- rpl/file_store.go | 7 ++++--- server/snapshot.go | 3 +-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/ledis-cli/linenoise.go b/cmd/ledis-cli/linenoise.go index 9919572..4bb49af 100644 --- a/cmd/ledis-cli/linenoise.go +++ b/cmd/ledis-cli/linenoise.go @@ -21,7 +21,7 @@ func line(prompt string) (string, error) { defer C.free(unsafe.Pointer(resultCString)) if resultCString == nil { - return "", errors.New("quited by a signal") + return "", errors.New("exiting due to signal") } result := C.GoString(resultCString) diff --git a/rpl/file_store.go b/rpl/file_store.go index 161ab8d..5f4c324 100644 --- a/rpl/file_store.go +++ b/rpl/file_store.go @@ -196,14 +196,15 @@ func (s *FileStore) storeLog(l *Log) error { func (s *FileStore) PurgeExpired(n int64) error { s.rm.Lock() - purges := []*tableReader{} + var purges []*tableReader t := uint32(time.Now().Unix() - int64(n)) for i, r := range s.rs { if r.lastTime > t { - purges = s.rs[0:i] - s.rs = s.rs[i:] + purges = append([]*tableReader{}, s.rs[0:i]...) + n := copy(s.rs, s.rs[i:]) + s.rs = s.rs[0:n] break } } diff --git a/server/snapshot.go b/server/snapshot.go index 67192ea..b238545 100644 --- a/server/snapshot.go +++ b/server/snapshot.go @@ -137,8 +137,7 @@ func (s *snapshotStore) purge(create bool) { } if num > 0 { - names = s.names[0:num] - + names = append([]string{}, s.names[0:num]...) n := copy(s.names, s.names[num:]) s.names = s.names[0:n] } From e43d2ea08696a877b5f3cdaac7d4aad006f9c9a9 Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 28 Nov 2014 15:48:16 +0800 Subject: [PATCH 06/11] update doc --- doc/DiffRedis.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/doc/DiffRedis.md b/doc/DiffRedis.md index a42c1ad..4460652 100644 --- a/doc/DiffRedis.md +++ b/doc/DiffRedis.md @@ -14,11 +14,11 @@ LedisDB has no Strings data type but KV and Bitmap, any some Keys and Strings co In Redis, `del` can delete all type data, like String, Hash, List, etc, but in LedisDB, `del` can only delete KV data. To delete other type data, you will use "clear" commands. + KV: `del`, `mdel` -+ Hash: `hclear`, `mhclear` -+ List: `lclear`, `mlclear` -+ Set: `sclear`, `msclear` -+ Zset: `zclear`, `mzclear` -+ Bitmap: `bclear`, `mbclear` ++ Hash: `hclear`, `hmclear` ++ List: `lclear`, `lmclear` ++ Set: `sclear`, `smclear` ++ Zset: `zclear`, `zmclear` ++ Bitmap: `bclear`, `bmclear` ## Expire, Persist, and TTL From 0ee6860979168d5c612d7f9110ab83a520d47047 Mon Sep 17 00:00:00 2001 From: siddontang Date: Sun, 30 Nov 2014 22:38:56 +0800 Subject: [PATCH 07/11] must clear key before restore --- ledis/migrate.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/ledis/migrate.go b/ledis/migrate.go index 16d500f..b6df026 100644 --- a/ledis/migrate.go +++ b/ledis/migrate.go @@ -97,6 +97,10 @@ func (db *DB) Restore(key []byte, ttl int64, data []byte) error { switch value := d.(type) { case rdb.String: + if _, err = db.Del(key); err != nil { + return err + } + if err = db.Set(key, value); err != nil { return err } @@ -107,6 +111,11 @@ func (db *DB) Restore(key []byte, ttl int64, data []byte) error { } } case rdb.HashMap: + //first clear old key + if _, err = db.HClear(key); err != nil { + return err + } + fv := make([]FVPair, len(value)) for i := 0; i < len(value); i++ { fv[i] = FVPair{Field: value[i].Field, Value: value[i].Value} @@ -122,6 +131,11 @@ func (db *DB) Restore(key []byte, ttl int64, data []byte) error { } } case rdb.List: + //first clear old key + if _, err = db.LClear(key); err != nil { + return err + } + if _, err = db.RPush(key, value...); err != nil { return err } @@ -132,6 +146,11 @@ func (db *DB) Restore(key []byte, ttl int64, data []byte) error { } } case rdb.ZSet: + //first clear old key + if _, err = db.ZClear(key); err != nil { + return err + } + sp := make([]ScorePair, len(value)) for i := 0; i < len(value); i++ { sp[i] = ScorePair{int64(value[i].Score), value[i].Member} @@ -147,6 +166,11 @@ func (db *DB) Restore(key []byte, ttl int64, data []byte) error { } } case rdb.Set: + //first clear old key + if _, err = db.SClear(key); err != nil { + return err + } + if _, err = db.SAdd(key, value...); err != nil { return err } From 07e3705b2210795615207c18f5e2a24e7239de64 Mon Sep 17 00:00:00 2001 From: siddontang Date: Mon, 1 Dec 2014 17:50:48 +0800 Subject: [PATCH 08/11] add migrate command, improved later --- client/go/ledis/conn.go | 11 +++- server/app.go | 14 +++++ server/cmd_migrate.go | 136 ++++++++++++++++++++++++++++++++++++++++ server/const.go | 5 +- 4 files changed, 163 insertions(+), 3 deletions(-) diff --git a/client/go/ledis/conn.go b/client/go/ledis/conn.go index 1c988b6..2b8e411 100644 --- a/client/go/ledis/conn.go +++ b/client/go/ledis/conn.go @@ -9,6 +9,7 @@ import ( "net" "strconv" "sync" + "time" ) // Error represents an error returned in a command reply. @@ -40,6 +41,8 @@ type Conn struct { // Scratch space for formatting integers and floats. numScratch [40]byte + + connectTimeout time.Duration } func NewConn(addr string) *Conn { @@ -69,6 +72,12 @@ func (c *Conn) Close() { } } +func (c *Conn) SetConnectTimeout(t time.Duration) { + c.cm.Lock() + c.connectTimeout = t + c.cm.Unlock() +} + func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { if err := c.Send(cmd, args...); err != nil { return nil, err @@ -144,7 +153,7 @@ func (c *Conn) connect() error { } var err error - c.c, err = net.Dial(getProto(c.addr), c.addr) + c.c, err = net.DialTimeout(getProto(c.addr), c.addr, c.connectTimeout) if err != nil { c.c = nil return err diff --git a/server/app.go b/server/app.go index 393da09..09cba2c 100644 --- a/server/app.go +++ b/server/app.go @@ -1,6 +1,7 @@ package server import ( + goledis "github.com/siddontang/ledisdb/client/go/ledis" "github.com/siddontang/ledisdb/config" "github.com/siddontang/ledisdb/ledis" "net" @@ -42,6 +43,9 @@ type App struct { rcm sync.Mutex rcs map[*respClient]struct{} + + migrateConnM sync.Mutex + migrateConns map[string]*goledis.Conn } func netType(s string) string { @@ -71,6 +75,8 @@ func NewApp(cfg *config.Config) (*App, error) { app.rcs = make(map[*respClient]struct{}) + app.migrateConns = make(map[string]*goledis.Conn) + var err error if app.info, err = newInfo(app); err != nil { @@ -132,6 +138,14 @@ func (app *App) Close() { app.listener.Close() + //close all migrate connections + app.migrateConnM.Lock() + for k, c := range app.migrateConns { + c.Close() + delete(app.migrateConns, k) + } + app.migrateConnM.Unlock() + if app.httpListener != nil { app.httpListener.Close() } diff --git a/server/cmd_migrate.go b/server/cmd_migrate.go index c18f6f0..1295bb5 100644 --- a/server/cmd_migrate.go +++ b/server/cmd_migrate.go @@ -1,7 +1,11 @@ package server import ( + "fmt" + goledis "github.com/siddontang/ledisdb/client/go/ledis" "github.com/siddontang/ledisdb/ledis" + "strings" + "time" ) func dumpCommand(c *client) error { @@ -79,6 +83,7 @@ func zdumpCommand(c *client) error { return nil } +// unlike redis, restore will try to delete old key first func restoreCommand(c *client) error { args := c.args if len(args) != 3 { @@ -101,6 +106,135 @@ func restoreCommand(c *client) error { return nil } +func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) { + var err error + var data []byte + switch tp { + case "kv": + data, err = db.Dump(key) + case "hash": + data, err = db.HDump(key) + case "list": + data, err = db.LDump(key) + case "set": + data, err = db.SDump(key) + case "zset": + data, err = db.ZDump(key) + default: + err = fmt.Errorf("invalid key type %s", tp) + } + return data, err +} + +func xdel(db *ledis.DB, tp string, key []byte) error { + var err error + switch tp { + case "kv": + _, err = db.Del(key) + case "hash": + _, err = db.HClear(key) + case "list": + _, err = db.LClear(key) + case "set": + _, err = db.SClear(key) + case "zset": + _, err = db.ZClear(key) + default: + err = fmt.Errorf("invalid key type %s", tp) + } + return err +} + +func xdumpCommand(c *client) error { + args := c.args + if len(args) != 2 { + return ErrCmdParams + } + + tp := string(args[0]) + key := args[1] + + if data, err := xdump(c.db, tp, key); err != nil { + return err + } else { + c.resp.writeBulk(data) + } + return nil +} + +//XMIGRATE host port type key destination-db timeout [COPY] +func xmigrateCommand(c *client) error { + args := c.args + + if len(args) != 6 && len(args) != 7 { + return ErrCmdParams + } + + addr := fmt.Sprintf("%s:%d", string(args[0]), string(args[1])) + tp := string(args[2]) + key := args[3] + db, err := ledis.StrUint64(args[4], nil) + if err != nil { + return err + } else if db >= uint64(ledis.MaxDBNumber) { + return fmt.Errorf("invalid db index %d, must < %d", db, ledis.MaxDBNumber) + } + var timeout int64 + timeout, err = ledis.StrInt64(args[5], nil) + if err != nil { + return err + } else if timeout < 0 { + return fmt.Errorf("invalid timeout %d", timeout) + } + + onlyCopy := false + if len(args) == 7 { + if strings.ToUpper(string(args[6])) == "COPY" { + onlyCopy = true + } + } + + var m *ledis.Multi + if m, err = c.db.Multi(); err != nil { + return err + } + defer m.Close() + + var data []byte + data, err = xdump(m.DB, tp, key) + if err != nil { + return err + } else if data == nil { + c.resp.writeStatus(NOKEY) + return nil + } + + c.app.migrateConnM.Lock() + defer c.app.migrateConnM.Unlock() + + conn, ok := c.app.migrateConns[addr] + if !ok { + conn = goledis.NewConn(addr) + c.app.migrateConns[addr] = conn + } + + //timeout is milliseconds + conn.SetConnectTimeout(time.Duration(timeout) * time.Millisecond) + + if _, err = conn.Do("restore", key, data); err != nil { + return err + } + + if !onlyCopy { + if err = xdel(m.DB, tp, key); err != nil { + return err + } + } + + c.resp.writeStatus(OK) + return nil +} + func init() { register("dump", dumpCommand) register("ldump", ldumpCommand) @@ -108,4 +242,6 @@ func init() { register("sdump", sdumpCommand) register("zdump", zdumpCommand) register("restore", restoreCommand) + register("xdump", xdumpCommand) + register("xmigrate", xmigrateCommand) } diff --git a/server/const.go b/server/const.go index f1123e3..dc55e24 100644 --- a/server/const.go +++ b/server/const.go @@ -20,8 +20,9 @@ var ( NullBulk = []byte("-1") NullArray = []byte("-1") - PONG = "PONG" - OK = "OK" + PONG = "PONG" + OK = "OK" + NOKEY = "NOKEY" ) const ( From 3055e6b2fca0e65671b31bb8b4bc3e870e0b20b7 Mon Sep 17 00:00:00 2001 From: siddontang Date: Wed, 3 Dec 2014 16:27:52 +0800 Subject: [PATCH 09/11] add xmigrate xmigrate will be used in xcodis --- client/go/ledis/conn.go | 36 +++++- server/app.go | 14 +-- server/client.go | 36 +++--- server/cmd_migrate.go | 243 ++++++++++++++++++++++++++++++------- server/cmd_migrate_test.go | 95 +++++++++++++-- 5 files changed, 346 insertions(+), 78 deletions(-) diff --git a/client/go/ledis/conn.go b/client/go/ledis/conn.go index 2b8e411..12f5f00 100644 --- a/client/go/ledis/conn.go +++ b/client/go/ledis/conn.go @@ -8,6 +8,7 @@ import ( "io" "net" "strconv" + "strings" "sync" "time" ) @@ -78,6 +79,22 @@ func (c *Conn) SetConnectTimeout(t time.Duration) { c.cm.Unlock() } +func (c *Conn) SetReadDeadline(t time.Time) { + c.cm.Lock() + if c.c != nil { + c.c.SetReadDeadline(t) + } + c.cm.Unlock() +} + +func (c *Conn) SetWriteDeadline(t time.Time) { + c.cm.Lock() + if c.c != nil { + c.c.SetWriteDeadline(t) + } + c.cm.Unlock() +} + func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { if err := c.Send(cmd, args...); err != nil { return nil, err @@ -87,6 +104,21 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { } func (c *Conn) Send(cmd string, args ...interface{}) error { + var err error + for i := 0; i < 2; i++ { + if err = c.send(cmd, args...); err != nil { + if e, ok := err.(*net.OpError); ok && strings.Contains(e.Error(), "use of closed network connection") { + //send to a closed connection, try again + continue + } + } else { + return nil + } + } + return err +} + +func (c *Conn) send(cmd string, args ...interface{}) error { if err := c.connect(); err != nil { return err } @@ -138,7 +170,9 @@ func (c *Conn) ReceiveBulkTo(w io.Writer) error { func (c *Conn) finalize() { c.cm.Lock() if !c.closed { - c.c.Close() + if c.c != nil { + c.c.Close() + } c.closed = true } c.cm.Unlock() diff --git a/server/app.go b/server/app.go index 09cba2c..08cd5d0 100644 --- a/server/app.go +++ b/server/app.go @@ -44,8 +44,8 @@ type App struct { rcm sync.Mutex rcs map[*respClient]struct{} - migrateConnM sync.Mutex - migrateConns map[string]*goledis.Conn + migrateM sync.Mutex + migrateClients map[string]*goledis.Client } func netType(s string) string { @@ -75,7 +75,7 @@ func NewApp(cfg *config.Config) (*App, error) { app.rcs = make(map[*respClient]struct{}) - app.migrateConns = make(map[string]*goledis.Conn) + app.migrateClients = make(map[string]*goledis.Client) var err error @@ -139,12 +139,12 @@ func (app *App) Close() { app.listener.Close() //close all migrate connections - app.migrateConnM.Lock() - for k, c := range app.migrateConns { + app.migrateM.Lock() + for k, c := range app.migrateClients { c.Close() - delete(app.migrateConns, k) + delete(app.migrateClients, k) } - app.migrateConnM.Unlock() + app.migrateM.Unlock() if app.httpListener != nil { app.httpListener.Close() diff --git a/server/client.go b/server/client.go index 57abc8c..5c1a9d8 100644 --- a/server/client.go +++ b/server/client.go @@ -11,25 +11,29 @@ import ( ) var txUnsupportedCmds = map[string]struct{}{ - "select": struct{}{}, - "slaveof": struct{}{}, - "fullsync": struct{}{}, - "sync": struct{}{}, - "begin": struct{}{}, - "flushall": struct{}{}, - "flushdb": struct{}{}, - "eval": struct{}{}, + "select": struct{}{}, + "slaveof": struct{}{}, + "fullsync": struct{}{}, + "sync": struct{}{}, + "begin": struct{}{}, + "flushall": struct{}{}, + "flushdb": struct{}{}, + "eval": struct{}{}, + "xmigrate": struct{}{}, + "xmigratedb": struct{}{}, } var scriptUnsupportedCmds = map[string]struct{}{ - "slaveof": struct{}{}, - "fullsync": struct{}{}, - "sync": struct{}{}, - "begin": struct{}{}, - "commit": struct{}{}, - "rollback": struct{}{}, - "flushall": struct{}{}, - "flushdb": struct{}{}, + "slaveof": struct{}{}, + "fullsync": struct{}{}, + "sync": struct{}{}, + "begin": struct{}{}, + "commit": struct{}{}, + "rollback": struct{}{}, + "flushall": struct{}{}, + "flushdb": struct{}{}, + "xmigrate": struct{}{}, + "xmigratedb": struct{}{}, } type responseWriter interface { diff --git a/server/cmd_migrate.go b/server/cmd_migrate.go index 1295bb5..3153821 100644 --- a/server/cmd_migrate.go +++ b/server/cmd_migrate.go @@ -109,16 +109,16 @@ func restoreCommand(c *client) error { func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) { var err error var data []byte - switch tp { - case "kv": + switch strings.ToUpper(tp) { + case "KV": data, err = db.Dump(key) - case "hash": + case "HASH": data, err = db.HDump(key) - case "list": + case "LIST": data, err = db.LDump(key) - case "set": + case "SET": data, err = db.SDump(key) - case "zset": + case "ZSET": data, err = db.ZDump(key) default: err = fmt.Errorf("invalid key type %s", tp) @@ -128,16 +128,16 @@ func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) { func xdel(db *ledis.DB, tp string, key []byte) error { var err error - switch tp { - case "kv": + switch strings.ToUpper(tp) { + case "KV": _, err = db.Del(key) - case "hash": + case "HASH": _, err = db.HClear(key) - case "list": + case "LIST": _, err = db.LClear(key) - case "set": + case "SET": _, err = db.SClear(key) - case "zset": + case "ZSET": _, err = db.ZClear(key) default: err = fmt.Errorf("invalid key type %s", tp) @@ -145,6 +145,40 @@ func xdel(db *ledis.DB, tp string, key []byte) error { return err } +func xttl(db *ledis.DB, tp string, key []byte) (int64, error) { + switch strings.ToUpper(tp) { + case "KV": + return db.TTL(key) + case "HASH": + return db.HTTL(key) + case "LIST": + return db.LTTL(key) + case "SET": + return db.STTL(key) + case "ZSET": + return db.ZTTL(key) + default: + return 0, fmt.Errorf("invalid key type %s", tp) + } +} + +func xscan(db *ledis.DB, tp string, count int) ([][]byte, error) { + switch strings.ToUpper(tp) { + case "KV": + return db.Scan(nil, count, false, "") + case "HASH": + return db.HScan(nil, count, false, "") + case "LIST": + return db.LScan(nil, count, false, "") + case "SET": + return db.SScan(nil, count, false, "") + case "ZSET": + return db.ZScan(nil, count, false, "") + default: + return nil, fmt.Errorf("invalid key type %s", tp) + } +} + func xdumpCommand(c *client) error { args := c.args if len(args) != 2 { @@ -162,15 +196,131 @@ func xdumpCommand(c *client) error { return nil } -//XMIGRATE host port type key destination-db timeout [COPY] -func xmigrateCommand(c *client) error { - args := c.args +func (app *App) getMigrateClient(addr string) *goledis.Client { + app.migrateM.Lock() - if len(args) != 6 && len(args) != 7 { + mc, ok := app.migrateClients[addr] + if !ok { + mc = goledis.NewClient(&goledis.Config{addr, 4, 0, 0}) + app.migrateClients[addr] = mc + + } + + app.migrateM.Unlock() + + return mc +} + +//XMIGRATEDB host port tp count db timeout +//select count tp type keys and migrate +//will block any other write operations +//maybe only for xcodis +func xmigratedbCommand(c *client) error { + args := c.args + if len(args) != 6 { return ErrCmdParams } - addr := fmt.Sprintf("%s:%d", string(args[0]), string(args[1])) + addr := fmt.Sprintf("%s:%s", string(args[0]), string(args[1])) + if addr == c.app.cfg.Addr { + //same server, can not migrate + return fmt.Errorf("migrate in same server is not allowed") + } + + tp := string(args[2]) + + count, err := ledis.StrInt64(args[3], nil) + if err != nil { + return err + } else if count <= 0 { + count = 10 + } + + db, err := ledis.StrUint64(args[4], nil) + if err != nil { + return err + } else if db >= uint64(ledis.MaxDBNumber) { + return fmt.Errorf("invalid db index %d, must < %d", db, ledis.MaxDBNumber) + } + + timeout, err := ledis.StrInt64(args[5], nil) + if err != nil { + return err + } else if timeout < 0 { + return fmt.Errorf("invalid timeout %d", timeout) + } + + m, err := c.db.Multi() + if err != nil { + return err + } + defer m.Close() + + keys, err := xscan(m.DB, tp, int(count)) + if err != nil { + return err + } else if len(keys) == 0 { + c.resp.writeInteger(0) + return nil + } + + mc := c.app.getMigrateClient(addr) + + conn := mc.Get() + + //timeout is milliseconds + t := time.Duration(timeout) * time.Millisecond + conn.SetConnectTimeout(t) + + if _, err = conn.Do("select", db); err != nil { + return err + } + + for _, key := range keys { + data, err := xdump(m.DB, tp, key) + if err != nil { + return err + } + + ttl, err := xttl(m.DB, tp, key) + if err != nil { + return err + } + + conn.SetReadDeadline(time.Now().Add(t)) + + //ttl is second, but restore need millisecond + if _, err = conn.Do("restore", key, ttl*1e3, data); err != nil { + return err + } + + if err = xdel(m.DB, tp, key); err != nil { + return err + } + + } + + c.resp.writeInteger(int64(len(keys))) + + return nil +} + +//XMIGRATE host port type key destination-db timeout +//will block any other write operations +//maybe only for xcodis +func xmigrateCommand(c *client) error { + args := c.args + + if len(args) != 6 { + return ErrCmdParams + } + + addr := fmt.Sprintf("%s:%s", string(args[0]), string(args[1])) + if addr == c.app.cfg.Addr { + //same server, can not migrate + return fmt.Errorf("migrate in same server is not allowed") + } + tp := string(args[2]) key := args[3] db, err := ledis.StrUint64(args[4], nil) @@ -179,29 +329,21 @@ func xmigrateCommand(c *client) error { } else if db >= uint64(ledis.MaxDBNumber) { return fmt.Errorf("invalid db index %d, must < %d", db, ledis.MaxDBNumber) } - var timeout int64 - timeout, err = ledis.StrInt64(args[5], nil) + + timeout, err := ledis.StrInt64(args[5], nil) if err != nil { return err } else if timeout < 0 { return fmt.Errorf("invalid timeout %d", timeout) } - onlyCopy := false - if len(args) == 7 { - if strings.ToUpper(string(args[6])) == "COPY" { - onlyCopy = true - } - } - - var m *ledis.Multi - if m, err = c.db.Multi(); err != nil { + m, err := c.db.Multi() + if err != nil { return err } defer m.Close() - var data []byte - data, err = xdump(m.DB, tp, key) + data, err := xdump(m.DB, tp, key) if err != nil { return err } else if data == nil { @@ -209,26 +351,32 @@ func xmigrateCommand(c *client) error { return nil } - c.app.migrateConnM.Lock() - defer c.app.migrateConnM.Unlock() - - conn, ok := c.app.migrateConns[addr] - if !ok { - conn = goledis.NewConn(addr) - c.app.migrateConns[addr] = conn - } - - //timeout is milliseconds - conn.SetConnectTimeout(time.Duration(timeout) * time.Millisecond) - - if _, err = conn.Do("restore", key, data); err != nil { + ttl, err := xttl(m.DB, tp, key) + if err != nil { return err } - if !onlyCopy { - if err = xdel(m.DB, tp, key); err != nil { - return err - } + mc := c.app.getMigrateClient(addr) + + conn := mc.Get() + + //timeout is milliseconds + t := time.Duration(timeout) * time.Millisecond + conn.SetConnectTimeout(t) + + if _, err = conn.Do("select", db); err != nil { + return err + } + + conn.SetReadDeadline(time.Now().Add(t)) + + //ttl is second, but restore need millisecond + if _, err = conn.Do("restore", key, ttl*1e3, data); err != nil { + return err + } + + if err = xdel(m.DB, tp, key); err != nil { + return err } c.resp.writeStatus(OK) @@ -244,4 +392,5 @@ func init() { register("restore", restoreCommand) register("xdump", xdumpCommand) register("xmigrate", xmigrateCommand) + register("xmigratedb", xmigratedbCommand) } diff --git a/server/cmd_migrate_test.go b/server/cmd_migrate_test.go index cc71c18..1dfb2c0 100644 --- a/server/cmd_migrate_test.go +++ b/server/cmd_migrate_test.go @@ -1,11 +1,15 @@ package server import ( + "fmt" "github.com/siddontang/ledisdb/client/go/ledis" + "github.com/siddontang/ledisdb/config" + "os" "testing" + "time" ) -func TestMigrate(t *testing.T) { +func TestDumpRestore(t *testing.T) { c := getTestConn() defer c.Close() @@ -31,17 +35,94 @@ func TestMigrate(t *testing.T) { t.Fatal(err) } - testMigrate(c, "dump", "mtest_a", t) - testMigrate(c, "ldump", "mtest_la", t) - testMigrate(c, "hdump", "mtest_ha", t) - testMigrate(c, "sdump", "mtest_sa", t) - testMigrate(c, "zdump", "mtest_za", t) + testDumpRestore(c, "dump", "mtest_a", t) + testDumpRestore(c, "ldump", "mtest_la", t) + testDumpRestore(c, "hdump", "mtest_ha", t) + testDumpRestore(c, "sdump", "mtest_sa", t) + testDumpRestore(c, "zdump", "mtest_za", t) } -func testMigrate(c *ledis.Conn, dump string, key string, t *testing.T) { +func testDumpRestore(c *ledis.Conn, dump string, key string, t *testing.T) { if data, err := ledis.Bytes(c.Do(dump, key)); err != nil { t.Fatal(err) } else if _, err := c.Do("restore", key, 0, data); err != nil { t.Fatal(err) } } + +func TestMigrate(t *testing.T) { + data_dir := "/tmp/test_migrate" + os.RemoveAll(data_dir) + + s1Cfg := config.NewConfigDefault() + s1Cfg.DataDir = fmt.Sprintf("%s/s1", data_dir) + s1Cfg.Addr = "127.0.0.1:11185" + + s2Cfg := config.NewConfigDefault() + s2Cfg.DataDir = fmt.Sprintf("%s/s2", data_dir) + s2Cfg.Addr = "127.0.0.1:11186" + + s1, err := NewApp(s1Cfg) + if err != nil { + t.Fatal(err) + } + defer s1.Close() + + s2, err := NewApp(s2Cfg) + if err != nil { + t.Fatal(err) + } + defer s2.Close() + + go s1.Run() + + go s2.Run() + + time.Sleep(1 * time.Second) + + c1 := ledis.NewConn(s1Cfg.Addr) + defer c1.Close() + + c2 := ledis.NewConn(s2Cfg.Addr) + defer c2.Close() + + if _, err = c1.Do("set", "a", "1"); err != nil { + t.Fatal(err) + } + + timeout := 30000 + if _, err = c1.Do("xmigrate", "127.0.0.1", 11186, "KV", "a", 0, timeout); err != nil { + t.Fatal(err) + } + + if s, err := ledis.String(c2.Do("get", "a")); err != nil { + t.Fatal(err) + } else if s != "1" { + t.Fatal(s, "must 1") + } + + if s, err := ledis.String(c1.Do("get", "a")); err != nil && err != ledis.ErrNil { + t.Fatal(err) + } else if s != "" { + t.Fatal(s, "must empty") + } + + if num, err := ledis.Int(c2.Do("xmigratedb", "127.0.0.1", 11185, "KV", 10, 0, timeout)); err != nil { + t.Fatal(err) + } else if num != 1 { + t.Fatal(num, "must number 1") + } + + if s, err := ledis.String(c1.Do("get", "a")); err != nil { + t.Fatal(err) + } else if s != "1" { + t.Fatal(s, "must 1") + } + + if s, err := ledis.String(c2.Do("get", "a")); err != nil && err != ledis.ErrNil { + t.Fatal(err) + } else if s != "" { + t.Fatal(s, "must empty") + } + +} From a5621140fa93344c2350fcf3d46ccfb880d91dbc Mon Sep 17 00:00:00 2001 From: siddontang Date: Wed, 3 Dec 2014 17:27:15 +0800 Subject: [PATCH 10/11] update read me --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 2426c57..139ae72 100644 --- a/README.md +++ b/README.md @@ -178,5 +178,5 @@ See [Clients](https://github.com/siddontang/ledisdb/wiki/Clients) to find or con ## Feedback -Gmail: siddontang@gmail.com -Skype: live:siddontang_1 ++ Gmail: siddontang@gmail.com ++ Skype: live:siddontang_1 From f91d8fe09a29772cfca27474e78a046ce91db6b2 Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 4 Dec 2014 20:59:16 +0800 Subject: [PATCH 11/11] update read me --- README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README.md b/README.md index 139ae72..c81d86c 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ LedisDB now supports multiple different databases as backends. + HTTP API support, JSON/BSON/msgpack output. + Replication to guarantee data safety. + Supplies tools to load, dump, and repair database. ++ Supports cluster, use [xcodis](https://github.com/siddontang/xcodis) ## Build and Install @@ -147,6 +148,10 @@ Set slaveof in config or dynamiclly ledis 127.0.0.1:6381> slaveof 127.0.0.1 6380 OK +## Cluster support + +LedisDB uses a proxy named [xcodis](https://github.com/siddontang/xcodis) to support cluster. + ## Benchmark See [benchmark](https://github.com/siddontang/ledisdb/wiki/Benchmark) for more.