Merge branch 'develop'

This commit is contained in:
siddontang 2014-08-15 16:23:16 +08:00
commit 88d2c1b27c
37 changed files with 2378 additions and 56 deletions

View File

@ -2,16 +2,16 @@
Ledisdb is a high performance NoSQL like Redis written by go. It supports some advanced data structure like kv, list, hash, zset, bitmap, and may be alternative for Redis. Ledisdb is a high performance NoSQL like Redis written by go. It supports some advanced data structure like kv, list, hash, zset, bitmap, and may be alternative for Redis.
LedisDB now supports multi databases as backend to store data, you can test and choose the proper one for you. LedisDB now supports multiple databases as backend to store data, you can test and choose the proper one for you.
## Features ## Features
+ Rich advanced data structure: KV, List, Hash, ZSet, Bitmap. + Rich advanced data structure: KV, List, Hash, ZSet, Bitmap.
+ Stores lots of data, over the memory limit. + Stores lots of data, over the memory limit.
+ Various backend database to use: LevelDB, goleveldb, LMDB, RocksDB, BoltDB. + Various backend database to use: LevelDB, goleveldb, LMDB, RocksDB, BoltDB, HyperLevelDB.
+ Supports expiration and ttl. + Supports expiration and ttl.
+ Redis clients, like redis-cli, are supported directly. + Redis clients, like redis-cli, are supported directly.
+ Multi client API supports, including Go, Python, Lua(Openresty). + Multiple client API supports, including Go, Python, Lua(Openresty), C/C++, Node.js.
+ Easy to embed in your own Go application. + Easy to embed in your own Go application.
+ Restful API support, json/bson/msgpack output. + Restful API support, json/bson/msgpack output.
+ Replication to guarantee data safe. + Replication to guarantee data safe.
@ -51,16 +51,29 @@ Create a workspace and checkout ledisdb source
## RocksDB support ## RocksDB support
+ Install rocksdb(shared_lib) and snappy first. + [Install rocksdb](https://github.com/facebook/rocksdb/blob/master/INSTALL.md)(`make shared_lib`) and snappy first.
LedisDB has not supplied a simple script to install, maybe later. LedisDB has not supplied a simple script to install, maybe later.
+ Set ```ROCKSDB_DIR``` and ```SNAPPY_DIR``` to the actual install path in dev.sh. + Set ```ROCKSDB_DIR``` and ```SNAPPY_DIR``` to the actual install path in `dev.sh`.
+ ```make``` + ```make```
## HyperLevelDB support
+ [Install hyperleveldb](https://github.com/rescrv/HyperLevelDB/blob/master/README) and snappy first.
LedisDB has not supplied a simple script to install, maybe later.
+ Set `HYPERLEVELDB` and `SNAPPY_DIR` to the actual install path in `dev.sh`.
+ `make`
## Choose store database ## Choose store database
LedisDB now supports goleveldb, lmdb, leveldb, rocksdb, boltdb, it will choose goleveldb as default to store data if you not set. LedisDB now supports goleveldb, lmdb, leveldb, rocksdb, boltdb, hyperleveldb. it will choose goleveldb as default to store data if you not set.
Choosing a store database to use is very simple, you have two ways: Choosing a store database to use is very simple, you have two ways:
@ -139,7 +152,6 @@ See [Issues todo](https://github.com/siddontang/ledisdb/issues?labels=todo&page=
## Links ## Links
+ [Official Website](http://ledisdb.com) + [Official Website](http://ledisdb.com)
+ [Author's Chinese Blog](http://blog.csdn.net/siddontang/article/category/2264003)
+ [GoDoc](https://godoc.org/github.com/siddontang/ledisdb) + [GoDoc](https://godoc.org/github.com/siddontang/ledisdb)
+ [Server Commands](https://github.com/siddontang/ledisdb/wiki/Commands) + [Server Commands](https://github.com/siddontang/ledisdb/wiki/Commands)

View File

@ -35,4 +35,17 @@ client.bget("bit key 3", function(err, result){
} }
}); });
client.quit(); //test zunionstore & zinterstore
client.zadd("zset1", 1, "one")
client.zadd("zset1", 2, "two")
client.zadd("zset2", 1, "one")
client.zadd("zset2", 2, "two")
client.zadd("zset2", 3, "three")
client.zunionstore("out", 2, "zset1", "zset2", "weights", 2, 3, ledis.print)
client.zrange("out", 0, -1, "withscores", ledis.print)
client.zinterstore("out", 2, "zset1", "zset2", "weights", 2, 3, ledis.print)
client.zrange("out", 0, -1, "withscores", ledis.print)
client.quit()

View File

@ -83,6 +83,8 @@ module.exports = [
"zrevrank", "zrevrank",
"zrevrangebyscore", "zrevrangebyscore",
"zscore", "zscore",
"zunionstore",
"zinterstore",
"zclear", "zclear",

View File

@ -35,6 +35,7 @@ type LevelDBConfig struct {
type LMDBConfig struct { type LMDBConfig struct {
MapSize int `toml:"map_size" json:"map_size"` MapSize int `toml:"map_size" json:"map_size"`
NoSync bool `toml:"nosync" json:"nosync"`
} }
type BinLogConfig struct { type BinLogConfig struct {

View File

@ -14,7 +14,8 @@
}, },
"lmdb" : { "lmdb" : {
"map_size" : 524288000 "map_size" : 524288000,
"nosync" : true
}, },
"access_log" : "" "access_log" : ""

View File

@ -34,6 +34,7 @@ max_open_files = 1024
[lmdb] [lmdb]
map_size = 524288000 map_size = 524288000
nosync = true
[binlog] [binlog]
max_file_size = 0 max_file_size = 0

View File

@ -18,6 +18,7 @@ func TestConfig(t *testing.T) {
dstCfg.LevelDB.CacheSize = 524288000 dstCfg.LevelDB.CacheSize = 524288000
dstCfg.LevelDB.MaxOpenFiles = 1024 dstCfg.LevelDB.MaxOpenFiles = 1024
dstCfg.LMDB.MapSize = 524288000 dstCfg.LMDB.MapSize = 524288000
dstCfg.LMDB.NoSync = true
cfg, err := NewConfigWithFile("./config.toml") cfg, err := NewConfigWithFile("./config.toml")
if err != nil { if err != nil {

11
dev.sh
View File

@ -13,6 +13,7 @@ fi
SNAPPY_DIR=/usr/local/snappy SNAPPY_DIR=/usr/local/snappy
LEVELDB_DIR=/usr/local/leveldb LEVELDB_DIR=/usr/local/leveldb
ROCKSDB_DIR=/usr/local/rocksdb ROCKSDB_DIR=/usr/local/rocksdb
HYPERLEVELDB_DIR=/usr/local/hyperleveldb
function add_path() function add_path()
{ {
@ -63,6 +64,16 @@ if [ -f $ROCKSDB_DIR/include/rocksdb/c.h ]; then
GO_BUILD_TAGS="$GO_BUILD_TAGS rocksdb" GO_BUILD_TAGS="$GO_BUILD_TAGS rocksdb"
fi fi
#check hyperleveldb
if [ -f $HYPERLEVELDB_DIR/include/hyperleveldb/c.h ]; then
CGO_CFLAGS="$CGO_CFLAGS -I$HYPERLEVELDB_DIR/include"
CGO_CXXFLAGS="$CGO_CXXFLAGS -I$HYPERLEVELDB_DIR/include"
CGO_LDFLAGS="$CGO_LDFLAGS -L$HYPERLEVELDB_DIR/lib -lhyperleveldb"
LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $HYPERLEVELDB_DIR/lib)
DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $HYPERLEVELDB_DIR/lib)
GO_BUILD_TAGS="$GO_BUILD_TAGS hyperleveldb"
fi
export CGO_CFLAGS export CGO_CFLAGS
export CGO_CXXFLAGS export CGO_CXXFLAGS
export CGO_LDFLAGS export CGO_LDFLAGS

View File

@ -17,7 +17,7 @@
"BEXPIREAT": { "BEXPIREAT": {
"arguments": "key timestamp", "arguments": "key timestamp",
"group": "Bitmap", "group": "Bitmap",
"readonly": false, "readonly": false
}, },
"BGET": { "BGET": {
"arguments": "key", "arguments": "key",
@ -414,5 +414,16 @@
"arguments": "key", "arguments": "key",
"group": "ZSet", "group": "ZSet",
"readonly": true "readonly": true
},
"ZUNIONSTORE":{
"arguments": "destkey numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]",
"group": "ZSet",
"readonly": false
},
"ZINTERSTORE":{
"arguments": "destkey numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]",
"group": "ZSet",
"readonly": false
} }
} }

View File

@ -79,6 +79,11 @@ Table of Contents
- [ZEXPIREAT key timestamp](#zexpireat-key-timestamp) - [ZEXPIREAT key timestamp](#zexpireat-key-timestamp)
- [ZTTL key](#zttl-key) - [ZTTL key](#zttl-key)
- [ZPERSIST key](#zpersist-key) - [ZPERSIST key](#zpersist-key)
- [ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
](#zunionstore-destination-numkeys-key-key--weights-weight-weight--aggregate-summinmax)
- [ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
](#zinterstore-destination-numkeys-key-key--weights-weight-weight--aggregate-summinmax)
- [Bitmap](#bitmap) - [Bitmap](#bitmap)
- [BGET key](#bget-key) - [BGET key](#bget-key)
@ -1629,6 +1634,83 @@ ledis> ZTTL mset
(integer) -1 (integer) -1
``` ```
### ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
Computes the union of numkeys sorted sets given by the specified keys, and stores the result in destination. It is mandatory to provide the number of input keys (numkeys) before passing the input keys and the other (optional) arguments.
By default, the resulting score of an element is the sum of its scores in the sorted sets where it exists.
Using the WEIGHTS option, it is possible to specify a multiplication factor for each input sorted set. This means that the score of every element in every input sorted set is multiplied by this factor before being passed to the aggregation function. When WEIGHTS is not given, the multiplication factors default to 1.
With the AGGREGATE option, it is possible to specify how the results of the union are aggregated. This option defaults to SUM, where the score of an element is summed across the inputs where it exists. When this option is set to either MIN or MAX, the resulting set will contain the minimum or maximum score of an element across the inputs where it exists.
If destination already exists, it is overwritten.
**Return value**
int64: the number of elements in the resulting sorted set at destination.
**Examples**
```
ledis> ZADD zset1 1 "one"
(interger) 1
ledis> ZADD zset1 2 "two"
(interger) 1
ledis> ZADD zset2 1 "one"
(interger) 1
ledis> ZADD zset2 2 "two"
(interger) 1
ledis> ZADD zset2 3 "three"
(interger) 1
ledis> ZUNIONSTORE out 2 zset1 zset2 WEIGHTS 2 3
(interger) 3
ledis> ZRANGE out 0 -1 WITHSCORES
1) "one"
2) "5"
3) "three"
4) "9"
5) "two"
6) "10"
```
### ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
Computes the intersection of numkeys sorted sets given by the specified keys, and stores the result in destination. It is mandatory to provide the number of input keys (numkeys) before passing the input keys and the other (optional) arguments.
By default, the resulting score of an element is the sum of its scores in the sorted sets where it exists. Because intersection requires an element to be a member of every given sorted set, this results in the score of every element in the resulting sorted set to be equal to the number of input sorted sets.
For a description of the `WEIGHTS` and `AGGREGATE` options, see [ZUNIONSTORE](#zunionstore-destination-numkeys-key-key--weights-weight-weight--aggregate-summinmax).
If destination already exists, it is overwritten.
**Return value**
int64: the number of elements in the resulting sorted set at destination.
**Examples**
```
ledis> ZADD zset1 1 "one"
(interger) 1
ledis> ZADD zset1 2 "two"
(interger) 1
ledis> ZADD zset2 1 "one"
(interger) 1
ledis> ZADD zset2 2 "two"
(interger) 1
ledis> ZADD zset2 3 "three"
(interger) 1
ledis> ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3
(interger) 3
ledis> ZRANGE out 0 -1 WITHSCORES
1) "one"
2) "5"
3) "two"
4) "10"
```
## Bitmap ## Bitmap

View File

@ -36,6 +36,7 @@ max_open_files = 1024
[lmdb] [lmdb]
map_size = 524288000 map_size = 524288000
nosync = true
[binlog] [binlog]
# Set either size or num to 0 to disable binlog # Set either size or num to 0 to disable binlog

View File

@ -2,32 +2,18 @@
import json import json
import time import time
import sys
import os
from collections import OrderedDict as dict from collections import OrderedDict as dict
content = u"""\n
def go_array_to_json(path): type cmdConf struct {
"""Convert `./cmd/ledis-cli/const.go` to commands.json""" name string
fp = open(path).read() argDesc string
commands_str = fp.split("string")[1] group string
_commands_str = commands_str.splitlines()[1:len(commands_str.splitlines())-1] readonly bool
commands_d = dict()
values_d = dict()
for i in _commands_str:
t = i.split('"')
values_d.update(
{
"arguments": "%s" % t[3],
"group": "%s" % t[5]
})
values_d = dict(sorted(values_d.items()))
d = {
"%s" % t[1]: values_d
} }
commands_d.update(d) """
fp = open("commands.json", "w")
json.dump(commands_d, fp, indent=4)
fp.close()
def json_to_js(json_path, js_path): def json_to_js(json_path, js_path):
@ -53,22 +39,64 @@ def json_to_go_array(json_path, go_path):
g_fp.write("package main\n\nvar helpCommands = [][]string{\n") g_fp.write("package main\n\nvar helpCommands = [][]string{\n")
_json_sorted = dict(sorted(_json.items(), key=lambda x: x[0])) _json_sorted = dict(sorted(_json.items(), key=lambda x: x[0]))
for k, v in _json_sorted.iteritems(): for k, v in _json_sorted.iteritems():
print k, v
g_fp.write('\t{"%s", "%s", "%s"},\n' % (k, v["arguments"], v["group"])) g_fp.write('\t{"%s", "%s", "%s"},\n' % (k, v["arguments"], v["group"]))
g_fp.write("}\n") g_fp.write("}\n")
g_fp.close() g_fp.close()
def json_to_command_cnf(json_path, go_path):
g_fp = open(go_path, "w")
with open(json_path) as fp:
_json = json.load(fp)
generate_time(g_fp)
g_fp.write("package server")
print >> g_fp, content
g_fp.write("var cnfCmds = []cmdConf{\n")
for k, v in _json.iteritems():
g_fp.write('\t{\n\t\t"%s",\n\t\t"%s",\n\t\t"%s", \n\t\t%s,\n\t},\n' %
(k, v["arguments"], v["group"], "true" if v["readonly"] else "false" ))
g_fp.write("}\n")
g_fp.close()
def generate_time(fp): def generate_time(fp):
fp.write("//This file was generated by ./generate.py on %s \n" % \ fp.write("//This file was generated by ./generate.py on %s \n" %
time.strftime('%a %b %d %Y %H:%M:%S %z')) time.strftime('%a %b %d %Y %H:%M:%S %z'))
if __name__ == "__main__":
path = "./cmd/ledis-cli/const.go"
# go_array_to_json(path)
json_path = "./commands.json"
js_path = "./commands.js"
json_to_js(json_path, js_path)
go_path = "const.go"
json_to_go_array(json_path, path) if __name__ == "__main__":
usage = """
Usage: python %s src_path dst_path"
1. for Node.js client:
python generate.py /path/to/commands.json /path/to/commands.js
2. for cmd/ledis_cli/const.go
python generate.py /path/to/commands.json /path/to/const.go
3. for server/command_cnf.go
python generate.py /path/to/commands.json /path/to/command_cnf.go
"""
if len(sys.argv) != 3:
sys.exit(usage % os.path.basename(sys.argv[0]))
src_path, dst_path = sys.argv[1:]
dst_path_base = os.path.basename(dst_path)
if dst_path_base.endswith(".js"):
json_to_js(src_path, dst_path)
elif dst_path_base.startswith("const.go"):
json_to_go_array(src_path, dst_path)
elif dst_path_base.startswith("command"):
json_to_command_cnf(src_path, dst_path)
else:
print "Not support arguments"

View File

@ -231,15 +231,20 @@ func (db *DB) bSetMeta(t *tx, key []byte, tailSeq uint32, tailOff uint32) {
} }
func (db *DB) bUpdateMeta(t *tx, key []byte, seq uint32, off uint32) (tailSeq uint32, tailOff uint32, err error) { func (db *DB) bUpdateMeta(t *tx, key []byte, seq uint32, off uint32) (tailSeq uint32, tailOff uint32, err error) {
var ts, to int32 var tseq, toff int32
if ts, to, err = db.bGetMeta(key); err != nil { var update bool = false
if tseq, toff, err = db.bGetMeta(key); err != nil {
return return
} else if tseq < 0 {
update = true
} else { } else {
tailSeq = uint32(MaxInt32(ts, 0)) tailSeq = uint32(MaxInt32(tseq, 0))
tailOff = uint32(MaxInt32(to, 0)) tailOff = uint32(MaxInt32(toff, 0))
update = (seq > tailSeq || (seq == tailSeq && off > tailOff))
} }
if seq > tailSeq || (seq == tailSeq && off > tailOff) { if update {
db.bSetMeta(t, key, seq, off) db.bSetMeta(t, key, seq, off)
tailSeq = seq tailSeq = seq
tailOff = off tailOff = off

View File

@ -41,6 +41,7 @@ func TestBinary(t *testing.T) {
testOpXor(t) testOpXor(t)
testOpNot(t) testOpNot(t)
testMSetBit(t) testMSetBit(t)
testBitExpire(t)
} }
func testSimple(t *testing.T) { func testSimple(t *testing.T) {
@ -518,3 +519,20 @@ func testMSetBit(t *testing.T) {
return return
} }
func testBitExpire(t *testing.T) {
db := getTestDB()
db.FlushAll()
key := []byte("test_b_ttl")
db.BSetBit(key, 0, 1)
if res, err := db.BExpire(key, 100); res != 1 || err != nil {
t.Fatal(false)
}
if ttl, err := db.BTTL(key); ttl != 100 || err != nil {
t.Fatal(false)
}
}

View File

@ -12,6 +12,10 @@ const (
MinScore int64 = -1<<63 + 1 MinScore int64 = -1<<63 + 1
MaxScore int64 = 1<<63 - 1 MaxScore int64 = 1<<63 - 1
InvalidScore int64 = -1 << 63 InvalidScore int64 = -1 << 63
AggregateSum byte = 0
AggregateMin byte = 1
AggregateMax byte = 2
) )
type ScorePair struct { type ScorePair struct {
@ -23,6 +27,9 @@ var errZSizeKey = errors.New("invalid zsize key")
var errZSetKey = errors.New("invalid zset key") var errZSetKey = errors.New("invalid zset key")
var errZScoreKey = errors.New("invalid zscore key") var errZScoreKey = errors.New("invalid zscore key")
var errScoreOverflow = errors.New("zset score overflow") var errScoreOverflow = errors.New("zset score overflow")
var errInvalidAggregate = errors.New("invalid aggregate")
var errInvalidWeightNum = errors.New("invalid weight number")
var errInvalidSrcKeyNum = errors.New("invalid src key number")
const ( const (
zsetNScoreSep byte = '<' zsetNScoreSep byte = '<'
@ -839,3 +846,166 @@ func (db *DB) ZPersist(key []byte) (int64, error) {
err = t.Commit() err = t.Commit()
return n, err return n, err
} }
func getAggregateFunc(aggregate byte) func(int64, int64) int64 {
switch aggregate {
case AggregateSum:
return func(a int64, b int64) int64 {
return a + b
}
case AggregateMax:
return func(a int64, b int64) int64 {
if a > b {
return a
}
return b
}
case AggregateMin:
return func(a int64, b int64) int64 {
if a > b {
return b
}
return a
}
}
return nil
}
func (db *DB) ZUnionStore(destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte) (int64, error) {
var destMap = map[string]int64{}
aggregateFunc := getAggregateFunc(aggregate)
if aggregateFunc == nil {
return 0, errInvalidAggregate
}
if len(srcKeys) < 1 {
return 0, errInvalidSrcKeyNum
}
if weights != nil {
if len(srcKeys) != len(weights) {
return 0, errInvalidWeightNum
}
} else {
weights = make([]int64, len(srcKeys))
for i := 0; i < len(weights); i++ {
weights[i] = 1
}
}
for i, key := range srcKeys {
scorePairs, err := db.ZRange(key, 0, -1)
if err != nil {
return 0, err
}
for _, pair := range scorePairs {
if score, ok := destMap[String(pair.Member)]; !ok {
destMap[String(pair.Member)] = pair.Score * weights[i]
} else {
destMap[String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i])
}
}
}
t := db.zsetTx
t.Lock()
defer t.Unlock()
db.zDelete(t, destKey)
var num int64 = 0
for member, score := range destMap {
if err := checkZSetKMSize(destKey, []byte(member)); err != nil {
return 0, err
}
if n, err := db.zSetItem(t, destKey, score, []byte(member)); err != nil {
return 0, err
} else if n == 0 {
//add new
num++
}
}
if _, err := db.zIncrSize(t, destKey, num); err != nil {
return 0, err
}
//todo add binlog
if err := t.Commit(); err != nil {
return 0, err
}
return int64(len(destMap)), nil
}
func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte) (int64, error) {
aggregateFunc := getAggregateFunc(aggregate)
if aggregateFunc == nil {
return 0, errInvalidAggregate
}
if len(srcKeys) < 1 {
return 0, errInvalidSrcKeyNum
}
if weights != nil {
if len(srcKeys) != len(weights) {
return 0, errInvalidWeightNum
}
} else {
weights = make([]int64, len(srcKeys))
for i := 0; i < len(weights); i++ {
weights[i] = 1
}
}
var destMap = map[string]int64{}
scorePairs, err := db.ZRange(srcKeys[0], 0, -1)
if err != nil {
return 0, err
}
for _, pair := range scorePairs {
destMap[String(pair.Member)] = pair.Score * weights[0]
}
for i, key := range srcKeys[1:] {
scorePairs, err := db.ZRange(key, 0, -1)
if err != nil {
return 0, err
}
tmpMap := map[string]int64{}
for _, pair := range scorePairs {
if score, ok := destMap[String(pair.Member)]; ok {
tmpMap[String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i+1])
}
}
destMap = tmpMap
}
t := db.zsetTx
t.Lock()
defer t.Unlock()
db.zDelete(t, destKey)
var num int64 = 0
for member, score := range destMap {
if err := checkZSetKMSize(destKey, []byte(member)); err != nil {
return 0, err
}
if n, err := db.zSetItem(t, destKey, score, []byte(member)); err != nil {
return 0, err
} else if n == 0 {
//add new
num++
}
}
if _, err := db.zIncrSize(t, destKey, num); err != nil {
return 0, err
}
//todo add binlog
if err := t.Commit(); err != nil {
return 0, err
}
return int64(len(destMap)), nil
}

View File

@ -264,3 +264,122 @@ func TestZSetPersist(t *testing.T) {
t.Fatal(n) t.Fatal(n)
} }
} }
func TestZUnionStore(t *testing.T) {
db := getTestDB()
key1 := []byte("key1")
key2 := []byte("key2")
db.ZAdd(key1, ScorePair{1, []byte("one")})
db.ZAdd(key1, ScorePair{1, []byte("two")})
db.ZAdd(key2, ScorePair{2, []byte("two")})
db.ZAdd(key2, ScorePair{2, []byte("three")})
keys := [][]byte{key1, key2}
weights := []int64{1, 2}
out := []byte("out")
n, err := db.ZUnionStore(out, keys, weights, AggregateSum)
if err != nil {
t.Fatal(err.Error())
}
if n != 3 {
t.Fatal("invalid value ", n)
}
v, err := db.ZScore(out, []byte("two"))
if err != nil {
t.Fatal(err.Error())
}
if v != 5 {
t.Fatal("invalid value ", v)
}
out = []byte("out")
n, err = db.ZUnionStore(out, keys, weights, AggregateMax)
if err != nil {
t.Fatal(err.Error())
}
if n != 3 {
t.Fatal("invalid value ", n)
}
v, err = db.ZScore(out, []byte("two"))
if err != nil {
t.Fatal(err.Error())
}
if v != 4 {
t.Fatal("invalid value ", v)
}
n, err = db.ZCount(out, 0, 0XFFFE)
if err != nil {
t.Fatal(err.Error())
}
if n != 3 {
t.Fatal("invalid value ", v)
}
}
func TestZInterStore(t *testing.T) {
db := getTestDB()
key1 := []byte("key1")
key2 := []byte("key2")
db.ZAdd(key1, ScorePair{1, []byte("one")})
db.ZAdd(key1, ScorePair{1, []byte("two")})
db.ZAdd(key2, ScorePair{2, []byte("two")})
db.ZAdd(key2, ScorePair{2, []byte("three")})
keys := [][]byte{key1, key2}
weights := []int64{2, 3}
out := []byte("out")
n, err := db.ZInterStore(out, keys, weights, AggregateSum)
if err != nil {
t.Fatal(err.Error())
}
if n != 1 {
t.Fatal("invalid value ", n)
}
v, err := db.ZScore(out, []byte("two"))
if err != nil {
t.Fatal(err.Error())
}
if v != 8 {
t.Fatal("invalid value ", v)
}
out = []byte("out")
n, err = db.ZInterStore(out, keys, weights, AggregateMin)
if err != nil {
t.Fatal(err.Error())
}
if n != 1 {
t.Fatal("invalid value ", n)
}
v, err = db.ZScore(out, []byte("two"))
if err != nil {
t.Fatal(err.Error())
}
if v != 2 {
t.Fatal("invalid value ", v)
}
n, err = db.ZCount(out, 0, 0XFFFF)
if err != nil {
t.Fatal(err.Error())
}
if n != 1 {
t.Fatal("invalid value ", n)
}
}

View File

@ -19,7 +19,7 @@ var allowedContentTypes = map[string]struct{}{
"bson": struct{}{}, "bson": struct{}{},
"msgpack": struct{}{}, "msgpack": struct{}{},
} }
var unsopportedCommands = map[string]struct{}{ var unsupportedCommands = map[string]struct{}{
"slaveof": struct{}{}, "slaveof": struct{}{},
"fullsync": struct{}{}, "fullsync": struct{}{},
"sync": struct{}{}, "sync": struct{}{},
@ -87,7 +87,7 @@ func (c *httpClient) makeRequest(app *App, r *http.Request, w http.ResponseWrite
} }
req.cmd = strings.ToLower(cmd) req.cmd = strings.ToLower(cmd)
if _, ok := unsopportedCommands[req.cmd]; ok { if _, ok := unsupportedCommands[req.cmd]; ok {
return nil, fmt.Errorf("unsupported command: '%s'", cmd) return nil, fmt.Errorf("unsupported command: '%s'", cmd)
} }
@ -210,7 +210,7 @@ func (w *httpWriter) writeScorePairArray(lst []ledis.ScorePair, withScores bool)
} }
func (w *httpWriter) writeBulkFrom(n int64, rb io.Reader) { func (w *httpWriter) writeBulkFrom(n int64, rb io.Reader) {
w.writeError(fmt.Errorf("unsuport")) w.writeError(fmt.Errorf("unsupport"))
} }
func (w *httpWriter) flush() { func (w *httpWriter) flush() {

View File

@ -520,6 +520,120 @@ func zpersistCommand(req *requestContext) error {
return nil return nil
} }
func zparseZsetoptStore(args [][]byte) (destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte, err error) {
destKey = args[0]
nKeys, err := strconv.Atoi(ledis.String(args[1]))
if err != nil {
err = ErrValue
return
}
args = args[2:]
if len(args) < nKeys {
err = ErrSyntax
return
}
srcKeys = args[:nKeys]
args = args[nKeys:]
var weightsFlag = false
var aggregateFlag = false
for len(args) > 0 {
if strings.ToLower(ledis.String(args[0])) == "weights" {
if weightsFlag {
err = ErrSyntax
return
}
args = args[1:]
if len(args) < nKeys {
err = ErrSyntax
return
}
weights = make([]int64, nKeys)
for i, arg := range args[:nKeys] {
if weights[i], err = ledis.StrInt64(arg, nil); err != nil {
err = ErrValue
return
}
}
args = args[nKeys:]
weightsFlag = true
} else if strings.ToLower(ledis.String(args[0])) == "aggregate" {
if aggregateFlag {
err = ErrSyntax
return
}
if len(args) < 2 {
err = ErrSyntax
return
}
if strings.ToLower(ledis.String(args[1])) == "sum" {
aggregate = ledis.AggregateSum
} else if strings.ToLower(ledis.String(args[1])) == "min" {
aggregate = ledis.AggregateMin
} else if strings.ToLower(ledis.String(args[1])) == "max" {
aggregate = ledis.AggregateMax
} else {
err = ErrSyntax
return
}
args = args[2:]
aggregateFlag = true
} else {
err = ErrSyntax
return
}
}
if !aggregateFlag {
aggregate = ledis.AggregateSum
}
return
}
func zunionstoreCommand(req *requestContext) error {
args := req.args
if len(args) < 2 {
return ErrCmdParams
}
destKey, srcKeys, weights, aggregate, err := zparseZsetoptStore(args)
if err != nil {
return err
}
if n, err := req.db.ZUnionStore(destKey, srcKeys, weights, aggregate); err != nil {
return err
} else {
req.resp.writeInteger(n)
}
return nil
}
func zinterstoreCommand(req *requestContext) error {
args := req.args
if len(args) < 2 {
return ErrCmdParams
}
destKey, srcKeys, weights, aggregate, err := zparseZsetoptStore(args)
if err != nil {
return err
}
if n, err := req.db.ZInterStore(destKey, srcKeys, weights, aggregate); err != nil {
return err
} else {
req.resp.writeInteger(n)
}
return nil
}
func init() { func init() {
register("zadd", zaddCommand) register("zadd", zaddCommand)
register("zcard", zcardCommand) register("zcard", zcardCommand)
@ -536,6 +650,9 @@ func init() {
register("zrevrangebyscore", zrevrangebyscoreCommand) register("zrevrangebyscore", zrevrangebyscoreCommand)
register("zscore", zscoreCommand) register("zscore", zscoreCommand)
register("zunionstore", zunionstoreCommand)
register("zinterstore", zinterstoreCommand)
//ledisdb special command //ledisdb special command
register("zclear", zclearCommand) register("zclear", zclearCommand)

View File

@ -599,3 +599,141 @@ func TestZsetErrorParams(t *testing.T) {
} }
} }
func TestZUnionStore(t *testing.T) {
c := getTestConn()
defer c.Close()
if _, err := c.Do("zadd", "k1", "1", "one"); err != nil {
t.Fatal(err.Error())
}
if _, err := c.Do("zadd", "k1", "2", "two"); err != nil {
t.Fatal(err.Error())
}
if _, err := c.Do("zadd", "k2", "1", "two"); err != nil {
t.Fatal(err.Error())
}
if _, err := c.Do("zadd", "k2", "2", "three"); err != nil {
t.Fatal(err.Error())
}
if n, err := ledis.Int64(c.Do("zunionstore", "out", "2", "k1", "k2", "weights", "1", "2")); err != nil {
t.Fatal(err.Error())
} else {
if n != 3 {
t.Fatal("invalid value ", n)
}
}
if n, err := ledis.Int64(c.Do("zunionstore", "out", "2", "k1", "k2", "weights", "1", "2", "aggregate", "min")); err != nil {
t.Fatal(err.Error())
} else {
if n != 3 {
t.Fatal("invalid value ", n)
}
}
if n, err := ledis.Int64(c.Do("zunionstore", "out", "2", "k1", "k2", "aggregate", "max")); err != nil {
t.Fatal(err.Error())
} else {
if n != 3 {
t.Fatal("invalid value ", n)
}
}
if n, err := ledis.Int64(c.Do("zscore", "out", "two")); err != nil {
t.Fatal(err.Error())
} else {
if n != 2 {
t.Fatal("invalid value ", n)
}
}
}
func TestZInterStore(t *testing.T) {
c := getTestConn()
defer c.Close()
if _, err := c.Do("zadd", "k1", "1", "one"); err != nil {
t.Fatal(err.Error())
}
if _, err := c.Do("zadd", "k1", "2", "two"); err != nil {
t.Fatal(err.Error())
}
if _, err := c.Do("zadd", "k2", "1", "two"); err != nil {
t.Fatal(err.Error())
}
if _, err := c.Do("zadd", "k2", "2", "three"); err != nil {
t.Fatal(err.Error())
}
if n, err := ledis.Int64(c.Do("zinterstore", "out", "2", "k1", "k2", "weights", "1", "2")); err != nil {
t.Fatal(err.Error())
} else {
if n != 1 {
t.Fatal("invalid value ", n)
}
}
if n, err := ledis.Int64(c.Do("zinterstore", "out", "2", "k1", "k2", "aggregate", "min", "weights", "1", "2")); err != nil {
t.Fatal(err.Error())
} else {
if n != 1 {
t.Fatal("invalid value ", n)
}
}
if n, err := ledis.Int64(c.Do("zinterstore", "out", "2", "k1", "k2", "aggregate", "sum")); err != nil {
t.Fatal(err.Error())
} else {
if n != 1 {
t.Fatal("invalid value ", n)
}
}
if n, err := ledis.Int64(c.Do("zscore", "out", "two")); err != nil {
t.Fatal(err.Error())
} else {
if n != 3 {
t.Fatal("invalid value ", n)
}
}
if _, err := c.Do("zadd", "k3", "3", "three"); err != nil {
t.Fatal(err.Error())
}
if n, err := ledis.Int64(c.Do("zinterstore", "out", "3", "k1", "k2", "k3", "aggregate", "sum")); err != nil {
t.Fatal(err.Error())
} else {
if n != 0 {
t.Fatal("invalid value ", n)
}
}
if _, err := c.Do("zadd", "k3", "3", "two"); err != nil {
t.Fatal(err.Error())
}
if n, err := ledis.Int64(c.Do("zinterstore", "out", "3", "k1", "k2", "k3", "aggregate", "sum", "weights", "3", "2", "2")); err != nil {
t.Fatal(err.Error())
} else {
if n != 1 {
t.Fatal("invalid value ", n)
}
}
if n, err := ledis.Int64(c.Do("zscore", "out", "two")); err != nil {
t.Fatal(err.Error())
} else {
if n != 14 {
t.Fatal("invalid value ", n)
}
}
}

510
server/command_cnf.go Normal file
View File

@ -0,0 +1,510 @@
//This file was generated by ./generate.py on Mon Aug 11 2014 12:35:56 +0800
package server
type cmdConf struct {
name string
argDesc string
group string
readonly bool
}
var cnfCmds = []cmdConf{
{
"ZRANGEBYSCORE",
"key min max [WITHSCORES] [LIMIT offset count]",
"ZSet",
true,
},
{
"ZPERSIST",
"key",
"ZSet",
false,
},
{
"LTTL",
"key",
"List",
true,
},
{
"LINDEX",
"key index",
"List",
true,
},
{
"FULLSYNC",
"-",
"Replication",
false,
},
{
"ZREVRANK",
"key member",
"ZSet",
true,
},
{
"ZEXPIRE",
"key seconds",
"ZSet",
false,
},
{
"SYNC",
"index offset",
"Replication",
false,
},
{
"BMSETBIT",
"key offset value [offset value ...]",
"Bitmap",
false,
},
{
"LPOP",
"key",
"List",
false,
},
{
"HPERSIST",
"key",
"Hash",
false,
},
{
"EXPIRE",
"key seconds",
"KV",
false,
},
{
"DEL",
"key [key ...]",
"KV",
false,
},
{
"LPUSH",
"key value [value ...]",
"List",
false,
},
{
"PERSIST",
"key",
"KV",
false,
},
{
"HTTL",
"key",
"Hash",
true,
},
{
"LEXPIREAT",
"key timestamp",
"List",
false,
},
{
"ZEXPIREAT",
"key timestamp",
"ZSet",
false,
},
{
"DECR",
"key",
"KV",
false,
},
{
"SLAVEOF",
"host port",
"Replication",
false,
},
{
"INCR",
"key",
"KV",
false,
},
{
"MSET",
"key value [key value ...]",
"KV",
false,
},
{
"LEXPIRE",
"key seconds",
"List",
false,
},
{
"HINCRBY",
"key field increment",
"Hash",
false,
},
{
"GET",
"key",
"KV",
true,
},
{
"ZREVRANGE",
"key start stop [WITHSCORES]",
"ZSet",
true,
},
{
"ZINCRBY",
"key increment member",
"ZSet",
false,
},
{
"LPERSIST",
"key",
"List",
false,
},
{
"HEXISTS",
"key field",
"Hash",
true,
},
{
"ZREM",
"key member [member ...]",
"ZSet",
false,
},
{
"BOPT",
"operation destkey key [key ...]",
"Bitmap",
false,
},
{
"ZCLEAR",
"key",
"ZSet",
false,
},
{
"LCLEAR",
"key",
"List",
false,
},
{
"ZRANK",
"key member",
"ZSet",
true,
},
{
"TTL",
"key",
"KV",
true,
},
{
"ZADD",
"key score member [score member ...]",
"ZSet",
false,
},
{
"HEXPIRE",
"key seconds",
"Hash",
false,
},
{
"HDEL",
"key field [field ...]",
"Hash",
false,
},
{
"HSET",
"key field value",
"Hash",
false,
},
{
"LLEN",
"key",
"List",
true,
},
{
"HVALS",
"key",
"Hash",
true,
},
{
"BCOUNT",
"key [start end]",
"Bitmap",
true,
},
{
"BGET",
"key",
"Bitmap",
true,
},
{
"MGET",
"key [key ...]",
"KV",
true,
},
{
"EXISTS",
"key",
"KV",
true,
},
{
"HMCLEAR",
"key [key ...]",
"Hash",
false,
},
{
"ZCOUNT",
"key min max",
"ZSet",
true,
},
{
"SELECT",
"index",
"Server",
false,
},
{
"ECHO",
"message",
"Server",
true,
},
{
"ZTTL",
"key",
"ZSet",
true,
},
{
"HKEYS",
"key",
"Hash",
true,
},
{
"HGETALL",
"key",
"Hash",
true,
},
{
"RPOP",
"key",
"List",
false,
},
{
"HMGET",
"key field [field ...]",
"Hash",
true,
},
{
"SETNX",
"key value",
"KV",
false,
},
{
"HGET",
"key field",
"Hash",
true,
},
{
"BPERSIST",
"key",
"Bitmap",
false,
},
{
"INCRBY",
"key increment",
"KV",
false,
},
{
"BDELETE",
"key",
"ZSet",
false,
},
{
"ZMCLEAR",
"key [key ...]",
"ZSet",
false,
},
{
"RPUSH",
"key value [value ...]",
"List",
false,
},
{
"LRANGE",
"key start stop",
"List",
true,
},
{
"HLEN",
"key",
"Hash",
true,
},
{
"ZSCORE",
"key member",
"ZSet",
true,
},
{
"LMCLEAR",
"key [key ...]",
"List",
false,
},
{
"EXPIREAT",
"key timestamp",
"KV",
false,
},
{
"ZREMRANGEBYSCORE",
"key min max",
"ZSet",
false,
},
{
"ZCARD",
"key",
"ZSet",
true,
},
{
"ZREMRANGEBYRANK",
"key start stop",
"ZSet",
false,
},
{
"PING",
"-",
"Server",
true,
},
{
"HMSET",
"key field value [field value ...]",
"Hash",
false,
},
{
"BTTL",
"key",
"Bitmap",
true,
},
{
"HCLEAR",
"key",
"Hash",
false,
},
{
"ZRANGE",
"key start stop [WITHSCORES]",
"ZSet",
false,
},
{
"ZREVRANGEBYSCORE",
"key max min [WITHSCORES][LIMIT offset count]",
"ZSet",
true,
},
{
"BSETBIT",
"key offset value",
"Bitmap",
false,
},
{
"BEXPIREAT",
"key timestamp",
"Bitmap",
false,
},
{
"SET",
"key value",
"KV",
false,
},
{
"BGETBIT",
"key offset",
"Bitmap",
true,
},
{
"BEXPIRE",
"key seconds",
"Bitmap",
false,
},
{
"GETSET",
" key value",
"KV",
false,
},
{
"DECRBY",
"key decrement",
"KV",
false,
},
{
"HEXPIREAT",
"key timestamp",
"Hash",
false,
},
}

11
store/hyperleveldb.go Normal file
View File

@ -0,0 +1,11 @@
// +build hyperleveldb
package store
import (
"github.com/siddontang/ledisdb/store/hyperleveldb"
)
func init() {
Register(hyperleveldb.Store{})
}

View File

@ -0,0 +1,61 @@
// +build hyperleveldb
package hyperleveldb
// #cgo LDFLAGS: -lhyperleveldb
// #include "hyperleveldb/c.h"
import "C"
import (
"unsafe"
)
type WriteBatch struct {
db *DB
wbatch *C.leveldb_writebatch_t
}
func (w *WriteBatch) Close() error {
C.leveldb_writebatch_destroy(w.wbatch)
w.wbatch = nil
return nil
}
func (w *WriteBatch) Put(key, value []byte) {
var k, v *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
if len(value) != 0 {
v = (*C.char)(unsafe.Pointer(&value[0]))
}
lenk := len(key)
lenv := len(value)
C.leveldb_writebatch_put(w.wbatch, k, C.size_t(lenk), v, C.size_t(lenv))
}
func (w *WriteBatch) Delete(key []byte) {
C.leveldb_writebatch_delete(w.wbatch,
(*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key)))
}
func (w *WriteBatch) Commit() error {
return w.commit(w.db.writeOpts)
}
func (w *WriteBatch) Rollback() error {
C.leveldb_writebatch_clear(w.wbatch)
return nil
}
func (w *WriteBatch) commit(wb *WriteOptions) error {
var errStr *C.char
C.leveldb_write(w.db.db, wb.Opt, w.wbatch, &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}

View File

@ -0,0 +1,20 @@
// +build hyperleveldb
package hyperleveldb
// #cgo LDFLAGS: -lhyperleveldb
// #include <stdint.h>
// #include "hyperleveldb/c.h"
import "C"
type Cache struct {
Cache *C.leveldb_cache_t
}
func NewLRUCache(capacity int) *Cache {
return &Cache{C.leveldb_cache_create_lru(C.size_t(capacity))}
}
func (c *Cache) Close() {
C.leveldb_cache_destroy(c.Cache)
}

259
store/hyperleveldb/db.go Normal file
View File

@ -0,0 +1,259 @@
// +build hyperleveldb
// Package hyperleveldb is a wrapper for c++ hyperleveldb
package hyperleveldb
/*
#cgo LDFLAGS: -lhyperleveldb
#include <hyperleveldb/c.h>
#include "hyperleveldb_ext.h"
*/
import "C"
import (
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/store/driver"
"os"
"runtime"
"unsafe"
)
const defaultFilterBits int = 10
type Store struct {
}
func (s Store) String() string {
return "hyperleveldb"
}
func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) {
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return nil, err
}
db := new(DB)
db.path = path
db.cfg = &cfg.LevelDB
if err := db.open(); err != nil {
return nil, err
}
return db, nil
}
func (s Store) Repair(path string, cfg *config.Config) error {
db := new(DB)
db.cfg = &cfg.LevelDB
db.path = path
err := db.open()
defer db.Close()
//open ok, do not need repair
if err == nil {
return nil
}
var errStr *C.char
ldbname := C.CString(path)
defer C.leveldb_free(unsafe.Pointer(ldbname))
C.leveldb_repair_db(db.opts.Opt, ldbname, &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}
type DB struct {
path string
cfg *config.LevelDBConfig
db *C.leveldb_t
opts *Options
//for default read and write options
readOpts *ReadOptions
writeOpts *WriteOptions
iteratorOpts *ReadOptions
cache *Cache
filter *FilterPolicy
}
func (db *DB) open() error {
db.initOptions(db.cfg)
var errStr *C.char
ldbname := C.CString(db.path)
defer C.leveldb_free(unsafe.Pointer(ldbname))
db.db = C.leveldb_open(db.opts.Opt, ldbname, &errStr)
if errStr != nil {
db.db = nil
return saveError(errStr)
}
return nil
}
func (db *DB) initOptions(cfg *config.LevelDBConfig) {
opts := NewOptions()
opts.SetCreateIfMissing(true)
cfg.Adjust()
db.cache = NewLRUCache(cfg.CacheSize)
opts.SetCache(db.cache)
//we must use bloomfilter
db.filter = NewBloomFilter(defaultFilterBits)
opts.SetFilterPolicy(db.filter)
if !cfg.Compression {
opts.SetCompression(NoCompression)
} else {
opts.SetCompression(SnappyCompression)
}
opts.SetBlockSize(cfg.BlockSize)
opts.SetWriteBufferSize(cfg.WriteBufferSize)
opts.SetMaxOpenFiles(cfg.MaxOpenFiles)
db.opts = opts
db.readOpts = NewReadOptions()
db.writeOpts = NewWriteOptions()
db.iteratorOpts = NewReadOptions()
db.iteratorOpts.SetFillCache(false)
}
func (db *DB) Close() error {
if db.db != nil {
C.leveldb_close(db.db)
db.db = nil
}
db.opts.Close()
if db.cache != nil {
db.cache.Close()
}
if db.filter != nil {
db.filter.Close()
}
db.readOpts.Close()
db.writeOpts.Close()
db.iteratorOpts.Close()
return nil
}
func (db *DB) Put(key, value []byte) error {
return db.put(db.writeOpts, key, value)
}
func (db *DB) Get(key []byte) ([]byte, error) {
return db.get(db.readOpts, key)
}
func (db *DB) Delete(key []byte) error {
return db.delete(db.writeOpts, key)
}
func (db *DB) NewWriteBatch() driver.IWriteBatch {
wb := &WriteBatch{
db: db,
wbatch: C.leveldb_writebatch_create(),
}
runtime.SetFinalizer(wb, func(w *WriteBatch) {
w.Close()
})
return wb
}
func (db *DB) NewIterator() driver.IIterator {
it := new(Iterator)
it.it = C.leveldb_create_iterator(db.db, db.iteratorOpts.Opt)
return it
}
func (db *DB) put(wo *WriteOptions, key, value []byte) error {
var errStr *C.char
var k, v *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
if len(value) != 0 {
v = (*C.char)(unsafe.Pointer(&value[0]))
}
lenk := len(key)
lenv := len(value)
C.leveldb_put(
db.db, wo.Opt, k, C.size_t(lenk), v, C.size_t(lenv), &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}
func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) {
var errStr *C.char
var vallen C.size_t
var k *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
var value *C.char
c := C.hyperleveldb_get_ext(
db.db, ro.Opt, k, C.size_t(len(key)), &value, &vallen, &errStr)
if errStr != nil {
return nil, saveError(errStr)
}
if value == nil {
return nil, nil
}
defer C.hyperleveldb_get_free_ext(unsafe.Pointer(c))
return C.GoBytes(unsafe.Pointer(value), C.int(vallen)), nil
}
func (db *DB) delete(wo *WriteOptions, key []byte) error {
var errStr *C.char
var k *C.char
if len(key) != 0 {
k = (*C.char)(unsafe.Pointer(&key[0]))
}
C.leveldb_delete(
db.db, wo.Opt, k, C.size_t(len(key)), &errStr)
if errStr != nil {
return saveError(errStr)
}
return nil
}
func (db *DB) Begin() (driver.Tx, error) {
return nil, driver.ErrTxSupport
}

View File

@ -0,0 +1,21 @@
// +build hyperleveldb
package hyperleveldb
// #cgo LDFLAGS: -lhyperleveldb
// #include <stdlib.h>
// #include "hyperleveldb/c.h"
import "C"
type FilterPolicy struct {
Policy *C.leveldb_filterpolicy_t
}
func NewBloomFilter(bitsPerKey int) *FilterPolicy {
policy := C.leveldb_filterpolicy_create_bloom(C.int(bitsPerKey))
return &FilterPolicy{policy}
}
func (fp *FilterPolicy) Close() {
C.leveldb_filterpolicy_destroy(fp.Policy)
}

View File

@ -0,0 +1,88 @@
// +build hyperleveldb
#include "hyperleveldb_ext.h"
#include <stdlib.h>
#include <string>
#include "hyperleveldb/db.h"
using namespace leveldb;
extern "C" {
static bool SaveError(char** errptr, const Status& s) {
assert(errptr != NULL);
if (s.ok()) {
return false;
} else if (*errptr == NULL) {
*errptr = strdup(s.ToString().c_str());
} else {
free(*errptr);
*errptr = strdup(s.ToString().c_str());
}
return true;
}
void* hyperleveldb_get_ext(
leveldb_t* db,
const leveldb_readoptions_t* options,
const char* key, size_t keylen,
char** valptr,
size_t* vallen,
char** errptr) {
std::string *tmp = new(std::string);
//very tricky, maybe changed with c++ leveldb upgrade
Status s = (*(DB**)db)->Get(*(ReadOptions*)options, Slice(key, keylen), tmp);
if (s.ok()) {
*valptr = (char*)tmp->data();
*vallen = tmp->size();
} else {
delete(tmp);
tmp = NULL;
*valptr = NULL;
*vallen = 0;
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
}
return tmp;
}
void hyperleveldb_get_free_ext(void* context) {
std::string* s = (std::string*)context;
delete(s);
}
unsigned char hyperleveldb_iter_seek_to_first_ext(leveldb_iterator_t* iter) {
leveldb_iter_seek_to_first(iter);
return leveldb_iter_valid(iter);
}
unsigned char hyperleveldb_iter_seek_to_last_ext(leveldb_iterator_t* iter) {
leveldb_iter_seek_to_last(iter);
return leveldb_iter_valid(iter);
}
unsigned char hyperleveldb_iter_seek_ext(leveldb_iterator_t* iter, const char* k, size_t klen) {
leveldb_iter_seek(iter, k, klen);
return leveldb_iter_valid(iter);
}
unsigned char hyperleveldb_iter_next_ext(leveldb_iterator_t* iter) {
leveldb_iter_next(iter);
return leveldb_iter_valid(iter);
}
unsigned char hyperleveldb_iter_prev_ext(leveldb_iterator_t* iter) {
leveldb_iter_prev(iter);
return leveldb_iter_valid(iter);
}
}

View File

@ -0,0 +1,40 @@
// +build hyperleveldb
#ifndef HYPERLEVELDB_EXT_H
#define HYPERLEVELDB_EXT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "hyperleveldb/c.h"
/* Returns NULL if not found. Otherwise stores the value in **valptr.
Stores the length of the value in *vallen.
Returns a context must be later to free*/
extern void* hyperleveldb_get_ext(
leveldb_t* db,
const leveldb_readoptions_t* options,
const char* key, size_t keylen,
char** valptr,
size_t* vallen,
char** errptr);
// Free context returns by hyperleveldb_get_ext
extern void hyperleveldb_get_free_ext(void* context);
// Below iterator functions like leveldb iterator but returns valid status for iterator
extern unsigned char hyperleveldb_iter_seek_to_first_ext(leveldb_iterator_t*);
extern unsigned char hyperleveldb_iter_seek_to_last_ext(leveldb_iterator_t*);
extern unsigned char hyperleveldb_iter_seek_ext(leveldb_iterator_t*, const char* k, size_t klen);
extern unsigned char hyperleveldb_iter_next_ext(leveldb_iterator_t*);
extern unsigned char hyperleveldb_iter_prev_ext(leveldb_iterator_t*);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -0,0 +1,70 @@
// +build hyperleveldb
package hyperleveldb
// #cgo LDFLAGS: -lhyperleveldb
// #include <stdlib.h>
// #include "hyperleveldb/c.h"
// #include "hyperleveldb_ext.h"
import "C"
import (
"unsafe"
)
type Iterator struct {
it *C.leveldb_iterator_t
isValid C.uchar
}
func (it *Iterator) Key() []byte {
var klen C.size_t
kdata := C.leveldb_iter_key(it.it, &klen)
if kdata == nil {
return nil
}
return slice(unsafe.Pointer(kdata), int(C.int(klen)))
}
func (it *Iterator) Value() []byte {
var vlen C.size_t
vdata := C.leveldb_iter_value(it.it, &vlen)
if vdata == nil {
return nil
}
return slice(unsafe.Pointer(vdata), int(C.int(vlen)))
}
func (it *Iterator) Close() error {
if it.it != nil {
C.leveldb_iter_destroy(it.it)
it.it = nil
}
return nil
}
func (it *Iterator) Valid() bool {
return ucharToBool(it.isValid)
}
func (it *Iterator) Next() {
it.isValid = C.hyperleveldb_iter_next_ext(it.it)
}
func (it *Iterator) Prev() {
it.isValid = C.hyperleveldb_iter_prev_ext(it.it)
}
func (it *Iterator) First() {
it.isValid = C.hyperleveldb_iter_seek_to_first_ext(it.it)
}
func (it *Iterator) Last() {
it.isValid = C.hyperleveldb_iter_seek_to_last_ext(it.it)
}
func (it *Iterator) Seek(key []byte) {
it.isValid = C.hyperleveldb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key)))
}

View File

@ -0,0 +1,114 @@
// +build hyperleveldb
package hyperleveldb
// #cgo LDFLAGS: -lhyperleveldb
// #include "hyperleveldb/c.h"
import "C"
type CompressionOpt int
const (
NoCompression = CompressionOpt(0)
SnappyCompression = CompressionOpt(1)
)
type Options struct {
Opt *C.leveldb_options_t
}
type ReadOptions struct {
Opt *C.leveldb_readoptions_t
}
type WriteOptions struct {
Opt *C.leveldb_writeoptions_t
}
func NewOptions() *Options {
opt := C.leveldb_options_create()
return &Options{opt}
}
func NewReadOptions() *ReadOptions {
opt := C.leveldb_readoptions_create()
return &ReadOptions{opt}
}
func NewWriteOptions() *WriteOptions {
opt := C.leveldb_writeoptions_create()
return &WriteOptions{opt}
}
func (o *Options) Close() {
C.leveldb_options_destroy(o.Opt)
}
func (o *Options) SetComparator(cmp *C.leveldb_comparator_t) {
C.leveldb_options_set_comparator(o.Opt, cmp)
}
func (o *Options) SetErrorIfExists(error_if_exists bool) {
eie := boolToUchar(error_if_exists)
C.leveldb_options_set_error_if_exists(o.Opt, eie)
}
func (o *Options) SetCache(cache *Cache) {
C.leveldb_options_set_cache(o.Opt, cache.Cache)
}
func (o *Options) SetWriteBufferSize(s int) {
C.leveldb_options_set_write_buffer_size(o.Opt, C.size_t(s))
}
func (o *Options) SetParanoidChecks(pc bool) {
C.leveldb_options_set_paranoid_checks(o.Opt, boolToUchar(pc))
}
func (o *Options) SetMaxOpenFiles(n int) {
C.leveldb_options_set_max_open_files(o.Opt, C.int(n))
}
func (o *Options) SetBlockSize(s int) {
C.leveldb_options_set_block_size(o.Opt, C.size_t(s))
}
func (o *Options) SetBlockRestartInterval(n int) {
C.leveldb_options_set_block_restart_interval(o.Opt, C.int(n))
}
func (o *Options) SetCompression(t CompressionOpt) {
C.leveldb_options_set_compression(o.Opt, C.int(t))
}
func (o *Options) SetCreateIfMissing(b bool) {
C.leveldb_options_set_create_if_missing(o.Opt, boolToUchar(b))
}
func (o *Options) SetFilterPolicy(fp *FilterPolicy) {
var policy *C.leveldb_filterpolicy_t
if fp != nil {
policy = fp.Policy
}
C.leveldb_options_set_filter_policy(o.Opt, policy)
}
func (ro *ReadOptions) Close() {
C.leveldb_readoptions_destroy(ro.Opt)
}
func (ro *ReadOptions) SetVerifyChecksums(b bool) {
C.leveldb_readoptions_set_verify_checksums(ro.Opt, boolToUchar(b))
}
func (ro *ReadOptions) SetFillCache(b bool) {
C.leveldb_readoptions_set_fill_cache(ro.Opt, boolToUchar(b))
}
func (wo *WriteOptions) Close() {
C.leveldb_writeoptions_destroy(wo.Opt)
}
func (wo *WriteOptions) SetSync(b bool) {
C.leveldb_writeoptions_set_sync(wo.Opt, boolToUchar(b))
}

View File

@ -0,0 +1,44 @@
// +build hyperleveldb
package hyperleveldb
// #include "hyperleveldb/c.h"
import "C"
import (
"fmt"
"reflect"
"unsafe"
)
func boolToUchar(b bool) C.uchar {
uc := C.uchar(0)
if b {
uc = C.uchar(1)
}
return uc
}
func ucharToBool(uc C.uchar) bool {
if uc == C.uchar(0) {
return false
}
return true
}
func saveError(errStr *C.char) error {
if errStr != nil {
gs := C.GoString(errStr)
C.leveldb_free(unsafe.Pointer(errStr))
return fmt.Errorf(gs)
}
return nil
}
func slice(p unsafe.Pointer, n int) []byte {
var b []byte
pbyte := (*reflect.SliceHeader)(unsafe.Pointer(&b))
pbyte.Data = uintptr(p)
pbyte.Len = n
pbyte.Cap = n
return b
}

View File

@ -0,0 +1,33 @@
// +build hyperleveldb
package store
import (
"github.com/siddontang/ledisdb/config"
"os"
"testing"
)
func newTestHyperLevelDB() *DB {
cfg := new(config.Config)
cfg.DBName = "hyperleveldb"
cfg.DataDir = "/tmp/testdb"
os.RemoveAll(getStorePath(cfg))
db, err := Open(cfg)
if err != nil {
println(err.Error())
panic(err)
}
return db
}
func TestHyperLevelDB(t *testing.T) {
db := newTestHyperLevelDB()
testStore(db, t)
db.Close()
}

View File

@ -1,3 +1,4 @@
// +build !windows
package store package store
import ( import (

View File

@ -23,6 +23,8 @@ type MDB struct {
func (s Store) Open(path string, c *config.Config) (driver.IDB, error) { func (s Store) Open(path string, c *config.Config) (driver.IDB, error) {
mapSize := c.LMDB.MapSize mapSize := c.LMDB.MapSize
noSync := c.LMDB.NoSync
if mapSize <= 0 { if mapSize <= 0 {
mapSize = 500 * 1024 * 1024 mapSize = 500 * 1024 * 1024
} }
@ -48,7 +50,12 @@ func (s Store) Open(path string, c *config.Config) (driver.IDB, error) {
} }
} }
err = env.Open(path, mdb.NOSYNC|mdb.NOMETASYNC|mdb.WRITEMAP|mdb.MAPASYNC|mdb.CREATE, 0755) var flags uint = mdb.CREATE
if noSync {
flags |= mdb.NOSYNC | mdb.NOMETASYNC | mdb.WRITEMAP | mdb.MAPASYNC
}
err = env.Open(path, flags, 0755)
if err != nil { if err != nil {
return MDB{}, err return MDB{}, err
} }

View File

@ -1,3 +1,5 @@
// +build !windows
package store package store
import ( import (

View File

@ -0,0 +1,18 @@
## Notice
1. The tool doesn't support `set` data type.
2. The tool doesn't support `bitmap` data type.
2. Our `zset` use integer instead of double, so the zset float score in Redis
will be **converted to integer**.
3. Only Support Redis version greater than `2.8.0`, because we use `scan` command to scan data.
Also, you need `redis-py` greater than `2.9.0`.
## Usage
$ python redis_import.py redis_host redis_port redis_db ledis_host ledis_port
We will use the same db index as redis. That's to say, data in redis[0] will be transfer to ledisdb[0]. But if redis db `index >= 16`, we will refuse to transfer, because ledisdb only support db `index < 16`.

View File

@ -0,0 +1,172 @@
#!/usr/bin/env python
# coding: utf-8
# refer: https://github.com/ideawu/ssdb/blob/master/tools/redis-import.php
# Notice: for zset, float score will be converted to integer.
import sys
import os
from collections import OrderedDict as od
import redis
import ledis
total = 0
entries = 0
def scan_available(redis_client):
""""Scan Command is available since redis-server 2.8.0"""
if "scan" in dir(redis_client):
info = redis_client.info()
server_version = info["redis_version"]
version_list = server_version.split(".")
if len(version_list) > 2:
n = int(version_list[0]) * 10 + int(version_list[1])
if n >= 28:
return True
return False
def set_ttl(redis_client, ledis_client, key, k_type):
k_types = {
"string": ledis_client.expire,
"list": ledis_client.lexpire,
"hash": ledis_client.hexpire,
"set": ledis_client.zexpire,
"zset": ledis_client.zexpire
}
timeout = redis_client.ttl(key)
if timeout > 0:
k_types[k_type](key, timeout)
def copy_key(redis_client, ledis_client, key, convert=False):
global entries
k_type = redis_client.type(key)
if k_type == "string":
value = redis_client.get(key)
ledis_client.set(key, value)
set_ttl(redis_client, ledis_client, key, k_type)
entries += 1
elif k_type == "list":
_list = redis_client.lrange(key, 0, -1)
for value in _list:
ledis_client.rpush(key, value)
set_ttl(redis_client, ledis_client, key, k_type)
entries += 1
elif k_type == "hash":
mapping = od(redis_client.hgetall(key))
ledis_client.hmset(key, mapping)
set_ttl(redis_client, ledis_client, key, k_type)
entries += 1
elif k_type == "zset":
out = redis_client.zrange(key, 0, -1, withscores=True)
pieces = od()
for i in od(out).iteritems():
pieces[i[0]] = int(i[1])
ledis_client.zadd(key, **pieces)
set_ttl(redis_client, ledis_client, key, k_type)
entries += 1
else:
print "KEY %s of TYPE %s is not supported by LedisDB." % (key, k_type)
def copy_keys(redis_client, ledis_client, keys, convert=False):
for key in keys:
copy_key(redis_client, ledis_client, key, convert=convert)
def scan(redis_client, count=1000):
keys = []
total = redis_client.dbsize()
if total > 1000:
print "It may take a while, be patient please."
first = True
cursor = 0
while cursor != 0 or first:
cursor, data = redis_client.scan(cursor, count=count)
keys.extend(data)
first = False
assert len(keys) == total
return keys, total
def copy(redis_client, ledis_client, count=1000, convert=False):
if scan_available(redis_client):
print "\nTransfer begin ...\n"
keys, total = scan(redis_client, count=count)
copy_keys(redis_client, ledis_client, keys, convert=convert)
else:
msg = """We do not support Redis version less than 2.8.0.
Please check both your redis server version and redis-py
version.
"""
print msg
sys.exit()
print "%d keys, %d entries copied" % (total, entries)
def usage():
usage = """
Usage:
python %s redis_host redis_port redis_db ledis_host ledis_port
"""
print usage % os.path.basename(sys.argv[0])
def get_prompt(choice):
yes = set(['yes', 'ye', 'y', ''])
no = set(['no', 'n'])
if choice in yes:
return True
elif choice in no:
return False
else:
sys.stdout.write("Please respond with 'yes' or 'no'")
def main():
if len(sys.argv) < 6:
usage()
sys.exit()
convert = False
if len(sys.argv) >= 6:
(redis_host, redis_port, redis_db, ledis_host, ledis_port) = sys.argv[1:6]
if int(redis_db) >= 16:
print redis_db
sys.exit("LedisDB only support 16 databases([0-15]")
choice = raw_input("[y/N]").lower()
if not get_prompt(choice):
sys.exit("No proceed")
redis_c = redis.Redis(host=redis_host, port=int(redis_port), db=int(redis_db))
ledis_c = ledis.Ledis(host=ledis_host, port=int(ledis_port), db=int(redis_db))
try:
redis_c.ping()
except redis.ConnectionError:
print "Could not connect to Redis Server"
sys.exit()
try:
ledis_c.ping()
except redis.ConnectionError:
print "Could not connect to LedisDB Server"
sys.exit()
copy(redis_c, ledis_c, convert=convert)
print "Done\n"
if __name__ == "__main__":
main()

120
tools/redis_import/test.py Normal file
View File

@ -0,0 +1,120 @@
#coding: utf-8
import random, string
import redis
import ledis
from redis_import import copy, scan, set_ttl
rds = redis.Redis()
lds = ledis.Ledis(port=6380)
def random_word(words, length):
return ''.join(random.choice(words) for i in range(length))
def get_words():
word_file = "/usr/share/dict/words"
words = open(word_file).read().splitlines()
return words[:1000]
def get_mapping(words, length=1000):
d = {}
for word in words:
d[word] = random.randint(1, length)
return d
def random_string(client, words, length=1000):
d = get_mapping(words, length)
client.mset(d)
def random_hash(client, words, length=1000):
d = get_mapping(words, length)
client.hmset("hashName", d)
def random_list(client, words, length=1000):
client.lpush("listName", *words)
def random_zset(client, words, length=1000):
d = get_mapping(words, length)
client.zadd("zsetName", **d)
def test():
words = get_words()
print "Flush all redis data before insert new."
rds.flushall()
random_string(rds, words)
print "random_string done"
random_hash(rds, words)
print "random_hash done"
random_list(rds, words)
print "random_list done"
random_zset(rds, words)
print "random_zset done"
lds.lclear("listName")
lds.hclear("hashName")
lds.zclear("zsetName")
copy(rds, lds, convert=True)
# for all keys
keys = scan(rds, 1000)
for key in keys:
if rds.type(key) == "string" and not lds.exists(key):
print key
print "String data not consistent"
# for list
l1 = rds.lrange("listName", 0, -1)
l2 = lds.lrange("listName", 0, -1)
assert l1 == l2
#for hash
for key in keys:
if rds.type(key) == "hash":
assert rds.hgetall(key) == lds.hgetall(key)
assert sorted(rds.hkeys(key)) == sorted(lds.hkeys(key))
assert sorted(rds.hvals(key)) == sorted(lds.hvals(key))
# for zset
z1 = rds.zrange("zsetName", 0, -1, withscores=True)
z2 = lds.zrange("zsetName", 0, -1, withscores=True)
assert z1 == z2
def ledis_ttl(ledis_client, key, k_type):
ttls = {
"string": lds.ttl,
"list": lds.lttl,
"hash": lds.httl,
"zset": lds.zttl,
}
return ttls[k_type](key)
def test_ttl():
keys, total = scan(rds, 1000)
for key in keys:
k_type = rds.type(key)
rds.expire(key, (60 * 60 * 24))
set_ttl(rds, lds, key, k_type)
if rds.ttl(key):
assert ledis_ttl(lds, key, k_type) > 0
if __name__ == "__main__":
test()
test_ttl()
print "Test passed."