diff --git a/README.md b/README.md index a55fb94..724ad21 100644 --- a/README.md +++ b/README.md @@ -2,16 +2,16 @@ Ledisdb is a high performance NoSQL like Redis written by go. It supports some advanced data structure like kv, list, hash, zset, bitmap, and may be alternative for Redis. -LedisDB now supports multi databases as backend to store data, you can test and choose the proper one for you. +LedisDB now supports multiple databases as backend to store data, you can test and choose the proper one for you. ## Features + Rich advanced data structure: KV, List, Hash, ZSet, Bitmap. + Stores lots of data, over the memory limit. -+ Various backend database to use: LevelDB, goleveldb, LMDB, RocksDB, BoltDB. ++ Various backend database to use: LevelDB, goleveldb, LMDB, RocksDB, BoltDB, HyperLevelDB. + Supports expiration and ttl. + Redis clients, like redis-cli, are supported directly. -+ Multi client API supports, including Go, Python, Lua(Openresty). ++ Multiple client API supports, including Go, Python, Lua(Openresty), C/C++, Node.js. + Easy to embed in your own Go application. + Restful API support, json/bson/msgpack output. + Replication to guarantee data safe. @@ -51,16 +51,29 @@ Create a workspace and checkout ledisdb source ## RocksDB support -+ Install rocksdb(shared_lib) and snappy first. ++ [Install rocksdb](https://github.com/facebook/rocksdb/blob/master/INSTALL.md)(`make shared_lib`) and snappy first. LedisDB has not supplied a simple script to install, maybe later. -+ Set ```ROCKSDB_DIR``` and ```SNAPPY_DIR``` to the actual install path in dev.sh. ++ Set ```ROCKSDB_DIR``` and ```SNAPPY_DIR``` to the actual install path in `dev.sh`. + ```make``` + + + +## HyperLevelDB support + ++ [Install hyperleveldb](https://github.com/rescrv/HyperLevelDB/blob/master/README) and snappy first. + + LedisDB has not supplied a simple script to install, maybe later. + ++ Set `HYPERLEVELDB` and `SNAPPY_DIR` to the actual install path in `dev.sh`. ++ `make` + + ## Choose store database -LedisDB now supports goleveldb, lmdb, leveldb, rocksdb, boltdb, it will choose goleveldb as default to store data if you not set. +LedisDB now supports goleveldb, lmdb, leveldb, rocksdb, boltdb, hyperleveldb. it will choose goleveldb as default to store data if you not set. Choosing a store database to use is very simple, you have two ways: @@ -139,7 +152,6 @@ See [Issues todo](https://github.com/siddontang/ledisdb/issues?labels=todo&page= ## Links + [Official Website](http://ledisdb.com) -+ [Author's Chinese Blog](http://blog.csdn.net/siddontang/article/category/2264003) + [GoDoc](https://godoc.org/github.com/siddontang/ledisdb) + [Server Commands](https://github.com/siddontang/ledisdb/wiki/Commands) diff --git a/client/nodejs/example.js b/client/nodejs/example.js index f28c50a..101a106 100644 --- a/client/nodejs/example.js +++ b/client/nodejs/example.js @@ -35,4 +35,17 @@ client.bget("bit key 3", function(err, result){ } }); -client.quit(); +//test zunionstore & zinterstore +client.zadd("zset1", 1, "one") +client.zadd("zset1", 2, "two") + +client.zadd("zset2", 1, "one") +client.zadd("zset2", 2, "two") +client.zadd("zset2", 3, "three") + +client.zunionstore("out", 2, "zset1", "zset2", "weights", 2, 3, ledis.print) +client.zrange("out", 0, -1, "withscores", ledis.print) + +client.zinterstore("out", 2, "zset1", "zset2", "weights", 2, 3, ledis.print) +client.zrange("out", 0, -1, "withscores", ledis.print) +client.quit() diff --git a/client/nodejs/ledis/lib/commands.js b/client/nodejs/ledis/lib/commands.js index 8e5a524..e814d23 100644 --- a/client/nodejs/ledis/lib/commands.js +++ b/client/nodejs/ledis/lib/commands.js @@ -83,6 +83,8 @@ module.exports = [ "zrevrank", "zrevrangebyscore", "zscore", + "zunionstore", + "zinterstore", "zclear", diff --git a/config/config.go b/config/config.go index 85ec414..0939afb 100644 --- a/config/config.go +++ b/config/config.go @@ -34,7 +34,8 @@ type LevelDBConfig struct { } type LMDBConfig struct { - MapSize int `toml:"map_size" json:"map_size"` + MapSize int `toml:"map_size" json:"map_size"` + NoSync bool `toml:"nosync" json:"nosync"` } type BinLogConfig struct { diff --git a/config/config.json b/config/config.json index 82bc668..2710e0a 100644 --- a/config/config.json +++ b/config/config.json @@ -14,7 +14,8 @@ }, "lmdb" : { - "map_size" : 524288000 + "map_size" : 524288000, + "nosync" : true }, "access_log" : "" diff --git a/config/config.toml b/config/config.toml index 6ef9a6c..573db9a 100644 --- a/config/config.toml +++ b/config/config.toml @@ -34,6 +34,7 @@ max_open_files = 1024 [lmdb] map_size = 524288000 +nosync = true [binlog] max_file_size = 0 diff --git a/config/config_test.go b/config/config_test.go index 14620ec..943c513 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -18,6 +18,7 @@ func TestConfig(t *testing.T) { dstCfg.LevelDB.CacheSize = 524288000 dstCfg.LevelDB.MaxOpenFiles = 1024 dstCfg.LMDB.MapSize = 524288000 + dstCfg.LMDB.NoSync = true cfg, err := NewConfigWithFile("./config.toml") if err != nil { diff --git a/dev.sh b/dev.sh index 6d753f2..798ffab 100644 --- a/dev.sh +++ b/dev.sh @@ -13,6 +13,7 @@ fi SNAPPY_DIR=/usr/local/snappy LEVELDB_DIR=/usr/local/leveldb ROCKSDB_DIR=/usr/local/rocksdb +HYPERLEVELDB_DIR=/usr/local/hyperleveldb function add_path() { @@ -63,6 +64,16 @@ if [ -f $ROCKSDB_DIR/include/rocksdb/c.h ]; then GO_BUILD_TAGS="$GO_BUILD_TAGS rocksdb" fi +#check hyperleveldb +if [ -f $HYPERLEVELDB_DIR/include/hyperleveldb/c.h ]; then + CGO_CFLAGS="$CGO_CFLAGS -I$HYPERLEVELDB_DIR/include" + CGO_CXXFLAGS="$CGO_CXXFLAGS -I$HYPERLEVELDB_DIR/include" + CGO_LDFLAGS="$CGO_LDFLAGS -L$HYPERLEVELDB_DIR/lib -lhyperleveldb" + LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $HYPERLEVELDB_DIR/lib) + DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $HYPERLEVELDB_DIR/lib) + GO_BUILD_TAGS="$GO_BUILD_TAGS hyperleveldb" +fi + export CGO_CFLAGS export CGO_CXXFLAGS export CGO_LDFLAGS diff --git a/doc/commands.json b/doc/commands.json index 521f86e..c51f304 100644 --- a/doc/commands.json +++ b/doc/commands.json @@ -17,7 +17,7 @@ "BEXPIREAT": { "arguments": "key timestamp", "group": "Bitmap", - "readonly": false, + "readonly": false }, "BGET": { "arguments": "key", @@ -414,5 +414,16 @@ "arguments": "key", "group": "ZSet", "readonly": true + }, + "ZUNIONSTORE":{ + "arguments": "destkey numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]", + "group": "ZSet", + "readonly": false + }, + + "ZINTERSTORE":{ + "arguments": "destkey numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]", + "group": "ZSet", + "readonly": false } -} \ No newline at end of file +} diff --git a/doc/commands.md b/doc/commands.md index 2157cbd..b4855e7 100644 --- a/doc/commands.md +++ b/doc/commands.md @@ -79,6 +79,11 @@ Table of Contents - [ZEXPIREAT key timestamp](#zexpireat-key-timestamp) - [ZTTL key](#zttl-key) - [ZPERSIST key](#zpersist-key) + - [ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX] +](#zunionstore-destination-numkeys-key-key--weights-weight-weight--aggregate-summinmax) + - [ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX] +](#zinterstore-destination-numkeys-key-key--weights-weight-weight--aggregate-summinmax) + - [Bitmap](#bitmap) - [BGET key](#bget-key) @@ -1629,6 +1634,83 @@ ledis> ZTTL mset (integer) -1 ``` +### ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX] + +Computes the union of numkeys sorted sets given by the specified keys, and stores the result in destination. It is mandatory to provide the number of input keys (numkeys) before passing the input keys and the other (optional) arguments. + +By default, the resulting score of an element is the sum of its scores in the sorted sets where it exists. +Using the WEIGHTS option, it is possible to specify a multiplication factor for each input sorted set. This means that the score of every element in every input sorted set is multiplied by this factor before being passed to the aggregation function. When WEIGHTS is not given, the multiplication factors default to 1. + +With the AGGREGATE option, it is possible to specify how the results of the union are aggregated. This option defaults to SUM, where the score of an element is summed across the inputs where it exists. When this option is set to either MIN or MAX, the resulting set will contain the minimum or maximum score of an element across the inputs where it exists. + +If destination already exists, it is overwritten. + + +**Return value** + +int64: the number of elements in the resulting sorted set at destination. + +**Examples** + +``` +ledis> ZADD zset1 1 "one" +(interger) 1 +ledis> ZADD zset1 2 "two" +(interger) 1 +ledis> ZADD zset2 1 "one" +(interger) 1 +ledis> ZADD zset2 2 "two" +(interger) 1 +ledis> ZADD zset2 3 "three" +(interger) 1 +ledis> ZUNIONSTORE out 2 zset1 zset2 WEIGHTS 2 3 +(interger) 3 +ledis> ZRANGE out 0 -1 WITHSCORES +1) "one" +2) "5" +3) "three" +4) "9" +5) "two" +6) "10" +``` + +### ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX] + +Computes the intersection of numkeys sorted sets given by the specified keys, and stores the result in destination. It is mandatory to provide the number of input keys (numkeys) before passing the input keys and the other (optional) arguments. + +By default, the resulting score of an element is the sum of its scores in the sorted sets where it exists. Because intersection requires an element to be a member of every given sorted set, this results in the score of every element in the resulting sorted set to be equal to the number of input sorted sets. + +For a description of the `WEIGHTS` and `AGGREGATE` options, see [ZUNIONSTORE](#zunionstore-destination-numkeys-key-key--weights-weight-weight--aggregate-summinmax). + +If destination already exists, it is overwritten. + + + +**Return value** + +int64: the number of elements in the resulting sorted set at destination. + +**Examples** + +``` +ledis> ZADD zset1 1 "one" +(interger) 1 +ledis> ZADD zset1 2 "two" +(interger) 1 +ledis> ZADD zset2 1 "one" +(interger) 1 +ledis> ZADD zset2 2 "two" +(interger) 1 +ledis> ZADD zset2 3 "three" +(interger) 1 +ledis> ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3 +(interger) 3 +ledis> ZRANGE out 0 -1 WITHSCORES +1) "one" +2) "5" +3) "two" +4) "10" +``` ## Bitmap diff --git a/etc/ledis.conf b/etc/ledis.conf index 8d92919..2097f65 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -36,6 +36,7 @@ max_open_files = 1024 [lmdb] map_size = 524288000 +nosync = true [binlog] # Set either size or num to 0 to disable binlog diff --git a/generate.py b/generate.py index beba9e5..1295e22 100644 --- a/generate.py +++ b/generate.py @@ -2,32 +2,18 @@ import json import time +import sys +import os from collections import OrderedDict as dict - -def go_array_to_json(path): - """Convert `./cmd/ledis-cli/const.go` to commands.json""" - fp = open(path).read() - commands_str = fp.split("string")[1] - _commands_str = commands_str.splitlines()[1:len(commands_str.splitlines())-1] - commands_d = dict() - values_d = dict() - for i in _commands_str: - t = i.split('"') - values_d.update( - { - "arguments": "%s" % t[3], - "group": "%s" % t[5] - }) - values_d = dict(sorted(values_d.items())) - d = { - "%s" % t[1]: values_d - } - commands_d.update(d) - - fp = open("commands.json", "w") - json.dump(commands_d, fp, indent=4) - fp.close() +content = u"""\n +type cmdConf struct { + name string + argDesc string + group string + readonly bool +} +""" def json_to_js(json_path, js_path): @@ -39,7 +25,7 @@ def json_to_js(json_path, js_path): keys.append(k.encode('utf-8')) with open(js_path, "w") as fp: generate_time(fp) - fp.write("module.exports = [\n" ) + fp.write("module.exports = [\n") for k in sorted(keys): fp.write('\t"%s",\n' % k.lower()) fp.write("]") @@ -53,22 +39,64 @@ def json_to_go_array(json_path, go_path): g_fp.write("package main\n\nvar helpCommands = [][]string{\n") _json_sorted = dict(sorted(_json.items(), key=lambda x: x[0])) for k, v in _json_sorted.iteritems(): - print k, v g_fp.write('\t{"%s", "%s", "%s"},\n' % (k, v["arguments"], v["group"])) g_fp.write("}\n") g_fp.close() +def json_to_command_cnf(json_path, go_path): + g_fp = open(go_path, "w") + + with open(json_path) as fp: + _json = json.load(fp) + generate_time(g_fp) + g_fp.write("package server") + print >> g_fp, content + g_fp.write("var cnfCmds = []cmdConf{\n") + for k, v in _json.iteritems(): + g_fp.write('\t{\n\t\t"%s",\n\t\t"%s",\n\t\t"%s", \n\t\t%s,\n\t},\n' % + (k, v["arguments"], v["group"], "true" if v["readonly"] else "false" )) + g_fp.write("}\n") + g_fp.close() + + def generate_time(fp): - fp.write("//This file was generated by ./generate.py on %s \n" % \ - time.strftime('%a %b %d %Y %H:%M:%S %z')) + fp.write("//This file was generated by ./generate.py on %s \n" % + time.strftime('%a %b %d %Y %H:%M:%S %z')) + if __name__ == "__main__": - path = "./cmd/ledis-cli/const.go" - # go_array_to_json(path) - json_path = "./commands.json" - js_path = "./commands.js" - json_to_js(json_path, js_path) - go_path = "const.go" + usage = """ + Usage: python %s src_path dst_path" - json_to_go_array(json_path, path) + 1. for Node.js client: + + python generate.py /path/to/commands.json /path/to/commands.js + + 2. for cmd/ledis_cli/const.go + + python generate.py /path/to/commands.json /path/to/const.go + + 3. for server/command_cnf.go + + python generate.py /path/to/commands.json /path/to/command_cnf.go + + """ + + if len(sys.argv) != 3: + sys.exit(usage % os.path.basename(sys.argv[0])) + + src_path, dst_path = sys.argv[1:] + dst_path_base = os.path.basename(dst_path) + + if dst_path_base.endswith(".js"): + json_to_js(src_path, dst_path) + + elif dst_path_base.startswith("const.go"): + json_to_go_array(src_path, dst_path) + + elif dst_path_base.startswith("command"): + json_to_command_cnf(src_path, dst_path) + + else: + print "Not support arguments" diff --git a/ledis/t_bit.go b/ledis/t_bit.go index bc208a0..f7edddd 100644 --- a/ledis/t_bit.go +++ b/ledis/t_bit.go @@ -231,15 +231,20 @@ func (db *DB) bSetMeta(t *tx, key []byte, tailSeq uint32, tailOff uint32) { } func (db *DB) bUpdateMeta(t *tx, key []byte, seq uint32, off uint32) (tailSeq uint32, tailOff uint32, err error) { - var ts, to int32 - if ts, to, err = db.bGetMeta(key); err != nil { + var tseq, toff int32 + var update bool = false + + if tseq, toff, err = db.bGetMeta(key); err != nil { return + } else if tseq < 0 { + update = true } else { - tailSeq = uint32(MaxInt32(ts, 0)) - tailOff = uint32(MaxInt32(to, 0)) + tailSeq = uint32(MaxInt32(tseq, 0)) + tailOff = uint32(MaxInt32(toff, 0)) + update = (seq > tailSeq || (seq == tailSeq && off > tailOff)) } - if seq > tailSeq || (seq == tailSeq && off > tailOff) { + if update { db.bSetMeta(t, key, seq, off) tailSeq = seq tailOff = off diff --git a/ledis/t_bit_test.go b/ledis/t_bit_test.go index a356539..9103523 100644 --- a/ledis/t_bit_test.go +++ b/ledis/t_bit_test.go @@ -41,6 +41,7 @@ func TestBinary(t *testing.T) { testOpXor(t) testOpNot(t) testMSetBit(t) + testBitExpire(t) } func testSimple(t *testing.T) { @@ -518,3 +519,20 @@ func testMSetBit(t *testing.T) { return } + +func testBitExpire(t *testing.T) { + db := getTestDB() + db.FlushAll() + + key := []byte("test_b_ttl") + + db.BSetBit(key, 0, 1) + + if res, err := db.BExpire(key, 100); res != 1 || err != nil { + t.Fatal(false) + } + + if ttl, err := db.BTTL(key); ttl != 100 || err != nil { + t.Fatal(false) + } +} diff --git a/ledis/t_zset.go b/ledis/t_zset.go index 151f8eb..972c872 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -12,6 +12,10 @@ const ( MinScore int64 = -1<<63 + 1 MaxScore int64 = 1<<63 - 1 InvalidScore int64 = -1 << 63 + + AggregateSum byte = 0 + AggregateMin byte = 1 + AggregateMax byte = 2 ) type ScorePair struct { @@ -23,6 +27,9 @@ var errZSizeKey = errors.New("invalid zsize key") var errZSetKey = errors.New("invalid zset key") var errZScoreKey = errors.New("invalid zscore key") var errScoreOverflow = errors.New("zset score overflow") +var errInvalidAggregate = errors.New("invalid aggregate") +var errInvalidWeightNum = errors.New("invalid weight number") +var errInvalidSrcKeyNum = errors.New("invalid src key number") const ( zsetNScoreSep byte = '<' @@ -839,3 +846,166 @@ func (db *DB) ZPersist(key []byte) (int64, error) { err = t.Commit() return n, err } + +func getAggregateFunc(aggregate byte) func(int64, int64) int64 { + switch aggregate { + case AggregateSum: + return func(a int64, b int64) int64 { + return a + b + } + case AggregateMax: + return func(a int64, b int64) int64 { + if a > b { + return a + } + return b + } + case AggregateMin: + return func(a int64, b int64) int64 { + if a > b { + return b + } + return a + } + } + return nil +} + +func (db *DB) ZUnionStore(destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte) (int64, error) { + + var destMap = map[string]int64{} + aggregateFunc := getAggregateFunc(aggregate) + if aggregateFunc == nil { + return 0, errInvalidAggregate + } + if len(srcKeys) < 1 { + return 0, errInvalidSrcKeyNum + } + if weights != nil { + if len(srcKeys) != len(weights) { + return 0, errInvalidWeightNum + } + } else { + weights = make([]int64, len(srcKeys)) + for i := 0; i < len(weights); i++ { + weights[i] = 1 + } + } + + for i, key := range srcKeys { + scorePairs, err := db.ZRange(key, 0, -1) + if err != nil { + return 0, err + } + for _, pair := range scorePairs { + if score, ok := destMap[String(pair.Member)]; !ok { + destMap[String(pair.Member)] = pair.Score * weights[i] + } else { + destMap[String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i]) + } + } + } + + t := db.zsetTx + t.Lock() + defer t.Unlock() + + db.zDelete(t, destKey) + + var num int64 = 0 + for member, score := range destMap { + if err := checkZSetKMSize(destKey, []byte(member)); err != nil { + return 0, err + } + + if n, err := db.zSetItem(t, destKey, score, []byte(member)); err != nil { + return 0, err + } else if n == 0 { + //add new + num++ + } + } + + if _, err := db.zIncrSize(t, destKey, num); err != nil { + return 0, err + } + + //todo add binlog + if err := t.Commit(); err != nil { + return 0, err + } + return int64(len(destMap)), nil +} + +func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte) (int64, error) { + + aggregateFunc := getAggregateFunc(aggregate) + if aggregateFunc == nil { + return 0, errInvalidAggregate + } + if len(srcKeys) < 1 { + return 0, errInvalidSrcKeyNum + } + if weights != nil { + if len(srcKeys) != len(weights) { + return 0, errInvalidWeightNum + } + } else { + weights = make([]int64, len(srcKeys)) + for i := 0; i < len(weights); i++ { + weights[i] = 1 + } + } + + var destMap = map[string]int64{} + scorePairs, err := db.ZRange(srcKeys[0], 0, -1) + if err != nil { + return 0, err + } + for _, pair := range scorePairs { + destMap[String(pair.Member)] = pair.Score * weights[0] + } + + for i, key := range srcKeys[1:] { + scorePairs, err := db.ZRange(key, 0, -1) + if err != nil { + return 0, err + } + tmpMap := map[string]int64{} + for _, pair := range scorePairs { + if score, ok := destMap[String(pair.Member)]; ok { + tmpMap[String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i+1]) + } + } + destMap = tmpMap + } + + t := db.zsetTx + t.Lock() + defer t.Unlock() + + db.zDelete(t, destKey) + + var num int64 = 0 + for member, score := range destMap { + if err := checkZSetKMSize(destKey, []byte(member)); err != nil { + return 0, err + } + + if n, err := db.zSetItem(t, destKey, score, []byte(member)); err != nil { + return 0, err + } else if n == 0 { + //add new + num++ + } + } + + if _, err := db.zIncrSize(t, destKey, num); err != nil { + return 0, err + } + //todo add binlog + if err := t.Commit(); err != nil { + return 0, err + } + return int64(len(destMap)), nil +} diff --git a/ledis/t_zset_test.go b/ledis/t_zset_test.go index 74cf526..a2232d3 100644 --- a/ledis/t_zset_test.go +++ b/ledis/t_zset_test.go @@ -264,3 +264,122 @@ func TestZSetPersist(t *testing.T) { t.Fatal(n) } } + +func TestZUnionStore(t *testing.T) { + db := getTestDB() + key1 := []byte("key1") + key2 := []byte("key2") + + db.ZAdd(key1, ScorePair{1, []byte("one")}) + db.ZAdd(key1, ScorePair{1, []byte("two")}) + + db.ZAdd(key2, ScorePair{2, []byte("two")}) + db.ZAdd(key2, ScorePair{2, []byte("three")}) + + keys := [][]byte{key1, key2} + weights := []int64{1, 2} + + out := []byte("out") + n, err := db.ZUnionStore(out, keys, weights, AggregateSum) + if err != nil { + t.Fatal(err.Error()) + } + if n != 3 { + t.Fatal("invalid value ", n) + } + + v, err := db.ZScore(out, []byte("two")) + + if err != nil { + t.Fatal(err.Error()) + } + if v != 5 { + t.Fatal("invalid value ", v) + } + + out = []byte("out") + n, err = db.ZUnionStore(out, keys, weights, AggregateMax) + if err != nil { + t.Fatal(err.Error()) + } + if n != 3 { + t.Fatal("invalid value ", n) + } + + v, err = db.ZScore(out, []byte("two")) + + if err != nil { + t.Fatal(err.Error()) + } + if v != 4 { + t.Fatal("invalid value ", v) + } + + n, err = db.ZCount(out, 0, 0XFFFE) + + if err != nil { + t.Fatal(err.Error()) + } + if n != 3 { + t.Fatal("invalid value ", v) + } +} + +func TestZInterStore(t *testing.T) { + db := getTestDB() + + key1 := []byte("key1") + key2 := []byte("key2") + + db.ZAdd(key1, ScorePair{1, []byte("one")}) + db.ZAdd(key1, ScorePair{1, []byte("two")}) + + db.ZAdd(key2, ScorePair{2, []byte("two")}) + db.ZAdd(key2, ScorePair{2, []byte("three")}) + + keys := [][]byte{key1, key2} + weights := []int64{2, 3} + out := []byte("out") + + n, err := db.ZInterStore(out, keys, weights, AggregateSum) + if err != nil { + t.Fatal(err.Error()) + } + if n != 1 { + t.Fatal("invalid value ", n) + } + v, err := db.ZScore(out, []byte("two")) + if err != nil { + t.Fatal(err.Error()) + } + if v != 8 { + t.Fatal("invalid value ", v) + } + + out = []byte("out") + n, err = db.ZInterStore(out, keys, weights, AggregateMin) + if err != nil { + t.Fatal(err.Error()) + } + if n != 1 { + t.Fatal("invalid value ", n) + } + + v, err = db.ZScore(out, []byte("two")) + + if err != nil { + t.Fatal(err.Error()) + } + if v != 2 { + t.Fatal("invalid value ", v) + } + + n, err = db.ZCount(out, 0, 0XFFFF) + if err != nil { + t.Fatal(err.Error()) + } + if n != 1 { + t.Fatal("invalid value ", n) + } + +} diff --git a/server/client_http.go b/server/client_http.go index 6707e24..16f3b9a 100644 --- a/server/client_http.go +++ b/server/client_http.go @@ -19,7 +19,7 @@ var allowedContentTypes = map[string]struct{}{ "bson": struct{}{}, "msgpack": struct{}{}, } -var unsopportedCommands = map[string]struct{}{ +var unsupportedCommands = map[string]struct{}{ "slaveof": struct{}{}, "fullsync": struct{}{}, "sync": struct{}{}, @@ -87,7 +87,7 @@ func (c *httpClient) makeRequest(app *App, r *http.Request, w http.ResponseWrite } req.cmd = strings.ToLower(cmd) - if _, ok := unsopportedCommands[req.cmd]; ok { + if _, ok := unsupportedCommands[req.cmd]; ok { return nil, fmt.Errorf("unsupported command: '%s'", cmd) } @@ -210,7 +210,7 @@ func (w *httpWriter) writeScorePairArray(lst []ledis.ScorePair, withScores bool) } func (w *httpWriter) writeBulkFrom(n int64, rb io.Reader) { - w.writeError(fmt.Errorf("unsuport")) + w.writeError(fmt.Errorf("unsupport")) } func (w *httpWriter) flush() { diff --git a/server/cmd_zset.go b/server/cmd_zset.go index e540b32..f8117fc 100644 --- a/server/cmd_zset.go +++ b/server/cmd_zset.go @@ -520,6 +520,120 @@ func zpersistCommand(req *requestContext) error { return nil } +func zparseZsetoptStore(args [][]byte) (destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte, err error) { + destKey = args[0] + nKeys, err := strconv.Atoi(ledis.String(args[1])) + if err != nil { + err = ErrValue + return + } + args = args[2:] + if len(args) < nKeys { + err = ErrSyntax + return + } + + srcKeys = args[:nKeys] + + args = args[nKeys:] + + var weightsFlag = false + var aggregateFlag = false + + for len(args) > 0 { + if strings.ToLower(ledis.String(args[0])) == "weights" { + if weightsFlag { + err = ErrSyntax + return + } + + args = args[1:] + if len(args) < nKeys { + err = ErrSyntax + return + } + + weights = make([]int64, nKeys) + for i, arg := range args[:nKeys] { + if weights[i], err = ledis.StrInt64(arg, nil); err != nil { + err = ErrValue + return + } + } + args = args[nKeys:] + + weightsFlag = true + + } else if strings.ToLower(ledis.String(args[0])) == "aggregate" { + if aggregateFlag { + err = ErrSyntax + return + } + if len(args) < 2 { + err = ErrSyntax + return + } + + if strings.ToLower(ledis.String(args[1])) == "sum" { + aggregate = ledis.AggregateSum + } else if strings.ToLower(ledis.String(args[1])) == "min" { + aggregate = ledis.AggregateMin + } else if strings.ToLower(ledis.String(args[1])) == "max" { + aggregate = ledis.AggregateMax + } else { + err = ErrSyntax + return + } + args = args[2:] + aggregateFlag = true + } else { + err = ErrSyntax + return + } + } + if !aggregateFlag { + aggregate = ledis.AggregateSum + } + return +} + +func zunionstoreCommand(req *requestContext) error { + args := req.args + if len(args) < 2 { + return ErrCmdParams + } + + destKey, srcKeys, weights, aggregate, err := zparseZsetoptStore(args) + if err != nil { + return err + } + if n, err := req.db.ZUnionStore(destKey, srcKeys, weights, aggregate); err != nil { + return err + } else { + req.resp.writeInteger(n) + } + + return nil +} + +func zinterstoreCommand(req *requestContext) error { + args := req.args + if len(args) < 2 { + return ErrCmdParams + } + + destKey, srcKeys, weights, aggregate, err := zparseZsetoptStore(args) + if err != nil { + return err + } + if n, err := req.db.ZInterStore(destKey, srcKeys, weights, aggregate); err != nil { + return err + } else { + req.resp.writeInteger(n) + } + return nil +} + func init() { register("zadd", zaddCommand) register("zcard", zcardCommand) @@ -536,6 +650,9 @@ func init() { register("zrevrangebyscore", zrevrangebyscoreCommand) register("zscore", zscoreCommand) + register("zunionstore", zunionstoreCommand) + register("zinterstore", zinterstoreCommand) + //ledisdb special command register("zclear", zclearCommand) diff --git a/server/cmd_zset_test.go b/server/cmd_zset_test.go index d9b1272..8c74bdc 100644 --- a/server/cmd_zset_test.go +++ b/server/cmd_zset_test.go @@ -599,3 +599,141 @@ func TestZsetErrorParams(t *testing.T) { } } + +func TestZUnionStore(t *testing.T) { + c := getTestConn() + defer c.Close() + + if _, err := c.Do("zadd", "k1", "1", "one"); err != nil { + t.Fatal(err.Error()) + } + + if _, err := c.Do("zadd", "k1", "2", "two"); err != nil { + t.Fatal(err.Error()) + } + + if _, err := c.Do("zadd", "k2", "1", "two"); err != nil { + t.Fatal(err.Error()) + } + + if _, err := c.Do("zadd", "k2", "2", "three"); err != nil { + t.Fatal(err.Error()) + } + + if n, err := ledis.Int64(c.Do("zunionstore", "out", "2", "k1", "k2", "weights", "1", "2")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 3 { + t.Fatal("invalid value ", n) + } + } + + if n, err := ledis.Int64(c.Do("zunionstore", "out", "2", "k1", "k2", "weights", "1", "2", "aggregate", "min")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 3 { + t.Fatal("invalid value ", n) + } + } + + if n, err := ledis.Int64(c.Do("zunionstore", "out", "2", "k1", "k2", "aggregate", "max")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 3 { + t.Fatal("invalid value ", n) + } + } + + if n, err := ledis.Int64(c.Do("zscore", "out", "two")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 2 { + t.Fatal("invalid value ", n) + } + } +} + +func TestZInterStore(t *testing.T) { + c := getTestConn() + defer c.Close() + + if _, err := c.Do("zadd", "k1", "1", "one"); err != nil { + t.Fatal(err.Error()) + } + + if _, err := c.Do("zadd", "k1", "2", "two"); err != nil { + t.Fatal(err.Error()) + } + + if _, err := c.Do("zadd", "k2", "1", "two"); err != nil { + t.Fatal(err.Error()) + } + + if _, err := c.Do("zadd", "k2", "2", "three"); err != nil { + t.Fatal(err.Error()) + } + + if n, err := ledis.Int64(c.Do("zinterstore", "out", "2", "k1", "k2", "weights", "1", "2")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 1 { + t.Fatal("invalid value ", n) + } + } + + if n, err := ledis.Int64(c.Do("zinterstore", "out", "2", "k1", "k2", "aggregate", "min", "weights", "1", "2")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 1 { + t.Fatal("invalid value ", n) + } + } + + if n, err := ledis.Int64(c.Do("zinterstore", "out", "2", "k1", "k2", "aggregate", "sum")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 1 { + t.Fatal("invalid value ", n) + } + } + + if n, err := ledis.Int64(c.Do("zscore", "out", "two")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 3 { + t.Fatal("invalid value ", n) + } + } + + if _, err := c.Do("zadd", "k3", "3", "three"); err != nil { + t.Fatal(err.Error()) + } + + if n, err := ledis.Int64(c.Do("zinterstore", "out", "3", "k1", "k2", "k3", "aggregate", "sum")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 0 { + t.Fatal("invalid value ", n) + } + } + + if _, err := c.Do("zadd", "k3", "3", "two"); err != nil { + t.Fatal(err.Error()) + } + + if n, err := ledis.Int64(c.Do("zinterstore", "out", "3", "k1", "k2", "k3", "aggregate", "sum", "weights", "3", "2", "2")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 1 { + t.Fatal("invalid value ", n) + } + } + + if n, err := ledis.Int64(c.Do("zscore", "out", "two")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 14 { + t.Fatal("invalid value ", n) + } + } +} diff --git a/server/command_cnf.go b/server/command_cnf.go new file mode 100644 index 0000000..e830592 --- /dev/null +++ b/server/command_cnf.go @@ -0,0 +1,510 @@ +//This file was generated by ./generate.py on Mon Aug 11 2014 12:35:56 +0800 +package server + +type cmdConf struct { + name string + argDesc string + group string + readonly bool +} + +var cnfCmds = []cmdConf{ + { + "ZRANGEBYSCORE", + "key min max [WITHSCORES] [LIMIT offset count]", + "ZSet", + true, + }, + { + "ZPERSIST", + "key", + "ZSet", + false, + }, + { + "LTTL", + "key", + "List", + true, + }, + { + "LINDEX", + "key index", + "List", + true, + }, + { + "FULLSYNC", + "-", + "Replication", + false, + }, + { + "ZREVRANK", + "key member", + "ZSet", + true, + }, + { + "ZEXPIRE", + "key seconds", + "ZSet", + false, + }, + { + "SYNC", + "index offset", + "Replication", + false, + }, + { + "BMSETBIT", + "key offset value [offset value ...]", + "Bitmap", + false, + }, + { + "LPOP", + "key", + "List", + false, + }, + { + "HPERSIST", + "key", + "Hash", + false, + }, + { + "EXPIRE", + "key seconds", + "KV", + false, + }, + { + "DEL", + "key [key ...]", + "KV", + false, + }, + { + "LPUSH", + "key value [value ...]", + "List", + false, + }, + { + "PERSIST", + "key", + "KV", + false, + }, + { + "HTTL", + "key", + "Hash", + true, + }, + { + "LEXPIREAT", + "key timestamp", + "List", + false, + }, + { + "ZEXPIREAT", + "key timestamp", + "ZSet", + false, + }, + { + "DECR", + "key", + "KV", + false, + }, + { + "SLAVEOF", + "host port", + "Replication", + false, + }, + { + "INCR", + "key", + "KV", + false, + }, + { + "MSET", + "key value [key value ...]", + "KV", + false, + }, + { + "LEXPIRE", + "key seconds", + "List", + false, + }, + { + "HINCRBY", + "key field increment", + "Hash", + false, + }, + { + "GET", + "key", + "KV", + true, + }, + { + "ZREVRANGE", + "key start stop [WITHSCORES]", + "ZSet", + true, + }, + { + "ZINCRBY", + "key increment member", + "ZSet", + false, + }, + { + "LPERSIST", + "key", + "List", + false, + }, + { + "HEXISTS", + "key field", + "Hash", + true, + }, + { + "ZREM", + "key member [member ...]", + "ZSet", + false, + }, + { + "BOPT", + "operation destkey key [key ...]", + "Bitmap", + false, + }, + { + "ZCLEAR", + "key", + "ZSet", + false, + }, + { + "LCLEAR", + "key", + "List", + false, + }, + { + "ZRANK", + "key member", + "ZSet", + true, + }, + { + "TTL", + "key", + "KV", + true, + }, + { + "ZADD", + "key score member [score member ...]", + "ZSet", + false, + }, + { + "HEXPIRE", + "key seconds", + "Hash", + false, + }, + { + "HDEL", + "key field [field ...]", + "Hash", + false, + }, + { + "HSET", + "key field value", + "Hash", + false, + }, + { + "LLEN", + "key", + "List", + true, + }, + { + "HVALS", + "key", + "Hash", + true, + }, + { + "BCOUNT", + "key [start end]", + "Bitmap", + true, + }, + { + "BGET", + "key", + "Bitmap", + true, + }, + { + "MGET", + "key [key ...]", + "KV", + true, + }, + { + "EXISTS", + "key", + "KV", + true, + }, + { + "HMCLEAR", + "key [key ...]", + "Hash", + false, + }, + { + "ZCOUNT", + "key min max", + "ZSet", + true, + }, + { + "SELECT", + "index", + "Server", + false, + }, + { + "ECHO", + "message", + "Server", + true, + }, + { + "ZTTL", + "key", + "ZSet", + true, + }, + { + "HKEYS", + "key", + "Hash", + true, + }, + { + "HGETALL", + "key", + "Hash", + true, + }, + { + "RPOP", + "key", + "List", + false, + }, + { + "HMGET", + "key field [field ...]", + "Hash", + true, + }, + { + "SETNX", + "key value", + "KV", + false, + }, + { + "HGET", + "key field", + "Hash", + true, + }, + { + "BPERSIST", + "key", + "Bitmap", + false, + }, + { + "INCRBY", + "key increment", + "KV", + false, + }, + { + "BDELETE", + "key", + "ZSet", + false, + }, + { + "ZMCLEAR", + "key [key ...]", + "ZSet", + false, + }, + { + "RPUSH", + "key value [value ...]", + "List", + false, + }, + { + "LRANGE", + "key start stop", + "List", + true, + }, + { + "HLEN", + "key", + "Hash", + true, + }, + { + "ZSCORE", + "key member", + "ZSet", + true, + }, + { + "LMCLEAR", + "key [key ...]", + "List", + false, + }, + { + "EXPIREAT", + "key timestamp", + "KV", + false, + }, + { + "ZREMRANGEBYSCORE", + "key min max", + "ZSet", + false, + }, + { + "ZCARD", + "key", + "ZSet", + true, + }, + { + "ZREMRANGEBYRANK", + "key start stop", + "ZSet", + false, + }, + { + "PING", + "-", + "Server", + true, + }, + { + "HMSET", + "key field value [field value ...]", + "Hash", + false, + }, + { + "BTTL", + "key", + "Bitmap", + true, + }, + { + "HCLEAR", + "key", + "Hash", + false, + }, + { + "ZRANGE", + "key start stop [WITHSCORES]", + "ZSet", + false, + }, + { + "ZREVRANGEBYSCORE", + "key max min [WITHSCORES][LIMIT offset count]", + "ZSet", + true, + }, + { + "BSETBIT", + "key offset value", + "Bitmap", + false, + }, + { + "BEXPIREAT", + "key timestamp", + "Bitmap", + false, + }, + { + "SET", + "key value", + "KV", + false, + }, + { + "BGETBIT", + "key offset", + "Bitmap", + true, + }, + { + "BEXPIRE", + "key seconds", + "Bitmap", + false, + }, + { + "GETSET", + " key value", + "KV", + false, + }, + { + "DECRBY", + "key decrement", + "KV", + false, + }, + { + "HEXPIREAT", + "key timestamp", + "Hash", + false, + }, +} diff --git a/store/hyperleveldb.go b/store/hyperleveldb.go new file mode 100644 index 0000000..6ef70f1 --- /dev/null +++ b/store/hyperleveldb.go @@ -0,0 +1,11 @@ +// +build hyperleveldb + +package store + +import ( + "github.com/siddontang/ledisdb/store/hyperleveldb" +) + +func init() { + Register(hyperleveldb.Store{}) +} diff --git a/store/hyperleveldb/batch.go b/store/hyperleveldb/batch.go new file mode 100644 index 0000000..149f0b4 --- /dev/null +++ b/store/hyperleveldb/batch.go @@ -0,0 +1,61 @@ +// +build hyperleveldb + +package hyperleveldb + +// #cgo LDFLAGS: -lhyperleveldb +// #include "hyperleveldb/c.h" +import "C" + +import ( + "unsafe" +) + +type WriteBatch struct { + db *DB + wbatch *C.leveldb_writebatch_t +} + +func (w *WriteBatch) Close() error { + C.leveldb_writebatch_destroy(w.wbatch) + w.wbatch = nil + + return nil +} + +func (w *WriteBatch) Put(key, value []byte) { + var k, v *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + if len(value) != 0 { + v = (*C.char)(unsafe.Pointer(&value[0])) + } + + lenk := len(key) + lenv := len(value) + + C.leveldb_writebatch_put(w.wbatch, k, C.size_t(lenk), v, C.size_t(lenv)) +} + +func (w *WriteBatch) Delete(key []byte) { + C.leveldb_writebatch_delete(w.wbatch, + (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) +} + +func (w *WriteBatch) Commit() error { + return w.commit(w.db.writeOpts) +} + +func (w *WriteBatch) Rollback() error { + C.leveldb_writebatch_clear(w.wbatch) + return nil +} + +func (w *WriteBatch) commit(wb *WriteOptions) error { + var errStr *C.char + C.leveldb_write(w.db.db, wb.Opt, w.wbatch, &errStr) + if errStr != nil { + return saveError(errStr) + } + return nil +} diff --git a/store/hyperleveldb/cache.go b/store/hyperleveldb/cache.go new file mode 100644 index 0000000..9b73d21 --- /dev/null +++ b/store/hyperleveldb/cache.go @@ -0,0 +1,20 @@ +// +build hyperleveldb + +package hyperleveldb + +// #cgo LDFLAGS: -lhyperleveldb +// #include +// #include "hyperleveldb/c.h" +import "C" + +type Cache struct { + Cache *C.leveldb_cache_t +} + +func NewLRUCache(capacity int) *Cache { + return &Cache{C.leveldb_cache_create_lru(C.size_t(capacity))} +} + +func (c *Cache) Close() { + C.leveldb_cache_destroy(c.Cache) +} diff --git a/store/hyperleveldb/db.go b/store/hyperleveldb/db.go new file mode 100644 index 0000000..946a2f6 --- /dev/null +++ b/store/hyperleveldb/db.go @@ -0,0 +1,259 @@ +// +build hyperleveldb + +// Package hyperleveldb is a wrapper for c++ hyperleveldb +package hyperleveldb + +/* +#cgo LDFLAGS: -lhyperleveldb +#include +#include "hyperleveldb_ext.h" +*/ +import "C" + +import ( + "github.com/siddontang/ledisdb/config" + "github.com/siddontang/ledisdb/store/driver" + "os" + "runtime" + "unsafe" +) + +const defaultFilterBits int = 10 + +type Store struct { +} + +func (s Store) String() string { + return "hyperleveldb" +} + +func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) { + if err := os.MkdirAll(path, os.ModePerm); err != nil { + return nil, err + } + + db := new(DB) + db.path = path + db.cfg = &cfg.LevelDB + + if err := db.open(); err != nil { + return nil, err + } + + return db, nil +} + +func (s Store) Repair(path string, cfg *config.Config) error { + db := new(DB) + db.cfg = &cfg.LevelDB + db.path = path + + err := db.open() + defer db.Close() + + //open ok, do not need repair + if err == nil { + return nil + } + + var errStr *C.char + ldbname := C.CString(path) + defer C.leveldb_free(unsafe.Pointer(ldbname)) + + C.leveldb_repair_db(db.opts.Opt, ldbname, &errStr) + if errStr != nil { + return saveError(errStr) + } + return nil +} + +type DB struct { + path string + + cfg *config.LevelDBConfig + + db *C.leveldb_t + + opts *Options + + //for default read and write options + readOpts *ReadOptions + writeOpts *WriteOptions + iteratorOpts *ReadOptions + + cache *Cache + + filter *FilterPolicy +} + +func (db *DB) open() error { + db.initOptions(db.cfg) + + var errStr *C.char + ldbname := C.CString(db.path) + defer C.leveldb_free(unsafe.Pointer(ldbname)) + + db.db = C.leveldb_open(db.opts.Opt, ldbname, &errStr) + if errStr != nil { + db.db = nil + return saveError(errStr) + } + return nil +} + +func (db *DB) initOptions(cfg *config.LevelDBConfig) { + opts := NewOptions() + + opts.SetCreateIfMissing(true) + + cfg.Adjust() + + db.cache = NewLRUCache(cfg.CacheSize) + opts.SetCache(db.cache) + + //we must use bloomfilter + db.filter = NewBloomFilter(defaultFilterBits) + opts.SetFilterPolicy(db.filter) + + if !cfg.Compression { + opts.SetCompression(NoCompression) + } else { + opts.SetCompression(SnappyCompression) + } + + opts.SetBlockSize(cfg.BlockSize) + + opts.SetWriteBufferSize(cfg.WriteBufferSize) + + opts.SetMaxOpenFiles(cfg.MaxOpenFiles) + + db.opts = opts + + db.readOpts = NewReadOptions() + db.writeOpts = NewWriteOptions() + + db.iteratorOpts = NewReadOptions() + db.iteratorOpts.SetFillCache(false) +} + +func (db *DB) Close() error { + if db.db != nil { + C.leveldb_close(db.db) + db.db = nil + } + + db.opts.Close() + + if db.cache != nil { + db.cache.Close() + } + + if db.filter != nil { + db.filter.Close() + } + + db.readOpts.Close() + db.writeOpts.Close() + db.iteratorOpts.Close() + + return nil +} + +func (db *DB) Put(key, value []byte) error { + return db.put(db.writeOpts, key, value) +} + +func (db *DB) Get(key []byte) ([]byte, error) { + return db.get(db.readOpts, key) +} + +func (db *DB) Delete(key []byte) error { + return db.delete(db.writeOpts, key) +} + +func (db *DB) NewWriteBatch() driver.IWriteBatch { + wb := &WriteBatch{ + db: db, + wbatch: C.leveldb_writebatch_create(), + } + + runtime.SetFinalizer(wb, func(w *WriteBatch) { + w.Close() + }) + return wb +} + +func (db *DB) NewIterator() driver.IIterator { + it := new(Iterator) + + it.it = C.leveldb_create_iterator(db.db, db.iteratorOpts.Opt) + + return it +} + +func (db *DB) put(wo *WriteOptions, key, value []byte) error { + var errStr *C.char + var k, v *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + if len(value) != 0 { + v = (*C.char)(unsafe.Pointer(&value[0])) + } + + lenk := len(key) + lenv := len(value) + C.leveldb_put( + db.db, wo.Opt, k, C.size_t(lenk), v, C.size_t(lenv), &errStr) + + if errStr != nil { + return saveError(errStr) + } + return nil +} + +func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) { + var errStr *C.char + var vallen C.size_t + var k *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + + var value *C.char + + c := C.hyperleveldb_get_ext( + db.db, ro.Opt, k, C.size_t(len(key)), &value, &vallen, &errStr) + + if errStr != nil { + return nil, saveError(errStr) + } + + if value == nil { + return nil, nil + } + + defer C.hyperleveldb_get_free_ext(unsafe.Pointer(c)) + + return C.GoBytes(unsafe.Pointer(value), C.int(vallen)), nil +} + +func (db *DB) delete(wo *WriteOptions, key []byte) error { + var errStr *C.char + var k *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + + C.leveldb_delete( + db.db, wo.Opt, k, C.size_t(len(key)), &errStr) + + if errStr != nil { + return saveError(errStr) + } + return nil +} + +func (db *DB) Begin() (driver.Tx, error) { + return nil, driver.ErrTxSupport +} diff --git a/store/hyperleveldb/filterpolicy.go b/store/hyperleveldb/filterpolicy.go new file mode 100644 index 0000000..1c8f126 --- /dev/null +++ b/store/hyperleveldb/filterpolicy.go @@ -0,0 +1,21 @@ +// +build hyperleveldb + +package hyperleveldb + +// #cgo LDFLAGS: -lhyperleveldb +// #include +// #include "hyperleveldb/c.h" +import "C" + +type FilterPolicy struct { + Policy *C.leveldb_filterpolicy_t +} + +func NewBloomFilter(bitsPerKey int) *FilterPolicy { + policy := C.leveldb_filterpolicy_create_bloom(C.int(bitsPerKey)) + return &FilterPolicy{policy} +} + +func (fp *FilterPolicy) Close() { + C.leveldb_filterpolicy_destroy(fp.Policy) +} diff --git a/store/hyperleveldb/hyperleveldb_ext.cc b/store/hyperleveldb/hyperleveldb_ext.cc new file mode 100644 index 0000000..dab687c --- /dev/null +++ b/store/hyperleveldb/hyperleveldb_ext.cc @@ -0,0 +1,88 @@ +// +build hyperleveldb + +#include "hyperleveldb_ext.h" + +#include +#include + +#include "hyperleveldb/db.h" + +using namespace leveldb; + +extern "C" { + +static bool SaveError(char** errptr, const Status& s) { + assert(errptr != NULL); + if (s.ok()) { + return false; + } else if (*errptr == NULL) { + *errptr = strdup(s.ToString().c_str()); + } else { + free(*errptr); + *errptr = strdup(s.ToString().c_str()); + } + return true; +} + +void* hyperleveldb_get_ext( + leveldb_t* db, + const leveldb_readoptions_t* options, + const char* key, size_t keylen, + char** valptr, + size_t* vallen, + char** errptr) { + + std::string *tmp = new(std::string); + + //very tricky, maybe changed with c++ leveldb upgrade + Status s = (*(DB**)db)->Get(*(ReadOptions*)options, Slice(key, keylen), tmp); + + if (s.ok()) { + *valptr = (char*)tmp->data(); + *vallen = tmp->size(); + } else { + delete(tmp); + tmp = NULL; + *valptr = NULL; + *vallen = 0; + if (!s.IsNotFound()) { + SaveError(errptr, s); + } + } + return tmp; +} + +void hyperleveldb_get_free_ext(void* context) { + std::string* s = (std::string*)context; + + delete(s); +} + + +unsigned char hyperleveldb_iter_seek_to_first_ext(leveldb_iterator_t* iter) { + leveldb_iter_seek_to_first(iter); + return leveldb_iter_valid(iter); +} + +unsigned char hyperleveldb_iter_seek_to_last_ext(leveldb_iterator_t* iter) { + leveldb_iter_seek_to_last(iter); + return leveldb_iter_valid(iter); +} + +unsigned char hyperleveldb_iter_seek_ext(leveldb_iterator_t* iter, const char* k, size_t klen) { + leveldb_iter_seek(iter, k, klen); + return leveldb_iter_valid(iter); +} + +unsigned char hyperleveldb_iter_next_ext(leveldb_iterator_t* iter) { + leveldb_iter_next(iter); + return leveldb_iter_valid(iter); +} + +unsigned char hyperleveldb_iter_prev_ext(leveldb_iterator_t* iter) { + leveldb_iter_prev(iter); + return leveldb_iter_valid(iter); +} + + +} \ No newline at end of file diff --git a/store/hyperleveldb/hyperleveldb_ext.h b/store/hyperleveldb/hyperleveldb_ext.h new file mode 100644 index 0000000..940a090 --- /dev/null +++ b/store/hyperleveldb/hyperleveldb_ext.h @@ -0,0 +1,40 @@ +// +build hyperleveldb + +#ifndef HYPERLEVELDB_EXT_H +#define HYPERLEVELDB_EXT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "hyperleveldb/c.h" + + +/* Returns NULL if not found. Otherwise stores the value in **valptr. + Stores the length of the value in *vallen. + Returns a context must be later to free*/ +extern void* hyperleveldb_get_ext( + leveldb_t* db, + const leveldb_readoptions_t* options, + const char* key, size_t keylen, + char** valptr, + size_t* vallen, + char** errptr); + +// Free context returns by hyperleveldb_get_ext +extern void hyperleveldb_get_free_ext(void* context); + + +// Below iterator functions like leveldb iterator but returns valid status for iterator +extern unsigned char hyperleveldb_iter_seek_to_first_ext(leveldb_iterator_t*); +extern unsigned char hyperleveldb_iter_seek_to_last_ext(leveldb_iterator_t*); +extern unsigned char hyperleveldb_iter_seek_ext(leveldb_iterator_t*, const char* k, size_t klen); +extern unsigned char hyperleveldb_iter_next_ext(leveldb_iterator_t*); +extern unsigned char hyperleveldb_iter_prev_ext(leveldb_iterator_t*); + + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/store/hyperleveldb/iterator.go b/store/hyperleveldb/iterator.go new file mode 100644 index 0000000..fc72ccb --- /dev/null +++ b/store/hyperleveldb/iterator.go @@ -0,0 +1,70 @@ +// +build hyperleveldb + +package hyperleveldb + +// #cgo LDFLAGS: -lhyperleveldb +// #include +// #include "hyperleveldb/c.h" +// #include "hyperleveldb_ext.h" +import "C" + +import ( + "unsafe" +) + +type Iterator struct { + it *C.leveldb_iterator_t + isValid C.uchar +} + +func (it *Iterator) Key() []byte { + var klen C.size_t + kdata := C.leveldb_iter_key(it.it, &klen) + if kdata == nil { + return nil + } + + return slice(unsafe.Pointer(kdata), int(C.int(klen))) +} + +func (it *Iterator) Value() []byte { + var vlen C.size_t + vdata := C.leveldb_iter_value(it.it, &vlen) + if vdata == nil { + return nil + } + + return slice(unsafe.Pointer(vdata), int(C.int(vlen))) +} + +func (it *Iterator) Close() error { + if it.it != nil { + C.leveldb_iter_destroy(it.it) + it.it = nil + } + return nil +} + +func (it *Iterator) Valid() bool { + return ucharToBool(it.isValid) +} + +func (it *Iterator) Next() { + it.isValid = C.hyperleveldb_iter_next_ext(it.it) +} + +func (it *Iterator) Prev() { + it.isValid = C.hyperleveldb_iter_prev_ext(it.it) +} + +func (it *Iterator) First() { + it.isValid = C.hyperleveldb_iter_seek_to_first_ext(it.it) +} + +func (it *Iterator) Last() { + it.isValid = C.hyperleveldb_iter_seek_to_last_ext(it.it) +} + +func (it *Iterator) Seek(key []byte) { + it.isValid = C.hyperleveldb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) +} diff --git a/store/hyperleveldb/options.go b/store/hyperleveldb/options.go new file mode 100644 index 0000000..09c9a02 --- /dev/null +++ b/store/hyperleveldb/options.go @@ -0,0 +1,114 @@ +// +build hyperleveldb + +package hyperleveldb + +// #cgo LDFLAGS: -lhyperleveldb +// #include "hyperleveldb/c.h" +import "C" + +type CompressionOpt int + +const ( + NoCompression = CompressionOpt(0) + SnappyCompression = CompressionOpt(1) +) + +type Options struct { + Opt *C.leveldb_options_t +} + +type ReadOptions struct { + Opt *C.leveldb_readoptions_t +} + +type WriteOptions struct { + Opt *C.leveldb_writeoptions_t +} + +func NewOptions() *Options { + opt := C.leveldb_options_create() + return &Options{opt} +} + +func NewReadOptions() *ReadOptions { + opt := C.leveldb_readoptions_create() + return &ReadOptions{opt} +} + +func NewWriteOptions() *WriteOptions { + opt := C.leveldb_writeoptions_create() + return &WriteOptions{opt} +} + +func (o *Options) Close() { + C.leveldb_options_destroy(o.Opt) +} + +func (o *Options) SetComparator(cmp *C.leveldb_comparator_t) { + C.leveldb_options_set_comparator(o.Opt, cmp) +} + +func (o *Options) SetErrorIfExists(error_if_exists bool) { + eie := boolToUchar(error_if_exists) + C.leveldb_options_set_error_if_exists(o.Opt, eie) +} + +func (o *Options) SetCache(cache *Cache) { + C.leveldb_options_set_cache(o.Opt, cache.Cache) +} + +func (o *Options) SetWriteBufferSize(s int) { + C.leveldb_options_set_write_buffer_size(o.Opt, C.size_t(s)) +} + +func (o *Options) SetParanoidChecks(pc bool) { + C.leveldb_options_set_paranoid_checks(o.Opt, boolToUchar(pc)) +} + +func (o *Options) SetMaxOpenFiles(n int) { + C.leveldb_options_set_max_open_files(o.Opt, C.int(n)) +} + +func (o *Options) SetBlockSize(s int) { + C.leveldb_options_set_block_size(o.Opt, C.size_t(s)) +} + +func (o *Options) SetBlockRestartInterval(n int) { + C.leveldb_options_set_block_restart_interval(o.Opt, C.int(n)) +} + +func (o *Options) SetCompression(t CompressionOpt) { + C.leveldb_options_set_compression(o.Opt, C.int(t)) +} + +func (o *Options) SetCreateIfMissing(b bool) { + C.leveldb_options_set_create_if_missing(o.Opt, boolToUchar(b)) +} + +func (o *Options) SetFilterPolicy(fp *FilterPolicy) { + var policy *C.leveldb_filterpolicy_t + if fp != nil { + policy = fp.Policy + } + C.leveldb_options_set_filter_policy(o.Opt, policy) +} + +func (ro *ReadOptions) Close() { + C.leveldb_readoptions_destroy(ro.Opt) +} + +func (ro *ReadOptions) SetVerifyChecksums(b bool) { + C.leveldb_readoptions_set_verify_checksums(ro.Opt, boolToUchar(b)) +} + +func (ro *ReadOptions) SetFillCache(b bool) { + C.leveldb_readoptions_set_fill_cache(ro.Opt, boolToUchar(b)) +} + +func (wo *WriteOptions) Close() { + C.leveldb_writeoptions_destroy(wo.Opt) +} + +func (wo *WriteOptions) SetSync(b bool) { + C.leveldb_writeoptions_set_sync(wo.Opt, boolToUchar(b)) +} diff --git a/store/hyperleveldb/util.go b/store/hyperleveldb/util.go new file mode 100644 index 0000000..5008e80 --- /dev/null +++ b/store/hyperleveldb/util.go @@ -0,0 +1,44 @@ +// +build hyperleveldb + +package hyperleveldb + +// #include "hyperleveldb/c.h" +import "C" +import ( + "fmt" + "reflect" + "unsafe" +) + +func boolToUchar(b bool) C.uchar { + uc := C.uchar(0) + if b { + uc = C.uchar(1) + } + return uc +} + +func ucharToBool(uc C.uchar) bool { + if uc == C.uchar(0) { + return false + } + return true +} + +func saveError(errStr *C.char) error { + if errStr != nil { + gs := C.GoString(errStr) + C.leveldb_free(unsafe.Pointer(errStr)) + return fmt.Errorf(gs) + } + return nil +} + +func slice(p unsafe.Pointer, n int) []byte { + var b []byte + pbyte := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + pbyte.Data = uintptr(p) + pbyte.Len = n + pbyte.Cap = n + return b +} diff --git a/store/hyperleveldb_test.go b/store/hyperleveldb_test.go new file mode 100644 index 0000000..2d3a25b --- /dev/null +++ b/store/hyperleveldb_test.go @@ -0,0 +1,33 @@ +// +build hyperleveldb + +package store + +import ( + "github.com/siddontang/ledisdb/config" + "os" + "testing" +) + +func newTestHyperLevelDB() *DB { + cfg := new(config.Config) + cfg.DBName = "hyperleveldb" + cfg.DataDir = "/tmp/testdb" + + os.RemoveAll(getStorePath(cfg)) + + db, err := Open(cfg) + if err != nil { + println(err.Error()) + panic(err) + } + + return db +} + +func TestHyperLevelDB(t *testing.T) { + db := newTestHyperLevelDB() + + testStore(db, t) + + db.Close() +} diff --git a/store/mdb.go b/store/mdb.go index 97ccf38..1e097f1 100644 --- a/store/mdb.go +++ b/store/mdb.go @@ -1,3 +1,4 @@ +// +build !windows package store import ( diff --git a/store/mdb/mdb.go b/store/mdb/mdb.go index 5a16463..ad767dc 100644 --- a/store/mdb/mdb.go +++ b/store/mdb/mdb.go @@ -23,6 +23,8 @@ type MDB struct { func (s Store) Open(path string, c *config.Config) (driver.IDB, error) { mapSize := c.LMDB.MapSize + noSync := c.LMDB.NoSync + if mapSize <= 0 { mapSize = 500 * 1024 * 1024 } @@ -48,7 +50,12 @@ func (s Store) Open(path string, c *config.Config) (driver.IDB, error) { } } - err = env.Open(path, mdb.NOSYNC|mdb.NOMETASYNC|mdb.WRITEMAP|mdb.MAPASYNC|mdb.CREATE, 0755) + var flags uint = mdb.CREATE + if noSync { + flags |= mdb.NOSYNC | mdb.NOMETASYNC | mdb.WRITEMAP | mdb.MAPASYNC + } + + err = env.Open(path, flags, 0755) if err != nil { return MDB{}, err } diff --git a/store/mdb_test.go b/store/mdb_test.go index cfada26..4cd7a6b 100644 --- a/store/mdb_test.go +++ b/store/mdb_test.go @@ -1,3 +1,5 @@ +// +build !windows + package store import ( diff --git a/tools/redis_import/README.md b/tools/redis_import/README.md new file mode 100644 index 0000000..634ed60 --- /dev/null +++ b/tools/redis_import/README.md @@ -0,0 +1,18 @@ +## Notice + +1. The tool doesn't support `set` data type. +2. The tool doesn't support `bitmap` data type. +2. Our `zset` use integer instead of double, so the zset float score in Redis + will be **converted to integer**. +3. Only Support Redis version greater than `2.8.0`, because we use `scan` command to scan data. + Also, you need `redis-py` greater than `2.9.0`. + + + +## Usage + + + $ python redis_import.py redis_host redis_port redis_db ledis_host ledis_port + + +We will use the same db index as redis. That's to say, data in redis[0] will be transfer to ledisdb[0]. But if redis db `index >= 16`, we will refuse to transfer, because ledisdb only support db `index < 16`. \ No newline at end of file diff --git a/tools/redis_import/redis_import.py b/tools/redis_import/redis_import.py new file mode 100644 index 0000000..7a463ae --- /dev/null +++ b/tools/redis_import/redis_import.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python +# coding: utf-8 + +# refer: https://github.com/ideawu/ssdb/blob/master/tools/redis-import.php + +# Notice: for zset, float score will be converted to integer. + +import sys +import os +from collections import OrderedDict as od + +import redis +import ledis + +total = 0 +entries = 0 + + +def scan_available(redis_client): + """"Scan Command is available since redis-server 2.8.0""" + + if "scan" in dir(redis_client): + info = redis_client.info() + server_version = info["redis_version"] + version_list = server_version.split(".") + if len(version_list) > 2: + n = int(version_list[0]) * 10 + int(version_list[1]) + if n >= 28: + return True + return False + + +def set_ttl(redis_client, ledis_client, key, k_type): + k_types = { + "string": ledis_client.expire, + "list": ledis_client.lexpire, + "hash": ledis_client.hexpire, + "set": ledis_client.zexpire, + "zset": ledis_client.zexpire + } + timeout = redis_client.ttl(key) + if timeout > 0: + k_types[k_type](key, timeout) + + +def copy_key(redis_client, ledis_client, key, convert=False): + global entries + k_type = redis_client.type(key) + if k_type == "string": + value = redis_client.get(key) + ledis_client.set(key, value) + set_ttl(redis_client, ledis_client, key, k_type) + entries += 1 + + elif k_type == "list": + _list = redis_client.lrange(key, 0, -1) + for value in _list: + ledis_client.rpush(key, value) + set_ttl(redis_client, ledis_client, key, k_type) + entries += 1 + + elif k_type == "hash": + mapping = od(redis_client.hgetall(key)) + ledis_client.hmset(key, mapping) + set_ttl(redis_client, ledis_client, key, k_type) + entries += 1 + + elif k_type == "zset": + out = redis_client.zrange(key, 0, -1, withscores=True) + pieces = od() + for i in od(out).iteritems(): + pieces[i[0]] = int(i[1]) + ledis_client.zadd(key, **pieces) + set_ttl(redis_client, ledis_client, key, k_type) + entries += 1 + + else: + print "KEY %s of TYPE %s is not supported by LedisDB." % (key, k_type) + + +def copy_keys(redis_client, ledis_client, keys, convert=False): + for key in keys: + copy_key(redis_client, ledis_client, key, convert=convert) + + +def scan(redis_client, count=1000): + keys = [] + total = redis_client.dbsize() + if total > 1000: + print "It may take a while, be patient please." + + first = True + cursor = 0 + while cursor != 0 or first: + cursor, data = redis_client.scan(cursor, count=count) + keys.extend(data) + first = False + assert len(keys) == total + return keys, total + + +def copy(redis_client, ledis_client, count=1000, convert=False): + if scan_available(redis_client): + print "\nTransfer begin ...\n" + keys, total = scan(redis_client, count=count) + copy_keys(redis_client, ledis_client, keys, convert=convert) + + else: + msg = """We do not support Redis version less than 2.8.0. + Please check both your redis server version and redis-py + version. + """ + print msg + sys.exit() + print "%d keys, %d entries copied" % (total, entries) + + +def usage(): + usage = """ + Usage: + python %s redis_host redis_port redis_db ledis_host ledis_port + """ + print usage % os.path.basename(sys.argv[0]) + + +def get_prompt(choice): + yes = set(['yes', 'ye', 'y', '']) + no = set(['no', 'n']) + + if choice in yes: + return True + elif choice in no: + return False + else: + sys.stdout.write("Please respond with 'yes' or 'no'") + + +def main(): + if len(sys.argv) < 6: + usage() + sys.exit() + convert = False + if len(sys.argv) >= 6: + (redis_host, redis_port, redis_db, ledis_host, ledis_port) = sys.argv[1:6] + if int(redis_db) >= 16: + print redis_db + sys.exit("LedisDB only support 16 databases([0-15]") + + choice = raw_input("[y/N]").lower() + if not get_prompt(choice): + sys.exit("No proceed") + + redis_c = redis.Redis(host=redis_host, port=int(redis_port), db=int(redis_db)) + ledis_c = ledis.Ledis(host=ledis_host, port=int(ledis_port), db=int(redis_db)) + try: + redis_c.ping() + except redis.ConnectionError: + print "Could not connect to Redis Server" + sys.exit() + + try: + ledis_c.ping() + except redis.ConnectionError: + print "Could not connect to LedisDB Server" + sys.exit() + + copy(redis_c, ledis_c, convert=convert) + print "Done\n" + + +if __name__ == "__main__": + main() diff --git a/tools/redis_import/test.py b/tools/redis_import/test.py new file mode 100644 index 0000000..96cceeb --- /dev/null +++ b/tools/redis_import/test.py @@ -0,0 +1,120 @@ +#coding: utf-8 + +import random, string + +import redis +import ledis + +from redis_import import copy, scan, set_ttl + +rds = redis.Redis() +lds = ledis.Ledis(port=6380) + + +def random_word(words, length): + return ''.join(random.choice(words) for i in range(length)) + + +def get_words(): + word_file = "/usr/share/dict/words" + words = open(word_file).read().splitlines() + return words[:1000] + + +def get_mapping(words, length=1000): + d = {} + for word in words: + d[word] = random.randint(1, length) + return d + + +def random_string(client, words, length=1000): + d = get_mapping(words, length) + client.mset(d) + + +def random_hash(client, words, length=1000): + d = get_mapping(words, length) + client.hmset("hashName", d) + + +def random_list(client, words, length=1000): + client.lpush("listName", *words) + + +def random_zset(client, words, length=1000): + d = get_mapping(words, length) + client.zadd("zsetName", **d) + + +def test(): + words = get_words() + print "Flush all redis data before insert new." + rds.flushall() + + random_string(rds, words) + print "random_string done" + + random_hash(rds, words) + print "random_hash done" + + random_list(rds, words) + print "random_list done" + + random_zset(rds, words) + print "random_zset done" + + + lds.lclear("listName") + lds.hclear("hashName") + lds.zclear("zsetName") + copy(rds, lds, convert=True) + + # for all keys + keys = scan(rds, 1000) + for key in keys: + if rds.type(key) == "string" and not lds.exists(key): + print key + print "String data not consistent" + + # for list + l1 = rds.lrange("listName", 0, -1) + l2 = lds.lrange("listName", 0, -1) + assert l1 == l2 + + #for hash + for key in keys: + if rds.type(key) == "hash": + assert rds.hgetall(key) == lds.hgetall(key) + assert sorted(rds.hkeys(key)) == sorted(lds.hkeys(key)) + assert sorted(rds.hvals(key)) == sorted(lds.hvals(key)) + + # for zset + z1 = rds.zrange("zsetName", 0, -1, withscores=True) + z2 = lds.zrange("zsetName", 0, -1, withscores=True) + assert z1 == z2 + + +def ledis_ttl(ledis_client, key, k_type): + ttls = { + "string": lds.ttl, + "list": lds.lttl, + "hash": lds.httl, + "zset": lds.zttl, + } + return ttls[k_type](key) + + +def test_ttl(): + keys, total = scan(rds, 1000) + for key in keys: + k_type = rds.type(key) + rds.expire(key, (60 * 60 * 24)) + set_ttl(rds, lds, key, k_type) + if rds.ttl(key): + assert ledis_ttl(lds, key, k_type) > 0 + +if __name__ == "__main__": + test() + test_ttl() + print "Test passed."