mirror of https://github.com/ledisdb/ledisdb.git
Merge branch 'develop'
This commit is contained in:
commit
50697b9dcd
|
@ -11,8 +11,12 @@
|
|||
},
|
||||
{
|
||||
"ImportPath": "github.com/boltdb/bolt",
|
||||
"Comment": "data/v1-254-gd285804",
|
||||
"Rev": "d285804df1760edf4c602ecd901be5d5e67bf982"
|
||||
"Comment": "data/v1-256-ge65c902",
|
||||
"Rev": "e65c9027c35b7ef1014db9e02686889e51aadb2e"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/cupcake/rdb",
|
||||
"Rev": "3454dcabd33cb8ea8261ffd6a45f4d836eb504cc"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/edsrzf/mmap-go",
|
||||
|
|
|
@ -22,6 +22,7 @@ LedisDB now supports multiple different databases as backends.
|
|||
+ HTTP API support, JSON/BSON/msgpack output.
|
||||
+ Replication to guarantee data safety.
|
||||
+ Supplies tools to load, dump, and repair database.
|
||||
+ Supports cluster, use [xcodis](https://github.com/siddontang/xcodis)
|
||||
|
||||
## Build and Install
|
||||
|
||||
|
@ -147,6 +148,10 @@ Set slaveof in config or dynamiclly
|
|||
ledis 127.0.0.1:6381> slaveof 127.0.0.1 6380
|
||||
OK
|
||||
|
||||
## Cluster support
|
||||
|
||||
LedisDB uses a proxy named [xcodis](https://github.com/siddontang/xcodis) to support cluster.
|
||||
|
||||
## Benchmark
|
||||
|
||||
See [benchmark](https://github.com/siddontang/ledisdb/wiki/Benchmark) for more.
|
||||
|
@ -178,5 +183,5 @@ See [Clients](https://github.com/siddontang/ledisdb/wiki/Clients) to find or con
|
|||
|
||||
## Feedback
|
||||
|
||||
Gmail: siddontang@gmail.com
|
||||
Skype: live:siddontang_1
|
||||
+ Gmail: siddontang@gmail.com
|
||||
+ Skype: live:siddontang_1
|
||||
|
|
|
@ -8,7 +8,9 @@ import (
|
|||
"io"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Error represents an error returned in a command reply.
|
||||
|
@ -40,6 +42,8 @@ type Conn struct {
|
|||
|
||||
// Scratch space for formatting integers and floats.
|
||||
numScratch [40]byte
|
||||
|
||||
connectTimeout time.Duration
|
||||
}
|
||||
|
||||
func NewConn(addr string) *Conn {
|
||||
|
@ -69,6 +73,28 @@ func (c *Conn) Close() {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *Conn) SetConnectTimeout(t time.Duration) {
|
||||
c.cm.Lock()
|
||||
c.connectTimeout = t
|
||||
c.cm.Unlock()
|
||||
}
|
||||
|
||||
func (c *Conn) SetReadDeadline(t time.Time) {
|
||||
c.cm.Lock()
|
||||
if c.c != nil {
|
||||
c.c.SetReadDeadline(t)
|
||||
}
|
||||
c.cm.Unlock()
|
||||
}
|
||||
|
||||
func (c *Conn) SetWriteDeadline(t time.Time) {
|
||||
c.cm.Lock()
|
||||
if c.c != nil {
|
||||
c.c.SetWriteDeadline(t)
|
||||
}
|
||||
c.cm.Unlock()
|
||||
}
|
||||
|
||||
func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
|
||||
if err := c.Send(cmd, args...); err != nil {
|
||||
return nil, err
|
||||
|
@ -78,6 +104,21 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
|
|||
}
|
||||
|
||||
func (c *Conn) Send(cmd string, args ...interface{}) error {
|
||||
var err error
|
||||
for i := 0; i < 2; i++ {
|
||||
if err = c.send(cmd, args...); err != nil {
|
||||
if e, ok := err.(*net.OpError); ok && strings.Contains(e.Error(), "use of closed network connection") {
|
||||
//send to a closed connection, try again
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Conn) send(cmd string, args ...interface{}) error {
|
||||
if err := c.connect(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -129,7 +170,9 @@ func (c *Conn) ReceiveBulkTo(w io.Writer) error {
|
|||
func (c *Conn) finalize() {
|
||||
c.cm.Lock()
|
||||
if !c.closed {
|
||||
c.c.Close()
|
||||
if c.c != nil {
|
||||
c.c.Close()
|
||||
}
|
||||
c.closed = true
|
||||
}
|
||||
c.cm.Unlock()
|
||||
|
@ -144,7 +187,7 @@ func (c *Conn) connect() error {
|
|||
}
|
||||
|
||||
var err error
|
||||
c.c, err = net.Dial(getProto(c.addr), c.addr)
|
||||
c.c, err = net.DialTimeout(getProto(c.addr), c.addr, c.connectTimeout)
|
||||
if err != nil {
|
||||
c.c = nil
|
||||
return err
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
//This file was generated by .tools/generate_commands.py on Sun Oct 26 2014 15:14:39 +0800
|
||||
//This file was generated by .tools/generate_commands.py on Thu Nov 27 2014 13:42:44 +0800
|
||||
package main
|
||||
|
||||
var helpCommands = [][]string{
|
||||
|
@ -23,6 +23,7 @@ var helpCommands = [][]string{
|
|||
{"DECR", "key", "KV"},
|
||||
{"DECRBY", "key decrement", "KV"},
|
||||
{"DEL", "key [key ...]", "KV"},
|
||||
{"DUMP", "key", "KV"},
|
||||
{"ECHO", "message", "Server"},
|
||||
{"EVAL", "script numkeys key [key ...] arg [arg ...]", "Script"},
|
||||
{"EVALSHA", "sha1 numkeys key [key ...] arg [arg ...]", "Script"},
|
||||
|
@ -36,6 +37,7 @@ var helpCommands = [][]string{
|
|||
{"GETSET", " key value", "KV"},
|
||||
{"HCLEAR", "key", "Hash"},
|
||||
{"HDEL", "key field [field ...]", "Hash"},
|
||||
{"HDUMP", "key", "Hash"},
|
||||
{"HEXISTS", "key field", "Hash"},
|
||||
{"HEXPIRE", "key seconds", "Hash"},
|
||||
{"HEXPIREAT", "key timestamp", "Hash"},
|
||||
|
@ -57,6 +59,7 @@ var helpCommands = [][]string{
|
|||
{"INCRBY", "key increment", "KV"},
|
||||
{"INFO", "[section]", "Server"},
|
||||
{"LCLEAR", "key", "List"},
|
||||
{"LDUMP", "key", "List"},
|
||||
{"LEXPIRE", "key seconds", "List"},
|
||||
{"LEXPIREAT", "key timestamp", "List"},
|
||||
{"LINDEX", "key index", "List"},
|
||||
|
@ -73,6 +76,7 @@ var helpCommands = [][]string{
|
|||
{"MSET", "key value [key value ...]", "KV"},
|
||||
{"PERSIST", "key", "KV"},
|
||||
{"PING", "-", "Server"},
|
||||
{"RESTORE", "key ttl value", "Server"},
|
||||
{"ROLLBACK", "-", "Transaction"},
|
||||
{"RPOP", "key", "List"},
|
||||
{"RPUSH", "key value [value ...]", "List"},
|
||||
|
@ -84,6 +88,7 @@ var helpCommands = [][]string{
|
|||
{"SCRIPT LOAD", "script", "Script"},
|
||||
{"SDIFF", "key [key ...]", "Set"},
|
||||
{"SDIFFSTORE", "destination key [key ...]", "Set"},
|
||||
{"SDUMP", "key", "Set"},
|
||||
{"SELECT", "index", "Server"},
|
||||
{"SET", "key value", "KV"},
|
||||
{"SETEX", "key seconds value", "KV"},
|
||||
|
@ -112,6 +117,7 @@ var helpCommands = [][]string{
|
|||
{"ZCARD", "key", "ZSet"},
|
||||
{"ZCLEAR", "key", "ZSet"},
|
||||
{"ZCOUNT", "key min max", "ZSet"},
|
||||
{"ZDUMP", "key", "ZSet"},
|
||||
{"ZEXPIRE", "key seconds", "ZSet"},
|
||||
{"ZEXPIREAT", "key timestamp", "ZSet"},
|
||||
{"ZINCRBY", "key increment member", "ZSet"},
|
||||
|
|
|
@ -14,11 +14,11 @@ LedisDB has no Strings data type but KV and Bitmap, any some Keys and Strings co
|
|||
In Redis, `del` can delete all type data, like String, Hash, List, etc, but in LedisDB, `del` can only delete KV data. To delete other type data, you will use "clear" commands.
|
||||
|
||||
+ KV: `del`, `mdel`
|
||||
+ Hash: `hclear`, `mhclear`
|
||||
+ List: `lclear`, `mlclear`
|
||||
+ Set: `sclear`, `msclear`
|
||||
+ Zset: `zclear`, `mzclear`
|
||||
+ Bitmap: `bclear`, `mbclear`
|
||||
+ Hash: `hclear`, `hmclear`
|
||||
+ List: `lclear`, `lmclear`
|
||||
+ Set: `sclear`, `smclear`
|
||||
+ Zset: `zclear`, `zmclear`
|
||||
+ Bitmap: `bclear`, `bmclear`
|
||||
|
||||
## Expire, Persist, and TTL
|
||||
|
||||
|
@ -56,5 +56,14 @@ LedisDB supplies `xscan`, `xrevscan`, etc, to fetch data iteratively and reverse
|
|||
+ Zset: `zxscan`, `zxrevscan`
|
||||
+ Bitmap: `bxscan`, `bxrevscan`
|
||||
|
||||
## DUMP
|
||||
|
||||
+ KV: `dump`
|
||||
+ Hash: `hdump`
|
||||
+ List: `ldump`
|
||||
+ Set: `sdump`
|
||||
+ ZSet: `zdump`
|
||||
|
||||
LedisDB supports `dump` to serialize the value with key, the data format is the same as Redis, so you can use it in Redis and vice versa.
|
||||
|
||||
Of course, LedisDB has not implemented all APIs in Redis, you can see full commands in commands.json, commands.doc or [wiki](https://github.com/siddontang/ledisdb/wiki/Commands).
|
|
@ -691,5 +691,42 @@
|
|||
"arguments" : "-",
|
||||
"group": "Server",
|
||||
"readonly": false
|
||||
},
|
||||
|
||||
"DUMP": {
|
||||
"arguments" : "key",
|
||||
"group": "KV",
|
||||
"readonly": true
|
||||
},
|
||||
|
||||
"LDUMP": {
|
||||
"arguments" : "key",
|
||||
"group": "List",
|
||||
"readonly": true
|
||||
},
|
||||
|
||||
"HDUMP": {
|
||||
"arguments" : "key",
|
||||
"group": "Hash",
|
||||
"readonly": true
|
||||
},
|
||||
|
||||
|
||||
"SDUMP": {
|
||||
"arguments" : "key",
|
||||
"group": "Set",
|
||||
"readonly": true
|
||||
},
|
||||
|
||||
"ZDUMP": {
|
||||
"arguments" : "key",
|
||||
"group": "ZSet",
|
||||
"readonly": true
|
||||
},
|
||||
|
||||
"RESTORE": {
|
||||
"arguments" : "key ttl value",
|
||||
"group" : "Server",
|
||||
"readonly" : false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ Table of Contents
|
|||
- [PERSIST key](#persist-key)
|
||||
- [XSCAN key [MATCH match] [COUNT count]](#xscan-key-match-match-count-count)
|
||||
- [XREVSCAN key [MATCH match] [COUNT count]](#xrevscan-key-match-match-count-count)
|
||||
- [DUMP key](#dump-key)
|
||||
- [Hash](#hash)
|
||||
- [HDEL key field [field ...]](#hdel-key-field-field-)
|
||||
- [HEXISTS key field](#hexists-key-field)
|
||||
|
@ -49,6 +50,7 @@ Table of Contents
|
|||
- [HPERSIST key](#hpersist-key)
|
||||
- [HXSCAN key [MATCH match] [COUNT count]](#hxscan-key-match-match-count-count)
|
||||
- [HXREVSCAN key [MATCH match] [COUNT count]](#hxrevscan-key-match-match-count-count)
|
||||
- [HDUMP key](#hdump-key)
|
||||
- [List](#list)
|
||||
- [BLPOP key [key ...] timeout](#blpop-key-key--timeout)
|
||||
- [BRPOP key [key ...] timeout](#brpop-key-key--timeout)
|
||||
|
@ -67,6 +69,7 @@ Table of Contents
|
|||
- [LPERSIST key](#lpersist-key)
|
||||
- [LXSCAN key [MATCH match] [COUNT count]](#lxscan-key-match-match-count-count)
|
||||
- [LXREVSCAN key [MATCH match] [COUNT count]](#lxrevscan-key-match-match-count-count)
|
||||
- [LDUMP key](#ldump-key)
|
||||
- [Set](#set)
|
||||
- [SADD key member [member ...]](#sadd-key-member-member-)
|
||||
- [SCARD key](#scard-key)
|
||||
|
@ -87,6 +90,7 @@ Table of Contents
|
|||
- [SPERSIST key](#spersist-key)
|
||||
- [SXSCAN key [MATCH match] [COUNT count]](#sxscan-key-match-match-count-count)
|
||||
- [SXREVSCAN key [MATCH match] [COUNT count]](#sxrevscan-key-match-match-count-count)
|
||||
- [SDUMP key](#sdump-key)
|
||||
- [ZSet](#zset)
|
||||
- [ZADD key score member [score member ...]](#zadd-key-score-member-score-member-)
|
||||
- [ZCARD key](#zcard-key)
|
||||
|
@ -117,6 +121,7 @@ Table of Contents
|
|||
- [ZRANGEBYLEX key min max [LIMIT offset count]](#zrangebylex-key-min-max-limit-offset-count)
|
||||
- [ZREMRANGEBYLEX key min max](#zremrangebylex-key-min-max)
|
||||
- [ZLEXCOUNT key min max](#zlexcount-key-min-max)
|
||||
- [ZDUMP key](#zdump-key)
|
||||
- [Bitmap](#bitmap)
|
||||
- [BGET key](#bget-key)
|
||||
- [BGETBIT key offset](#bgetbit-key-offset)
|
||||
|
@ -143,6 +148,7 @@ Table of Contents
|
|||
- [INFO [section]](#info-section)
|
||||
- [TIME](#time)
|
||||
- [CONFIG REWRITE](#config-rewrite)
|
||||
- [RESTORE key ttl value](#restore-key-ttl-value)
|
||||
- [Transaction](#transaction)
|
||||
- [BEGIN](#begin)
|
||||
- [ROLLBACK](#rollback)
|
||||
|
@ -583,6 +589,22 @@ ledis>xrevscan "a" count 1
|
|||
2) []
|
||||
```
|
||||
|
||||
### DUMP key
|
||||
|
||||
Serialize the value stored at key with KV type in a Redis-specific format like RDB and return it to the user. The returned value can be synthesized back into a key using the RESTORE command.
|
||||
|
||||
**Return value**
|
||||
|
||||
bulk: the serialized value
|
||||
|
||||
**Examples**
|
||||
|
||||
```
|
||||
ledis> set mykey 10
|
||||
OK
|
||||
ledis>DUMP mykey
|
||||
"\x00\xc0\n\x06\x00\xf8r?\xc5\xfb\xfb_("
|
||||
```
|
||||
|
||||
## Hash
|
||||
|
||||
|
@ -959,6 +981,10 @@ Reverse iterate Hash keys incrementally.
|
|||
|
||||
See [XREVSCAN](#xrevscan-key-match-match-count-count) for more information.
|
||||
|
||||
### HDUMP key
|
||||
|
||||
See [DUMP](#dump-key) for more information.
|
||||
|
||||
## List
|
||||
|
||||
### BLPOP key [key ...] timeout
|
||||
|
@ -1293,6 +1319,10 @@ Reverse iterate list keys incrementally.
|
|||
|
||||
See [XREVSCAN](#xrevscan-key-match-match-count-count) for more information.
|
||||
|
||||
### LDUMP key
|
||||
|
||||
See [DUMP](#dump-key) for more information.
|
||||
|
||||
|
||||
## Set
|
||||
|
||||
|
@ -1728,6 +1758,10 @@ Reverse iterate Set keys incrementally.
|
|||
|
||||
See [XREVSCAN](#xrevscan-key-match-match-count-count) for more information.
|
||||
|
||||
### SDUMP key
|
||||
|
||||
See [DUMP](#dump-key) for more information.
|
||||
|
||||
## ZSet
|
||||
|
||||
### ZADD key score member [score member ...]
|
||||
|
@ -2425,6 +2459,9 @@ ledis> ZLEXCOUNT myzset - [c
|
|||
(integer) 3
|
||||
```
|
||||
|
||||
### ZDUMP key
|
||||
|
||||
See [DUMP](#dump-key) for more information.
|
||||
|
||||
## Bitmap
|
||||
|
||||
|
@ -2719,6 +2756,14 @@ Rewrites the config file the server was started with.
|
|||
|
||||
String: OK or error msg.
|
||||
|
||||
### RESTORE key ttl value
|
||||
|
||||
Create a key associated with a value that is obtained by deserializing the provided serialized value (obtained via DUMP, LDUMP, HDUMP, SDUMP, ZDUMP).
|
||||
|
||||
If ttl is 0 the key is created without any expire, otherwise the specified expire time (in milliseconds) is set. But you must know that now the checking ttl accuracy is second.
|
||||
|
||||
RESTORE checks the RDB version and data checksum. If they don't match an error is returned.
|
||||
|
||||
## Transaction
|
||||
|
||||
### BEGIN
|
||||
|
|
|
@ -0,0 +1,188 @@
|
|||
package ledis
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/siddontang/ledisdb/ledis/rdb"
|
||||
)
|
||||
|
||||
/*
|
||||
To support redis <-> ledisdb, the dump value format is the same as redis.
|
||||
We will not support bitmap, and may add bit operations for kv later.
|
||||
|
||||
But you must know that we use int64 for zset score, not double.
|
||||
Only support rdb version 6.
|
||||
*/
|
||||
|
||||
func (db *DB) Dump(key []byte) ([]byte, error) {
|
||||
v, err := db.Get(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if v == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return rdb.Dump(rdb.String(v))
|
||||
}
|
||||
|
||||
func (db *DB) LDump(key []byte) ([]byte, error) {
|
||||
v, err := db.LRange(key, 0, -1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if len(v) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return rdb.Dump(rdb.List(v))
|
||||
}
|
||||
|
||||
func (db *DB) HDump(key []byte) ([]byte, error) {
|
||||
v, err := db.HGetAll(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if len(v) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
o := make(rdb.HashMap, len(v))
|
||||
for i := 0; i < len(v); i++ {
|
||||
o[i].Field = v[i].Field
|
||||
o[i].Value = v[i].Value
|
||||
}
|
||||
|
||||
return rdb.Dump(o)
|
||||
}
|
||||
|
||||
func (db *DB) SDump(key []byte) ([]byte, error) {
|
||||
v, err := db.SMembers(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if len(v) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return rdb.Dump(rdb.Set(v))
|
||||
}
|
||||
|
||||
func (db *DB) ZDump(key []byte) ([]byte, error) {
|
||||
v, err := db.ZRangeByScore(key, MinScore, MaxScore, 0, -1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if len(v) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
o := make(rdb.ZSet, len(v))
|
||||
for i := 0; i < len(v); i++ {
|
||||
o[i].Member = v[i].Member
|
||||
o[i].Score = float64(v[i].Score)
|
||||
}
|
||||
|
||||
return rdb.Dump(o)
|
||||
}
|
||||
|
||||
func (db *DB) Restore(key []byte, ttl int64, data []byte) error {
|
||||
d, err := rdb.DecodeDump(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//ttl is milliseconds, but we only support seconds
|
||||
//later may support milliseconds
|
||||
if ttl > 0 {
|
||||
ttl = ttl / 1e3
|
||||
if ttl == 0 {
|
||||
ttl = 1
|
||||
}
|
||||
}
|
||||
|
||||
switch value := d.(type) {
|
||||
case rdb.String:
|
||||
if _, err = db.Del(key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = db.Set(key, value); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ttl > 0 {
|
||||
if _, err = db.Expire(key, ttl); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case rdb.HashMap:
|
||||
//first clear old key
|
||||
if _, err = db.HClear(key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fv := make([]FVPair, len(value))
|
||||
for i := 0; i < len(value); i++ {
|
||||
fv[i] = FVPair{Field: value[i].Field, Value: value[i].Value}
|
||||
}
|
||||
|
||||
if err = db.HMset(key, fv...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ttl > 0 {
|
||||
if _, err = db.HExpire(key, ttl); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case rdb.List:
|
||||
//first clear old key
|
||||
if _, err = db.LClear(key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err = db.RPush(key, value...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ttl > 0 {
|
||||
if _, err = db.LExpire(key, ttl); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case rdb.ZSet:
|
||||
//first clear old key
|
||||
if _, err = db.ZClear(key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sp := make([]ScorePair, len(value))
|
||||
for i := 0; i < len(value); i++ {
|
||||
sp[i] = ScorePair{int64(value[i].Score), value[i].Member}
|
||||
}
|
||||
|
||||
if _, err = db.ZAdd(key, sp...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ttl > 0 {
|
||||
if _, err = db.ZExpire(key, ttl); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case rdb.Set:
|
||||
//first clear old key
|
||||
if _, err = db.SClear(key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err = db.SAdd(key, value...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ttl > 0 {
|
||||
if _, err = db.SExpire(key, ttl); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("invalid data type %T", d)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
package ledis
|
||||
|
||||
import (
|
||||
"github.com/siddontang/ledisdb/config"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestMigrate(t *testing.T) {
|
||||
cfg1 := config.NewConfigDefault()
|
||||
cfg1.DataDir = "/tmp/test_ledisdb_migrate1"
|
||||
os.RemoveAll(cfg1.DataDir)
|
||||
|
||||
defer os.RemoveAll(cfg1.DataDir)
|
||||
|
||||
l1, _ := Open(cfg1)
|
||||
defer l1.Close()
|
||||
|
||||
cfg2 := config.NewConfigDefault()
|
||||
cfg2.DataDir = "/tmp/test_ledisdb_migrate2"
|
||||
os.RemoveAll(cfg2.DataDir)
|
||||
|
||||
defer os.RemoveAll(cfg2.DataDir)
|
||||
|
||||
l2, _ := Open(cfg2)
|
||||
defer l2.Close()
|
||||
|
||||
db1, _ := l1.Select(0)
|
||||
db2, _ := l2.Select(0)
|
||||
|
||||
key := []byte("a")
|
||||
lkey := []byte("a")
|
||||
hkey := []byte("a")
|
||||
skey := []byte("a")
|
||||
zkey := []byte("a")
|
||||
value := []byte("1")
|
||||
|
||||
db1.Set(key, value)
|
||||
|
||||
if data, err := db1.Dump(key); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := db2.Restore(key, 0, data); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db1.RPush(lkey, []byte("1"), []byte("2"), []byte("3"))
|
||||
|
||||
if data, err := db1.LDump(lkey); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := db2.Restore(lkey, 0, data); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db1.SAdd(skey, []byte("1"), []byte("2"), []byte("3"))
|
||||
|
||||
if data, err := db1.SDump(skey); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := db2.Restore(skey, 0, data); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db1.HMset(hkey, FVPair{[]byte("a"), []byte("1")}, FVPair{[]byte("b"), []byte("2")}, FVPair{[]byte("c"), []byte("3")})
|
||||
|
||||
if data, err := db1.HDump(hkey); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := db2.Restore(hkey, 0, data); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db1.ZAdd(zkey, ScorePair{1, []byte("a")}, ScorePair{2, []byte("b")}, ScorePair{3, []byte("c")})
|
||||
|
||||
if data, err := db1.ZDump(zkey); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := db2.Restore(zkey, 0, data); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := checkLedisEqual(l1, l2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,128 @@
|
|||
package rdb
|
||||
|
||||
// Copyright 2014 Wandoujia Inc. All Rights Reserved.
|
||||
// Licensed under the MIT (MIT-LICENSE.txt) license.
|
||||
|
||||
import "fmt"
|
||||
|
||||
import (
|
||||
"github.com/cupcake/rdb"
|
||||
"github.com/cupcake/rdb/nopdecoder"
|
||||
)
|
||||
|
||||
func DecodeDump(p []byte) (interface{}, error) {
|
||||
d := &decoder{}
|
||||
if err := rdb.DecodeDump(p, 0, nil, 0, d); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return d.obj, d.err
|
||||
}
|
||||
|
||||
type decoder struct {
|
||||
nopdecoder.NopDecoder
|
||||
obj interface{}
|
||||
err error
|
||||
}
|
||||
|
||||
func (d *decoder) initObject(obj interface{}) {
|
||||
if d.err != nil {
|
||||
return
|
||||
}
|
||||
if d.obj != nil {
|
||||
d.err = fmt.Errorf("invalid object, init again")
|
||||
} else {
|
||||
d.obj = obj
|
||||
}
|
||||
}
|
||||
|
||||
func (d *decoder) Set(key, value []byte, expiry int64) {
|
||||
d.initObject(String(value))
|
||||
}
|
||||
|
||||
func (d *decoder) StartHash(key []byte, length, expiry int64) {
|
||||
d.initObject(HashMap(nil))
|
||||
}
|
||||
|
||||
func (d *decoder) Hset(key, field, value []byte) {
|
||||
if d.err != nil {
|
||||
return
|
||||
}
|
||||
switch h := d.obj.(type) {
|
||||
default:
|
||||
d.err = fmt.Errorf("invalid object, not a hashmap")
|
||||
case HashMap:
|
||||
v := struct {
|
||||
Field, Value []byte
|
||||
}{
|
||||
field,
|
||||
value,
|
||||
}
|
||||
d.obj = append(h, v)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *decoder) StartSet(key []byte, cardinality, expiry int64) {
|
||||
d.initObject(Set(nil))
|
||||
}
|
||||
|
||||
func (d *decoder) Sadd(key, member []byte) {
|
||||
if d.err != nil {
|
||||
return
|
||||
}
|
||||
switch s := d.obj.(type) {
|
||||
default:
|
||||
d.err = fmt.Errorf("invalid object, not a set")
|
||||
case Set:
|
||||
d.obj = append(s, member)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *decoder) StartList(key []byte, length, expiry int64) {
|
||||
d.initObject(List(nil))
|
||||
}
|
||||
|
||||
func (d *decoder) Rpush(key, value []byte) {
|
||||
if d.err != nil {
|
||||
return
|
||||
}
|
||||
switch l := d.obj.(type) {
|
||||
default:
|
||||
d.err = fmt.Errorf("invalid object, not a list")
|
||||
case List:
|
||||
d.obj = append(l, value)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *decoder) StartZSet(key []byte, cardinality, expiry int64) {
|
||||
d.initObject(ZSet(nil))
|
||||
}
|
||||
|
||||
func (d *decoder) Zadd(key []byte, score float64, member []byte) {
|
||||
if d.err != nil {
|
||||
return
|
||||
}
|
||||
switch z := d.obj.(type) {
|
||||
default:
|
||||
d.err = fmt.Errorf("invalid object, not a zset")
|
||||
case ZSet:
|
||||
v := struct {
|
||||
Member []byte
|
||||
Score float64
|
||||
}{
|
||||
member,
|
||||
score,
|
||||
}
|
||||
d.obj = append(z, v)
|
||||
}
|
||||
}
|
||||
|
||||
type String []byte
|
||||
type List [][]byte
|
||||
type HashMap []struct {
|
||||
Field, Value []byte
|
||||
}
|
||||
type Set [][]byte
|
||||
type ZSet []struct {
|
||||
Member []byte
|
||||
Score float64
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
package rdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/cupcake/rdb"
|
||||
)
|
||||
|
||||
func Dump(obj interface{}) ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
|
||||
e := rdb.NewEncoder(&buf)
|
||||
|
||||
switch v := obj.(type) {
|
||||
case String:
|
||||
e.EncodeType(rdb.TypeString)
|
||||
e.EncodeString(v)
|
||||
case HashMap:
|
||||
e.EncodeType(rdb.TypeHash)
|
||||
e.EncodeLength(uint32(len(v)))
|
||||
|
||||
for i := 0; i < len(v); i++ {
|
||||
e.EncodeString(v[i].Field)
|
||||
e.EncodeString(v[i].Value)
|
||||
}
|
||||
case List:
|
||||
e.EncodeType(rdb.TypeList)
|
||||
e.EncodeLength(uint32(len(v)))
|
||||
for i := 0; i < len(v); i++ {
|
||||
e.EncodeString(v[i])
|
||||
}
|
||||
case Set:
|
||||
e.EncodeType(rdb.TypeSet)
|
||||
e.EncodeLength(uint32(len(v)))
|
||||
for i := 0; i < len(v); i++ {
|
||||
e.EncodeString(v[i])
|
||||
}
|
||||
case ZSet:
|
||||
e.EncodeType(rdb.TypeZSet)
|
||||
e.EncodeLength(uint32(len(v)))
|
||||
for i := 0; i < len(v); i++ {
|
||||
e.EncodeString(v[i].Member)
|
||||
e.EncodeFloat(v[i].Score)
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid dump type %T", obj)
|
||||
}
|
||||
|
||||
e.EncodeDumpFooter()
|
||||
|
||||
return buf.Bytes(), nil
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package rdb
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCodec(t *testing.T) {
|
||||
testCodec(String("abc"), t)
|
||||
}
|
||||
|
||||
func testCodec(obj interface{}, t *testing.T) {
|
||||
b, err := Dump(obj)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if o, err := DecodeDump(b); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !reflect.DeepEqual(obj, o) {
|
||||
t.Fatal("must equal")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2014 Wandoujia Inc.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
|
@ -18,7 +18,7 @@ func checkLedisEqual(master *Ledis, slave *Ledis) error {
|
|||
if v, err := slave.ldb.Get(key); err != nil {
|
||||
return err
|
||||
} else if !bytes.Equal(v, value) {
|
||||
return fmt.Errorf("replication error %d != %d", len(v), len(value))
|
||||
return fmt.Errorf("equal error at %q, %d != %d", key, len(v), len(value))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -9,6 +9,11 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
/*
|
||||
We will not maintain bitmap anymore, and will add bit operations for kv type later.
|
||||
Use your own risk.
|
||||
*/
|
||||
|
||||
const (
|
||||
OPand uint8 = iota + 1
|
||||
OPor
|
||||
|
|
|
@ -374,6 +374,8 @@ func (db *DB) SMembers(key []byte) ([][]byte, error) {
|
|||
v := make([][]byte, 0, 16)
|
||||
|
||||
it := db.bucket.RangeLimitIterator(start, stop, store.RangeROpen, 0, -1)
|
||||
defer it.Close()
|
||||
|
||||
for ; it.Valid(); it.Next() {
|
||||
_, m, err := db.sDecodeSetKey(it.Key())
|
||||
if err != nil {
|
||||
|
@ -383,8 +385,6 @@ func (db *DB) SMembers(key []byte) ([][]byte, error) {
|
|||
v = append(v, m)
|
||||
}
|
||||
|
||||
it.Close()
|
||||
|
||||
return v, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ package ledis
|
|||
import (
|
||||
"github.com/siddontang/ledisdb/config"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
|
@ -201,6 +202,9 @@ func testTx(t *testing.T, name string) {
|
|||
|
||||
l, err := Open(cfg)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "not registered") {
|
||||
return
|
||||
}
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
goledis "github.com/siddontang/ledisdb/client/go/ledis"
|
||||
"github.com/siddontang/ledisdb/config"
|
||||
"github.com/siddontang/ledisdb/ledis"
|
||||
"net"
|
||||
|
@ -42,6 +43,9 @@ type App struct {
|
|||
|
||||
rcm sync.Mutex
|
||||
rcs map[*respClient]struct{}
|
||||
|
||||
migrateM sync.Mutex
|
||||
migrateClients map[string]*goledis.Client
|
||||
}
|
||||
|
||||
func netType(s string) string {
|
||||
|
@ -71,6 +75,8 @@ func NewApp(cfg *config.Config) (*App, error) {
|
|||
|
||||
app.rcs = make(map[*respClient]struct{})
|
||||
|
||||
app.migrateClients = make(map[string]*goledis.Client)
|
||||
|
||||
var err error
|
||||
|
||||
if app.info, err = newInfo(app); err != nil {
|
||||
|
@ -132,6 +138,14 @@ func (app *App) Close() {
|
|||
|
||||
app.listener.Close()
|
||||
|
||||
//close all migrate connections
|
||||
app.migrateM.Lock()
|
||||
for k, c := range app.migrateClients {
|
||||
c.Close()
|
||||
delete(app.migrateClients, k)
|
||||
}
|
||||
app.migrateM.Unlock()
|
||||
|
||||
if app.httpListener != nil {
|
||||
app.httpListener.Close()
|
||||
}
|
||||
|
|
|
@ -11,25 +11,29 @@ import (
|
|||
)
|
||||
|
||||
var txUnsupportedCmds = map[string]struct{}{
|
||||
"select": struct{}{},
|
||||
"slaveof": struct{}{},
|
||||
"fullsync": struct{}{},
|
||||
"sync": struct{}{},
|
||||
"begin": struct{}{},
|
||||
"flushall": struct{}{},
|
||||
"flushdb": struct{}{},
|
||||
"eval": struct{}{},
|
||||
"select": struct{}{},
|
||||
"slaveof": struct{}{},
|
||||
"fullsync": struct{}{},
|
||||
"sync": struct{}{},
|
||||
"begin": struct{}{},
|
||||
"flushall": struct{}{},
|
||||
"flushdb": struct{}{},
|
||||
"eval": struct{}{},
|
||||
"xmigrate": struct{}{},
|
||||
"xmigratedb": struct{}{},
|
||||
}
|
||||
|
||||
var scriptUnsupportedCmds = map[string]struct{}{
|
||||
"slaveof": struct{}{},
|
||||
"fullsync": struct{}{},
|
||||
"sync": struct{}{},
|
||||
"begin": struct{}{},
|
||||
"commit": struct{}{},
|
||||
"rollback": struct{}{},
|
||||
"flushall": struct{}{},
|
||||
"flushdb": struct{}{},
|
||||
"slaveof": struct{}{},
|
||||
"fullsync": struct{}{},
|
||||
"sync": struct{}{},
|
||||
"begin": struct{}{},
|
||||
"commit": struct{}{},
|
||||
"rollback": struct{}{},
|
||||
"flushall": struct{}{},
|
||||
"flushdb": struct{}{},
|
||||
"xmigrate": struct{}{},
|
||||
"xmigratedb": struct{}{},
|
||||
}
|
||||
|
||||
type responseWriter interface {
|
||||
|
|
|
@ -0,0 +1,396 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
goledis "github.com/siddontang/ledisdb/client/go/ledis"
|
||||
"github.com/siddontang/ledisdb/ledis"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func dumpCommand(c *client) error {
|
||||
if len(c.args) != 1 {
|
||||
return ErrCmdParams
|
||||
}
|
||||
|
||||
key := c.args[0]
|
||||
if data, err := c.db.Dump(key); err != nil {
|
||||
return err
|
||||
} else {
|
||||
c.resp.writeBulk(data)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func ldumpCommand(c *client) error {
|
||||
if len(c.args) != 1 {
|
||||
return ErrCmdParams
|
||||
}
|
||||
|
||||
key := c.args[0]
|
||||
if data, err := c.db.LDump(key); err != nil {
|
||||
return err
|
||||
} else {
|
||||
c.resp.writeBulk(data)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func hdumpCommand(c *client) error {
|
||||
if len(c.args) != 1 {
|
||||
return ErrCmdParams
|
||||
}
|
||||
|
||||
key := c.args[0]
|
||||
if data, err := c.db.HDump(key); err != nil {
|
||||
return err
|
||||
} else {
|
||||
c.resp.writeBulk(data)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func sdumpCommand(c *client) error {
|
||||
if len(c.args) != 1 {
|
||||
return ErrCmdParams
|
||||
}
|
||||
|
||||
key := c.args[0]
|
||||
if data, err := c.db.SDump(key); err != nil {
|
||||
return err
|
||||
} else {
|
||||
c.resp.writeBulk(data)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func zdumpCommand(c *client) error {
|
||||
if len(c.args) != 1 {
|
||||
return ErrCmdParams
|
||||
}
|
||||
|
||||
key := c.args[0]
|
||||
if data, err := c.db.ZDump(key); err != nil {
|
||||
return err
|
||||
} else {
|
||||
c.resp.writeBulk(data)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// unlike redis, restore will try to delete old key first
|
||||
func restoreCommand(c *client) error {
|
||||
args := c.args
|
||||
if len(args) != 3 {
|
||||
return ErrCmdParams
|
||||
}
|
||||
|
||||
key := args[0]
|
||||
ttl, err := ledis.StrInt64(args[1], nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data := args[2]
|
||||
|
||||
if err = c.db.Restore(key, ttl, data); err != nil {
|
||||
return err
|
||||
} else {
|
||||
c.resp.writeStatus(OK)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func xdump(db *ledis.DB, tp string, key []byte) ([]byte, error) {
|
||||
var err error
|
||||
var data []byte
|
||||
switch strings.ToUpper(tp) {
|
||||
case "KV":
|
||||
data, err = db.Dump(key)
|
||||
case "HASH":
|
||||
data, err = db.HDump(key)
|
||||
case "LIST":
|
||||
data, err = db.LDump(key)
|
||||
case "SET":
|
||||
data, err = db.SDump(key)
|
||||
case "ZSET":
|
||||
data, err = db.ZDump(key)
|
||||
default:
|
||||
err = fmt.Errorf("invalid key type %s", tp)
|
||||
}
|
||||
return data, err
|
||||
}
|
||||
|
||||
func xdel(db *ledis.DB, tp string, key []byte) error {
|
||||
var err error
|
||||
switch strings.ToUpper(tp) {
|
||||
case "KV":
|
||||
_, err = db.Del(key)
|
||||
case "HASH":
|
||||
_, err = db.HClear(key)
|
||||
case "LIST":
|
||||
_, err = db.LClear(key)
|
||||
case "SET":
|
||||
_, err = db.SClear(key)
|
||||
case "ZSET":
|
||||
_, err = db.ZClear(key)
|
||||
default:
|
||||
err = fmt.Errorf("invalid key type %s", tp)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func xttl(db *ledis.DB, tp string, key []byte) (int64, error) {
|
||||
switch strings.ToUpper(tp) {
|
||||
case "KV":
|
||||
return db.TTL(key)
|
||||
case "HASH":
|
||||
return db.HTTL(key)
|
||||
case "LIST":
|
||||
return db.LTTL(key)
|
||||
case "SET":
|
||||
return db.STTL(key)
|
||||
case "ZSET":
|
||||
return db.ZTTL(key)
|
||||
default:
|
||||
return 0, fmt.Errorf("invalid key type %s", tp)
|
||||
}
|
||||
}
|
||||
|
||||
func xscan(db *ledis.DB, tp string, count int) ([][]byte, error) {
|
||||
switch strings.ToUpper(tp) {
|
||||
case "KV":
|
||||
return db.Scan(nil, count, false, "")
|
||||
case "HASH":
|
||||
return db.HScan(nil, count, false, "")
|
||||
case "LIST":
|
||||
return db.LScan(nil, count, false, "")
|
||||
case "SET":
|
||||
return db.SScan(nil, count, false, "")
|
||||
case "ZSET":
|
||||
return db.ZScan(nil, count, false, "")
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid key type %s", tp)
|
||||
}
|
||||
}
|
||||
|
||||
func xdumpCommand(c *client) error {
|
||||
args := c.args
|
||||
if len(args) != 2 {
|
||||
return ErrCmdParams
|
||||
}
|
||||
|
||||
tp := string(args[0])
|
||||
key := args[1]
|
||||
|
||||
if data, err := xdump(c.db, tp, key); err != nil {
|
||||
return err
|
||||
} else {
|
||||
c.resp.writeBulk(data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (app *App) getMigrateClient(addr string) *goledis.Client {
|
||||
app.migrateM.Lock()
|
||||
|
||||
mc, ok := app.migrateClients[addr]
|
||||
if !ok {
|
||||
mc = goledis.NewClient(&goledis.Config{addr, 4, 0, 0})
|
||||
app.migrateClients[addr] = mc
|
||||
|
||||
}
|
||||
|
||||
app.migrateM.Unlock()
|
||||
|
||||
return mc
|
||||
}
|
||||
|
||||
//XMIGRATEDB host port tp count db timeout
|
||||
//select count tp type keys and migrate
|
||||
//will block any other write operations
|
||||
//maybe only for xcodis
|
||||
func xmigratedbCommand(c *client) error {
|
||||
args := c.args
|
||||
if len(args) != 6 {
|
||||
return ErrCmdParams
|
||||
}
|
||||
|
||||
addr := fmt.Sprintf("%s:%s", string(args[0]), string(args[1]))
|
||||
if addr == c.app.cfg.Addr {
|
||||
//same server, can not migrate
|
||||
return fmt.Errorf("migrate in same server is not allowed")
|
||||
}
|
||||
|
||||
tp := string(args[2])
|
||||
|
||||
count, err := ledis.StrInt64(args[3], nil)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if count <= 0 {
|
||||
count = 10
|
||||
}
|
||||
|
||||
db, err := ledis.StrUint64(args[4], nil)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if db >= uint64(ledis.MaxDBNumber) {
|
||||
return fmt.Errorf("invalid db index %d, must < %d", db, ledis.MaxDBNumber)
|
||||
}
|
||||
|
||||
timeout, err := ledis.StrInt64(args[5], nil)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if timeout < 0 {
|
||||
return fmt.Errorf("invalid timeout %d", timeout)
|
||||
}
|
||||
|
||||
m, err := c.db.Multi()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer m.Close()
|
||||
|
||||
keys, err := xscan(m.DB, tp, int(count))
|
||||
if err != nil {
|
||||
return err
|
||||
} else if len(keys) == 0 {
|
||||
c.resp.writeInteger(0)
|
||||
return nil
|
||||
}
|
||||
|
||||
mc := c.app.getMigrateClient(addr)
|
||||
|
||||
conn := mc.Get()
|
||||
|
||||
//timeout is milliseconds
|
||||
t := time.Duration(timeout) * time.Millisecond
|
||||
conn.SetConnectTimeout(t)
|
||||
|
||||
if _, err = conn.Do("select", db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
data, err := xdump(m.DB, tp, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ttl, err := xttl(m.DB, tp, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn.SetReadDeadline(time.Now().Add(t))
|
||||
|
||||
//ttl is second, but restore need millisecond
|
||||
if _, err = conn.Do("restore", key, ttl*1e3, data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = xdel(m.DB, tp, key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
c.resp.writeInteger(int64(len(keys)))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//XMIGRATE host port type key destination-db timeout
|
||||
//will block any other write operations
|
||||
//maybe only for xcodis
|
||||
func xmigrateCommand(c *client) error {
|
||||
args := c.args
|
||||
|
||||
if len(args) != 6 {
|
||||
return ErrCmdParams
|
||||
}
|
||||
|
||||
addr := fmt.Sprintf("%s:%s", string(args[0]), string(args[1]))
|
||||
if addr == c.app.cfg.Addr {
|
||||
//same server, can not migrate
|
||||
return fmt.Errorf("migrate in same server is not allowed")
|
||||
}
|
||||
|
||||
tp := string(args[2])
|
||||
key := args[3]
|
||||
db, err := ledis.StrUint64(args[4], nil)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if db >= uint64(ledis.MaxDBNumber) {
|
||||
return fmt.Errorf("invalid db index %d, must < %d", db, ledis.MaxDBNumber)
|
||||
}
|
||||
|
||||
timeout, err := ledis.StrInt64(args[5], nil)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if timeout < 0 {
|
||||
return fmt.Errorf("invalid timeout %d", timeout)
|
||||
}
|
||||
|
||||
m, err := c.db.Multi()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer m.Close()
|
||||
|
||||
data, err := xdump(m.DB, tp, key)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if data == nil {
|
||||
c.resp.writeStatus(NOKEY)
|
||||
return nil
|
||||
}
|
||||
|
||||
ttl, err := xttl(m.DB, tp, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mc := c.app.getMigrateClient(addr)
|
||||
|
||||
conn := mc.Get()
|
||||
|
||||
//timeout is milliseconds
|
||||
t := time.Duration(timeout) * time.Millisecond
|
||||
conn.SetConnectTimeout(t)
|
||||
|
||||
if _, err = conn.Do("select", db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn.SetReadDeadline(time.Now().Add(t))
|
||||
|
||||
//ttl is second, but restore need millisecond
|
||||
if _, err = conn.Do("restore", key, ttl*1e3, data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = xdel(m.DB, tp, key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.resp.writeStatus(OK)
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
register("dump", dumpCommand)
|
||||
register("ldump", ldumpCommand)
|
||||
register("hdump", hdumpCommand)
|
||||
register("sdump", sdumpCommand)
|
||||
register("zdump", zdumpCommand)
|
||||
register("restore", restoreCommand)
|
||||
register("xdump", xdumpCommand)
|
||||
register("xmigrate", xmigrateCommand)
|
||||
register("xmigratedb", xmigratedbCommand)
|
||||
}
|
|
@ -0,0 +1,128 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/siddontang/ledisdb/client/go/ledis"
|
||||
"github.com/siddontang/ledisdb/config"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestDumpRestore(t *testing.T) {
|
||||
c := getTestConn()
|
||||
defer c.Close()
|
||||
|
||||
var err error
|
||||
_, err = c.Do("set", "mtest_a", "1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = c.Do("rpush", "mtest_la", "1", "2", "3")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = c.Do("hmset", "mtest_ha", "a", "1", "b", "2")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = c.Do("sadd", "mtest_sa", "1", "2", "3")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = c.Do("zadd", "mtest_za", 1, "a", 2, "b", 3, "c")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
testDumpRestore(c, "dump", "mtest_a", t)
|
||||
testDumpRestore(c, "ldump", "mtest_la", t)
|
||||
testDumpRestore(c, "hdump", "mtest_ha", t)
|
||||
testDumpRestore(c, "sdump", "mtest_sa", t)
|
||||
testDumpRestore(c, "zdump", "mtest_za", t)
|
||||
}
|
||||
|
||||
func testDumpRestore(c *ledis.Conn, dump string, key string, t *testing.T) {
|
||||
if data, err := ledis.Bytes(c.Do(dump, key)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if _, err := c.Do("restore", key, 0, data); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMigrate(t *testing.T) {
|
||||
data_dir := "/tmp/test_migrate"
|
||||
os.RemoveAll(data_dir)
|
||||
|
||||
s1Cfg := config.NewConfigDefault()
|
||||
s1Cfg.DataDir = fmt.Sprintf("%s/s1", data_dir)
|
||||
s1Cfg.Addr = "127.0.0.1:11185"
|
||||
|
||||
s2Cfg := config.NewConfigDefault()
|
||||
s2Cfg.DataDir = fmt.Sprintf("%s/s2", data_dir)
|
||||
s2Cfg.Addr = "127.0.0.1:11186"
|
||||
|
||||
s1, err := NewApp(s1Cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer s1.Close()
|
||||
|
||||
s2, err := NewApp(s2Cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer s2.Close()
|
||||
|
||||
go s1.Run()
|
||||
|
||||
go s2.Run()
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
c1 := ledis.NewConn(s1Cfg.Addr)
|
||||
defer c1.Close()
|
||||
|
||||
c2 := ledis.NewConn(s2Cfg.Addr)
|
||||
defer c2.Close()
|
||||
|
||||
if _, err = c1.Do("set", "a", "1"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
timeout := 30000
|
||||
if _, err = c1.Do("xmigrate", "127.0.0.1", 11186, "KV", "a", 0, timeout); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if s, err := ledis.String(c2.Do("get", "a")); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if s != "1" {
|
||||
t.Fatal(s, "must 1")
|
||||
}
|
||||
|
||||
if s, err := ledis.String(c1.Do("get", "a")); err != nil && err != ledis.ErrNil {
|
||||
t.Fatal(err)
|
||||
} else if s != "" {
|
||||
t.Fatal(s, "must empty")
|
||||
}
|
||||
|
||||
if num, err := ledis.Int(c2.Do("xmigratedb", "127.0.0.1", 11185, "KV", 10, 0, timeout)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if num != 1 {
|
||||
t.Fatal(num, "must number 1")
|
||||
}
|
||||
|
||||
if s, err := ledis.String(c1.Do("get", "a")); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if s != "1" {
|
||||
t.Fatal(s, "must 1")
|
||||
}
|
||||
|
||||
if s, err := ledis.String(c2.Do("get", "a")); err != nil && err != ledis.ErrNil {
|
||||
t.Fatal(err)
|
||||
} else if s != "" {
|
||||
t.Fatal(s, "must empty")
|
||||
}
|
||||
|
||||
}
|
|
@ -20,8 +20,9 @@ var (
|
|||
NullBulk = []byte("-1")
|
||||
NullArray = []byte("-1")
|
||||
|
||||
PONG = "PONG"
|
||||
OK = "OK"
|
||||
PONG = "PONG"
|
||||
OK = "OK"
|
||||
NOKEY = "NOKEY"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
Loading…
Reference in New Issue