From c5ee8ff3869f70bda616cceadbf58897935f5bfa Mon Sep 17 00:00:00 2001 From: holys Date: Fri, 8 Aug 2014 10:40:52 +0800 Subject: [PATCH 01/27] add redis_import --- tools/redis_import/README.md | 8 ++ tools/redis_import/redis_import.py | 120 +++++++++++++++++++++++++++++ tools/redis_import/test.py | 89 +++++++++++++++++++++ 3 files changed, 217 insertions(+) create mode 100644 tools/redis_import/README.md create mode 100644 tools/redis_import/redis_import.py create mode 100644 tools/redis_import/test.py diff --git a/tools/redis_import/README.md b/tools/redis_import/README.md new file mode 100644 index 0000000..3f97436 --- /dev/null +++ b/tools/redis_import/README.md @@ -0,0 +1,8 @@ +## Notice + +1. We don't support `set` data type. +2. Our `zset` use integer instead of float, so the zset float score in Redis + will be **converted to integer**. +3. Only Support Redis version greater than `2.8.0`, because we use `scan` command to scan data. + Also, you need `redis-py` greater than `2.9.0` + diff --git a/tools/redis_import/redis_import.py b/tools/redis_import/redis_import.py new file mode 100644 index 0000000..7bc7ed3 --- /dev/null +++ b/tools/redis_import/redis_import.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python +# coding: utf-8 + +# refer: https://github.com/ideawu/ssdb/blob/master/tools/redis-import.php + +# Notice: for zset, float score will be converted to integer. + +import sys +import os +from collections import OrderedDict as od + +import redis + +total = 0 +entries = 0 + + +def scan_available(redis_client): + """"Scan Command is available since redis-server 2.8.0""" + + if "scan" in dir(redis_client): + info = redis_client.info() + server_version = info["redis_version"] + version_list = server_version.split(".") + if len(version_list) > 2: + n = int(version_list[0]) * 10 + int(version_list[1]) + if n >= 28: + return True + return False + + +def copy_key(redis_client, ledis_client, key): + global entries + k_type = redis_client.type(key) + if k_type == "string": + value = redis_client.get(key) + ledis_client.set(key, value) + entries += 1 + + elif k_type == "list": + _list = redis_client.lrange(key, 0, -1) + for value in _list: + ledis_client.rpush(key, value) + entries += 1 + + elif k_type == "hash": + mapping = od(redis_client.hgetall(key)) + ledis_client.hmset(key, mapping) + entries += 1 + + elif k_type == "zset": + out = redis_client.zrange(key, 0, -1, withscores=True) + pieces = od() + for i in od(out).iteritems(): + pieces[i[0]] = int(i[1]) + ledis_client.zadd(key, **pieces) + entries += 1 + + else: + print "%s is not supported by LedisDB." % k_type + + +def copy_keys(redis_client, ledis_client, keys): + for key in keys: + copy_key(redis_client, ledis_client, key) + + +def copy(redis_client, ledis_client, redis_db): + global total + if scan_available(redis_client): + total = redis_client.dbsize() + # scan return a + keys = redis_client.scan(cursor=0, count=total)[1] + copy_keys(redis_client, ledis_client, keys) + + else: + msg = """We do not support Redis version less than 2.8.0. + Please check both your redis server version and redis-py + version. + """ + print msg + sys.exit() + print "%d keys, %d entries copied" % (total, entries) + + +def usage(): + usage = """ + Usage: + python %s redis_host redis_port redis_db ledis_host ledis_port + """ + print usage % os.path.basename(sys.argv[0]) + + +def main(): + if len(sys.argv) != 6: + usage() + sys.exit() + + (redis_host, redis_port, redis_db, ledis_host, ledis_port) = sys.argv[1:] + + redis_c = redis.Redis(host=redis_host, port=int(redis_port), db=int(redis_db)) + ledis_c = redis.Redis(host=ledis_host, port=int(ledis_port), db=int(redis_db)) + try: + redis_c.ping() + except redis.ConnectionError: + print "Could not connect to Redis Server" + sys.exit() + + try: + ledis_c.ping() + except redis.ConnectionError: + print "Could not connect to LedisDB Server" + sys.exit() + + copy(redis_c, ledis_c, redis_db) + print "done\n" + + +if __name__ == "__main__": + main() diff --git a/tools/redis_import/test.py b/tools/redis_import/test.py new file mode 100644 index 0000000..9395321 --- /dev/null +++ b/tools/redis_import/test.py @@ -0,0 +1,89 @@ +#coding: utf-8 + +import random, string + +import redis + +from redis_import import copy + + +def random_word(words, length): + return ''.join(random.choice(words) for i in range(length)) + + +def get_words(): + word_file = "/usr/share/dict/words" + words = open(word_file).read().splitlines() + return words[:10] + + +def get_mapping(words, length=1000): + d = {} + for word in words: + d[word] = random.randint(1, length) + return d + + +def random_set(client, words, length=1000): + d = get_mapping(words, length) + client.mset(d) + + +def random_hset(client, words, length=1000): + d = get_mapping(words, length) + client.hmset("hashName", d) + + +def random_lpush(client, words, length=1000): + client.lpush("listName", *words) + + +def random_zadd(client, words, length=1000): + d = get_mapping(words, length) + client.zadd("myset", **d) + + +def test(): + words = get_words() + rds = redis.Redis() + print "Flush all redis data before insert new." + rds.flushall() + + random_set(rds, words) + print "random_set done" + random_hset(rds, words) + print "random_hset done" + random_lpush(rds, words) + print "random_lpush done" + random_zadd(rds, words) + + lds = redis.Redis(port=6380) + copy(rds, lds, 0) + + # for all keys + keys = rds.scan(0, count=rds.dbsize()) + for key in keys: + if rds.type(key) == "string" and not lds.exists(key): + print key + print "String data not consistent" + + # for list + l1 = rds.lrange("listName", 0, -1) + l2 = lds.lrange("listName", 0, -1) + assert l1 == l2 + + #for hash + + for key in keys: + if rds.type(key) == "hash" and not lds.hexists("hashName", key): + print "List data not consistent" + + # for zset + z1 = rds.zrange("myset", 0, -1, withscores=True) + z2 = lds.zrange("myset", 0, -1, withscores=True) + assert z1 == z2 + + +if __name__ == "__main__": + test() + print "Test passed." From e83965bfcc537bf2d25915156e4aa0c827f3f1ba Mon Sep 17 00:00:00 2001 From: holys Date: Fri, 8 Aug 2014 18:33:45 +0800 Subject: [PATCH 02/27] convert set to zset; ttl --- tools/redis_import/README.md | 2 +- tools/redis_import/redis_import.py | 84 ++++++++++++++++++++------ tools/redis_import/test.py | 95 +++++++++++++++++++++++------- 3 files changed, 141 insertions(+), 40 deletions(-) diff --git a/tools/redis_import/README.md b/tools/redis_import/README.md index 3f97436..e36b629 100644 --- a/tools/redis_import/README.md +++ b/tools/redis_import/README.md @@ -1,7 +1,7 @@ ## Notice 1. We don't support `set` data type. -2. Our `zset` use integer instead of float, so the zset float score in Redis +2. Our `zset` use integer instead of double, so the zset float score in Redis will be **converted to integer**. 3. Only Support Redis version greater than `2.8.0`, because we use `scan` command to scan data. Also, you need `redis-py` greater than `2.9.0` diff --git a/tools/redis_import/redis_import.py b/tools/redis_import/redis_import.py index 7bc7ed3..820487b 100644 --- a/tools/redis_import/redis_import.py +++ b/tools/redis_import/redis_import.py @@ -10,6 +10,7 @@ import os from collections import OrderedDict as od import redis +import ledis total = 0 entries = 0 @@ -29,23 +30,39 @@ def scan_available(redis_client): return False -def copy_key(redis_client, ledis_client, key): +def set_ttl(redis_client, ledis_client, key, k_type): + k_types = { + "string": ledis_client.expire, + "list": ledis_client.lexpire, + "hash": ledis_client.hexpire, + "set": ledis_client.zexpire, + "zset": ledis_client.zexpire + } + timeout = redis_client.ttl(key) + if timeout > 0: + k_types[k_type](key, timeout) + + +def copy_key(redis_client, ledis_client, key, convert=False): global entries k_type = redis_client.type(key) if k_type == "string": value = redis_client.get(key) ledis_client.set(key, value) + set_ttl(redis_client, ledis_client, key, k_type) entries += 1 elif k_type == "list": _list = redis_client.lrange(key, 0, -1) for value in _list: ledis_client.rpush(key, value) + set_ttl(redis_client, ledis_client, key, k_type) entries += 1 elif k_type == "hash": mapping = od(redis_client.hgetall(key)) ledis_client.hmset(key, mapping) + set_ttl(redis_client, ledis_client, key, k_type) entries += 1 elif k_type == "zset": @@ -54,24 +71,47 @@ def copy_key(redis_client, ledis_client, key): for i in od(out).iteritems(): pieces[i[0]] = int(i[1]) ledis_client.zadd(key, **pieces) + set_ttl(redis_client, ledis_client, key, k_type) entries += 1 + elif k_type == "set": + if convert: + print "Convert set %s to zset\n" % key + members = redis_client.smembers(key) + set_to_zset(ledis_client, key, members) + entries += 1 + else: + print "KEY %s of TYPE %s will not be converted to Zset" % (key, k_type) + else: - print "%s is not supported by LedisDB." % k_type + print "KEY %s of TYPE %s is not supported by LedisDB." % (key, k_type) -def copy_keys(redis_client, ledis_client, keys): +def copy_keys(redis_client, ledis_client, keys, convert=False): for key in keys: - copy_key(redis_client, ledis_client, key) + copy_key(redis_client, ledis_client, key, convert=convert) -def copy(redis_client, ledis_client, redis_db): - global total +def scan(redis_client, count=1000): + keys = [] + total = redis_client.dbsize() + + first = True + cursor = 0 + while cursor != 0 or first: + cursor, data = redis_client.scan(cursor, count=count) + keys.extend(data) + first = False + print len(keys) + print total + assert len(keys) == total + return keys, total + + +def copy(redis_client, ledis_client, count=1000, convert=False): if scan_available(redis_client): - total = redis_client.dbsize() - # scan return a - keys = redis_client.scan(cursor=0, count=total)[1] - copy_keys(redis_client, ledis_client, keys) + keys, total = scan(redis_client, count=count) + copy_keys(redis_client, ledis_client, keys, convert=convert) else: msg = """We do not support Redis version less than 2.8.0. @@ -83,23 +123,33 @@ def copy(redis_client, ledis_client, redis_db): print "%d keys, %d entries copied" % (total, entries) +def set_to_zset(ledis_client, key, members): + d = {} + for m in members: + d[m] = int(0) + ledis_client.zadd(key, **d) + + def usage(): usage = """ Usage: - python %s redis_host redis_port redis_db ledis_host ledis_port + python %s redis_host redis_port redis_db ledis_host ledis_port [True] """ print usage % os.path.basename(sys.argv[0]) def main(): - if len(sys.argv) != 6: + if len(sys.argv) < 6: usage() sys.exit() - - (redis_host, redis_port, redis_db, ledis_host, ledis_port) = sys.argv[1:] + convert = False + if len(sys.argv) >= 6: + (redis_host, redis_port, redis_db, ledis_host, ledis_port) = sys.argv[1:6] + if len(sys.argv) == 7 and sys.argv[-1] == "True" or sys.argv[-1] == "true": + convert = True redis_c = redis.Redis(host=redis_host, port=int(redis_port), db=int(redis_db)) - ledis_c = redis.Redis(host=ledis_host, port=int(ledis_port), db=int(redis_db)) + ledis_c = ledis.Ledis(host=ledis_host, port=int(ledis_port), db=int(redis_db)) try: redis_c.ping() except redis.ConnectionError: @@ -112,8 +162,8 @@ def main(): print "Could not connect to LedisDB Server" sys.exit() - copy(redis_c, ledis_c, redis_db) - print "done\n" + copy(redis_c, ledis_c, convert=convert) + print "done\n" if __name__ == "__main__": diff --git a/tools/redis_import/test.py b/tools/redis_import/test.py index 9395321..a7f7f09 100644 --- a/tools/redis_import/test.py +++ b/tools/redis_import/test.py @@ -3,8 +3,12 @@ import random, string import redis +import ledis -from redis_import import copy +from redis_import import copy, scan, set_ttl + +rds = redis.Redis() +lds = ledis.Ledis(port=6380) def random_word(words, length): @@ -14,7 +18,7 @@ def random_word(words, length): def get_words(): word_file = "/usr/share/dict/words" words = open(word_file).read().splitlines() - return words[:10] + return words[:1000] def get_mapping(words, length=1000): @@ -24,44 +28,57 @@ def get_mapping(words, length=1000): return d -def random_set(client, words, length=1000): +def random_string(client, words, length=1000): d = get_mapping(words, length) client.mset(d) -def random_hset(client, words, length=1000): +def random_hash(client, words, length=1000): d = get_mapping(words, length) client.hmset("hashName", d) -def random_lpush(client, words, length=1000): +def random_list(client, words, length=1000): client.lpush("listName", *words) -def random_zadd(client, words, length=1000): +def random_zset(client, words, length=1000): d = get_mapping(words, length) - client.zadd("myset", **d) + client.zadd("zsetName", **d) + + +def random_set(client, words, length=1000): + client.sadd("setName", *words) def test(): words = get_words() - rds = redis.Redis() print "Flush all redis data before insert new." rds.flushall() + random_string(rds, words) + print "random_string done" + + random_hash(rds, words) + print "random_hash done" + + random_list(rds, words) + print "random_list done" + + random_zset(rds, words) + print "random_zset done" + random_set(rds, words) print "random_set done" - random_hset(rds, words) - print "random_hset done" - random_lpush(rds, words) - print "random_lpush done" - random_zadd(rds, words) - lds = redis.Redis(port=6380) - copy(rds, lds, 0) + lds.lclear("listName") + lds.hclear("hashName") + lds.zclear("zsetName") + lds.zclear("setName") + copy(rds, lds, convert=True) # for all keys - keys = rds.scan(0, count=rds.dbsize()) + keys = scan(rds, 1000) for key in keys: if rds.type(key) == "string" and not lds.exists(key): print key @@ -73,17 +90,51 @@ def test(): assert l1 == l2 #for hash - for key in keys: - if rds.type(key) == "hash" and not lds.hexists("hashName", key): - print "List data not consistent" + if rds.type(key) == "hash": + assert rds.hgetall(key) == lds.hgetall(key) + assert sorted(rds.hkeys(key)) == sorted(lds.hkeys(key)) + assert sorted(rds.hvals(key)) == sorted(lds.hvals(key)) # for zset - z1 = rds.zrange("myset", 0, -1, withscores=True) - z2 = lds.zrange("myset", 0, -1, withscores=True) + z1 = rds.zrange("zsetName", 0, -1, withscores=True) + z2 = lds.zrange("zsetName", 0, -1, withscores=True) assert z1 == z2 - + + # fo set + assert set(rds.smembers("setName")) == set(lds.zrange("setName", 0, -1)) + for key in lds.zrange("setName", 0, -1): + assert int(lds.zscore("setName", key)) == 0 + + +def ledis_ttl(ledis_client, key, k_type): + ttls = { + "string": lds.ttl, + "list": lds.lttl, + "hash": lds.httl, + "zset": lds.zttl, + "set": lds.zttl + } + return ttls[k_type](key) + + +def test_ttl(): + keys, total = scan(rds, 1000) + invalid = [] + for key in keys: + k_type = rds.type(key) + rds.expire(key, 100) + set_ttl(rds, lds, key, k_type) + # if rds.ttl(key) != ledis_ttl(lds, key, k_type): + # print key + # print rds.ttl(key) + # print ledis_ttl(lds, key, k_type) + # invalid.append(key) + + assert rds.ttl(key) == ledis_ttl(lds, key, k_type) + print len(invalid) if __name__ == "__main__": test() + test_ttl() print "Test passed." From 3b5c7466a9ba42e353eb6dc6be7d146068f5a9f2 Mon Sep 17 00:00:00 2001 From: holys Date: Fri, 8 Aug 2014 18:37:23 +0800 Subject: [PATCH 03/27] update readme --- tools/redis_import/README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tools/redis_import/README.md b/tools/redis_import/README.md index e36b629..d7b2737 100644 --- a/tools/redis_import/README.md +++ b/tools/redis_import/README.md @@ -6,3 +6,9 @@ 3. Only Support Redis version greater than `2.8.0`, because we use `scan` command to scan data. Also, you need `redis-py` greater than `2.9.0` + + +## Usage + + + $ python redis_import.py redis_host redis_port redis_db ledis_host ledis_port [True] \ No newline at end of file From ac71ec598cd8d48add0a6278079d11bfc703f564 Mon Sep 17 00:00:00 2001 From: holys Date: Fri, 8 Aug 2014 18:37:23 +0800 Subject: [PATCH 04/27] update readme --- tools/redis_import/README.md | 15 +++++++++++++-- tools/redis_import/redis_import.py | 23 ++++++++++++++++++++--- tools/redis_import/test.py | 1 - 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/tools/redis_import/README.md b/tools/redis_import/README.md index e36b629..44ee28a 100644 --- a/tools/redis_import/README.md +++ b/tools/redis_import/README.md @@ -1,8 +1,19 @@ ## Notice -1. We don't support `set` data type. +1. The tool doesn't support `set` data type. +2. The tool doesn't support `bitmap` data type. 2. Our `zset` use integer instead of double, so the zset float score in Redis will be **converted to integer**. 3. Only Support Redis version greater than `2.8.0`, because we use `scan` command to scan data. - Also, you need `redis-py` greater than `2.9.0` + Also, you need `redis-py` greater than `2.9.0`. + + +## Usage + + + $ python redis_import.py redis_host redis_port redis_db ledis_host ledis_port [True] + +The option `True` means convert `set` to `zset` or not, if not, set it to `False`. + +We will use the same db index as redis. That's to say, data in redis[0] will be transfer to ledisdb[0]. \ No newline at end of file diff --git a/tools/redis_import/redis_import.py b/tools/redis_import/redis_import.py index 820487b..b3d0cfa 100644 --- a/tools/redis_import/redis_import.py +++ b/tools/redis_import/redis_import.py @@ -95,6 +95,8 @@ def copy_keys(redis_client, ledis_client, keys, convert=False): def scan(redis_client, count=1000): keys = [] total = redis_client.dbsize() + if total > 1000: + print "It may take a while, be patient please." first = True cursor = 0 @@ -102,14 +104,13 @@ def scan(redis_client, count=1000): cursor, data = redis_client.scan(cursor, count=count) keys.extend(data) first = False - print len(keys) - print total assert len(keys) == total return keys, total def copy(redis_client, ledis_client, count=1000, convert=False): if scan_available(redis_client): + print "\nTransfer begin ...\n" keys, total = scan(redis_client, count=count) copy_keys(redis_client, ledis_client, keys, convert=convert) @@ -138,6 +139,18 @@ def usage(): print usage % os.path.basename(sys.argv[0]) +def get_prompt(choice): + yes = set(['yes', 'ye', 'y', '']) + no = set(['no', 'n']) + + if choice in yes: + return True + elif choice in no: + return False + else: + sys.stdout.write("Please respond with 'yes' or 'no'") + + def main(): if len(sys.argv) < 6: usage() @@ -148,6 +161,10 @@ def main(): if len(sys.argv) == 7 and sys.argv[-1] == "True" or sys.argv[-1] == "true": convert = True + choice = raw_input("[y/N]").lower() + if not get_prompt(choice): + sys.exit("No proceed") + redis_c = redis.Redis(host=redis_host, port=int(redis_port), db=int(redis_db)) ledis_c = ledis.Ledis(host=ledis_host, port=int(ledis_port), db=int(redis_db)) try: @@ -163,7 +180,7 @@ def main(): sys.exit() copy(redis_c, ledis_c, convert=convert) - print "done\n" + print "Done\n" if __name__ == "__main__": diff --git a/tools/redis_import/test.py b/tools/redis_import/test.py index a7f7f09..4e77efc 100644 --- a/tools/redis_import/test.py +++ b/tools/redis_import/test.py @@ -132,7 +132,6 @@ def test_ttl(): # invalid.append(key) assert rds.ttl(key) == ledis_ttl(lds, key, k_type) - print len(invalid) if __name__ == "__main__": test() From 5ce968006c3873a6bc32dd569624c74f9ff50604 Mon Sep 17 00:00:00 2001 From: holys Date: Mon, 11 Aug 2014 10:10:36 +0800 Subject: [PATCH 05/27] update test --- tools/redis_import/test.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/tools/redis_import/test.py b/tools/redis_import/test.py index 4e77efc..f60ee7d 100644 --- a/tools/redis_import/test.py +++ b/tools/redis_import/test.py @@ -120,18 +120,12 @@ def ledis_ttl(ledis_client, key, k_type): def test_ttl(): keys, total = scan(rds, 1000) - invalid = [] for key in keys: k_type = rds.type(key) - rds.expire(key, 100) + rds.expire(key, (60 * 60 * 24)) set_ttl(rds, lds, key, k_type) - # if rds.ttl(key) != ledis_ttl(lds, key, k_type): - # print key - # print rds.ttl(key) - # print ledis_ttl(lds, key, k_type) - # invalid.append(key) - - assert rds.ttl(key) == ledis_ttl(lds, key, k_type) + if rds.ttl(key): + assert ledis_ttl(lds, key, k_type) > 0 if __name__ == "__main__": test() From 74b0c60ecee2e8629e52267f30a6ded1b6d6705c Mon Sep 17 00:00:00 2001 From: holys Date: Mon, 11 Aug 2014 12:38:07 +0800 Subject: [PATCH 06/27] update generate.py; add command_cnf.go --- doc/commands.json | 2 +- generate.py | 98 +++++--- server/command_cnf.go | 510 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 574 insertions(+), 36 deletions(-) create mode 100644 server/command_cnf.go diff --git a/doc/commands.json b/doc/commands.json index 521f86e..8af2c90 100644 --- a/doc/commands.json +++ b/doc/commands.json @@ -17,7 +17,7 @@ "BEXPIREAT": { "arguments": "key timestamp", "group": "Bitmap", - "readonly": false, + "readonly": false }, "BGET": { "arguments": "key", diff --git a/generate.py b/generate.py index beba9e5..1295e22 100644 --- a/generate.py +++ b/generate.py @@ -2,32 +2,18 @@ import json import time +import sys +import os from collections import OrderedDict as dict - -def go_array_to_json(path): - """Convert `./cmd/ledis-cli/const.go` to commands.json""" - fp = open(path).read() - commands_str = fp.split("string")[1] - _commands_str = commands_str.splitlines()[1:len(commands_str.splitlines())-1] - commands_d = dict() - values_d = dict() - for i in _commands_str: - t = i.split('"') - values_d.update( - { - "arguments": "%s" % t[3], - "group": "%s" % t[5] - }) - values_d = dict(sorted(values_d.items())) - d = { - "%s" % t[1]: values_d - } - commands_d.update(d) - - fp = open("commands.json", "w") - json.dump(commands_d, fp, indent=4) - fp.close() +content = u"""\n +type cmdConf struct { + name string + argDesc string + group string + readonly bool +} +""" def json_to_js(json_path, js_path): @@ -39,7 +25,7 @@ def json_to_js(json_path, js_path): keys.append(k.encode('utf-8')) with open(js_path, "w") as fp: generate_time(fp) - fp.write("module.exports = [\n" ) + fp.write("module.exports = [\n") for k in sorted(keys): fp.write('\t"%s",\n' % k.lower()) fp.write("]") @@ -53,22 +39,64 @@ def json_to_go_array(json_path, go_path): g_fp.write("package main\n\nvar helpCommands = [][]string{\n") _json_sorted = dict(sorted(_json.items(), key=lambda x: x[0])) for k, v in _json_sorted.iteritems(): - print k, v g_fp.write('\t{"%s", "%s", "%s"},\n' % (k, v["arguments"], v["group"])) g_fp.write("}\n") g_fp.close() +def json_to_command_cnf(json_path, go_path): + g_fp = open(go_path, "w") + + with open(json_path) as fp: + _json = json.load(fp) + generate_time(g_fp) + g_fp.write("package server") + print >> g_fp, content + g_fp.write("var cnfCmds = []cmdConf{\n") + for k, v in _json.iteritems(): + g_fp.write('\t{\n\t\t"%s",\n\t\t"%s",\n\t\t"%s", \n\t\t%s,\n\t},\n' % + (k, v["arguments"], v["group"], "true" if v["readonly"] else "false" )) + g_fp.write("}\n") + g_fp.close() + + def generate_time(fp): - fp.write("//This file was generated by ./generate.py on %s \n" % \ - time.strftime('%a %b %d %Y %H:%M:%S %z')) + fp.write("//This file was generated by ./generate.py on %s \n" % + time.strftime('%a %b %d %Y %H:%M:%S %z')) + if __name__ == "__main__": - path = "./cmd/ledis-cli/const.go" - # go_array_to_json(path) - json_path = "./commands.json" - js_path = "./commands.js" - json_to_js(json_path, js_path) - go_path = "const.go" + usage = """ + Usage: python %s src_path dst_path" - json_to_go_array(json_path, path) + 1. for Node.js client: + + python generate.py /path/to/commands.json /path/to/commands.js + + 2. for cmd/ledis_cli/const.go + + python generate.py /path/to/commands.json /path/to/const.go + + 3. for server/command_cnf.go + + python generate.py /path/to/commands.json /path/to/command_cnf.go + + """ + + if len(sys.argv) != 3: + sys.exit(usage % os.path.basename(sys.argv[0])) + + src_path, dst_path = sys.argv[1:] + dst_path_base = os.path.basename(dst_path) + + if dst_path_base.endswith(".js"): + json_to_js(src_path, dst_path) + + elif dst_path_base.startswith("const.go"): + json_to_go_array(src_path, dst_path) + + elif dst_path_base.startswith("command"): + json_to_command_cnf(src_path, dst_path) + + else: + print "Not support arguments" diff --git a/server/command_cnf.go b/server/command_cnf.go new file mode 100644 index 0000000..e830592 --- /dev/null +++ b/server/command_cnf.go @@ -0,0 +1,510 @@ +//This file was generated by ./generate.py on Mon Aug 11 2014 12:35:56 +0800 +package server + +type cmdConf struct { + name string + argDesc string + group string + readonly bool +} + +var cnfCmds = []cmdConf{ + { + "ZRANGEBYSCORE", + "key min max [WITHSCORES] [LIMIT offset count]", + "ZSet", + true, + }, + { + "ZPERSIST", + "key", + "ZSet", + false, + }, + { + "LTTL", + "key", + "List", + true, + }, + { + "LINDEX", + "key index", + "List", + true, + }, + { + "FULLSYNC", + "-", + "Replication", + false, + }, + { + "ZREVRANK", + "key member", + "ZSet", + true, + }, + { + "ZEXPIRE", + "key seconds", + "ZSet", + false, + }, + { + "SYNC", + "index offset", + "Replication", + false, + }, + { + "BMSETBIT", + "key offset value [offset value ...]", + "Bitmap", + false, + }, + { + "LPOP", + "key", + "List", + false, + }, + { + "HPERSIST", + "key", + "Hash", + false, + }, + { + "EXPIRE", + "key seconds", + "KV", + false, + }, + { + "DEL", + "key [key ...]", + "KV", + false, + }, + { + "LPUSH", + "key value [value ...]", + "List", + false, + }, + { + "PERSIST", + "key", + "KV", + false, + }, + { + "HTTL", + "key", + "Hash", + true, + }, + { + "LEXPIREAT", + "key timestamp", + "List", + false, + }, + { + "ZEXPIREAT", + "key timestamp", + "ZSet", + false, + }, + { + "DECR", + "key", + "KV", + false, + }, + { + "SLAVEOF", + "host port", + "Replication", + false, + }, + { + "INCR", + "key", + "KV", + false, + }, + { + "MSET", + "key value [key value ...]", + "KV", + false, + }, + { + "LEXPIRE", + "key seconds", + "List", + false, + }, + { + "HINCRBY", + "key field increment", + "Hash", + false, + }, + { + "GET", + "key", + "KV", + true, + }, + { + "ZREVRANGE", + "key start stop [WITHSCORES]", + "ZSet", + true, + }, + { + "ZINCRBY", + "key increment member", + "ZSet", + false, + }, + { + "LPERSIST", + "key", + "List", + false, + }, + { + "HEXISTS", + "key field", + "Hash", + true, + }, + { + "ZREM", + "key member [member ...]", + "ZSet", + false, + }, + { + "BOPT", + "operation destkey key [key ...]", + "Bitmap", + false, + }, + { + "ZCLEAR", + "key", + "ZSet", + false, + }, + { + "LCLEAR", + "key", + "List", + false, + }, + { + "ZRANK", + "key member", + "ZSet", + true, + }, + { + "TTL", + "key", + "KV", + true, + }, + { + "ZADD", + "key score member [score member ...]", + "ZSet", + false, + }, + { + "HEXPIRE", + "key seconds", + "Hash", + false, + }, + { + "HDEL", + "key field [field ...]", + "Hash", + false, + }, + { + "HSET", + "key field value", + "Hash", + false, + }, + { + "LLEN", + "key", + "List", + true, + }, + { + "HVALS", + "key", + "Hash", + true, + }, + { + "BCOUNT", + "key [start end]", + "Bitmap", + true, + }, + { + "BGET", + "key", + "Bitmap", + true, + }, + { + "MGET", + "key [key ...]", + "KV", + true, + }, + { + "EXISTS", + "key", + "KV", + true, + }, + { + "HMCLEAR", + "key [key ...]", + "Hash", + false, + }, + { + "ZCOUNT", + "key min max", + "ZSet", + true, + }, + { + "SELECT", + "index", + "Server", + false, + }, + { + "ECHO", + "message", + "Server", + true, + }, + { + "ZTTL", + "key", + "ZSet", + true, + }, + { + "HKEYS", + "key", + "Hash", + true, + }, + { + "HGETALL", + "key", + "Hash", + true, + }, + { + "RPOP", + "key", + "List", + false, + }, + { + "HMGET", + "key field [field ...]", + "Hash", + true, + }, + { + "SETNX", + "key value", + "KV", + false, + }, + { + "HGET", + "key field", + "Hash", + true, + }, + { + "BPERSIST", + "key", + "Bitmap", + false, + }, + { + "INCRBY", + "key increment", + "KV", + false, + }, + { + "BDELETE", + "key", + "ZSet", + false, + }, + { + "ZMCLEAR", + "key [key ...]", + "ZSet", + false, + }, + { + "RPUSH", + "key value [value ...]", + "List", + false, + }, + { + "LRANGE", + "key start stop", + "List", + true, + }, + { + "HLEN", + "key", + "Hash", + true, + }, + { + "ZSCORE", + "key member", + "ZSet", + true, + }, + { + "LMCLEAR", + "key [key ...]", + "List", + false, + }, + { + "EXPIREAT", + "key timestamp", + "KV", + false, + }, + { + "ZREMRANGEBYSCORE", + "key min max", + "ZSet", + false, + }, + { + "ZCARD", + "key", + "ZSet", + true, + }, + { + "ZREMRANGEBYRANK", + "key start stop", + "ZSet", + false, + }, + { + "PING", + "-", + "Server", + true, + }, + { + "HMSET", + "key field value [field value ...]", + "Hash", + false, + }, + { + "BTTL", + "key", + "Bitmap", + true, + }, + { + "HCLEAR", + "key", + "Hash", + false, + }, + { + "ZRANGE", + "key start stop [WITHSCORES]", + "ZSet", + false, + }, + { + "ZREVRANGEBYSCORE", + "key max min [WITHSCORES][LIMIT offset count]", + "ZSet", + true, + }, + { + "BSETBIT", + "key offset value", + "Bitmap", + false, + }, + { + "BEXPIREAT", + "key timestamp", + "Bitmap", + false, + }, + { + "SET", + "key value", + "KV", + false, + }, + { + "BGETBIT", + "key offset", + "Bitmap", + true, + }, + { + "BEXPIRE", + "key seconds", + "Bitmap", + false, + }, + { + "GETSET", + " key value", + "KV", + false, + }, + { + "DECRBY", + "key decrement", + "KV", + false, + }, + { + "HEXPIREAT", + "key timestamp", + "Hash", + false, + }, +} From 057a19bb8fb0e92ac212eaba0aa51c92f771601e Mon Sep 17 00:00:00 2001 From: siddontang Date: Mon, 11 Aug 2014 14:24:37 +0800 Subject: [PATCH 07/27] update read me --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index a55fb94..9dc0ddb 100644 --- a/README.md +++ b/README.md @@ -139,7 +139,6 @@ See [Issues todo](https://github.com/siddontang/ledisdb/issues?labels=todo&page= ## Links + [Official Website](http://ledisdb.com) -+ [Author's Chinese Blog](http://blog.csdn.net/siddontang/article/category/2264003) + [GoDoc](https://godoc.org/github.com/siddontang/ledisdb) + [Server Commands](https://github.com/siddontang/ledisdb/wiki/Commands) From 55e743354132a883157d360db97a50ee70a2246a Mon Sep 17 00:00:00 2001 From: holys Date: Mon, 11 Aug 2014 14:45:15 +0800 Subject: [PATCH 08/27] remove support for transfering redis set to ledisdb zset --- tools/redis_import/README.md | 5 ++--- tools/redis_import/redis_import.py | 23 ++++------------------- tools/redis_import/test.py | 13 ------------- 3 files changed, 6 insertions(+), 35 deletions(-) diff --git a/tools/redis_import/README.md b/tools/redis_import/README.md index 6c5fbbf..634ed60 100644 --- a/tools/redis_import/README.md +++ b/tools/redis_import/README.md @@ -12,8 +12,7 @@ ## Usage - $ python redis_import.py redis_host redis_port redis_db ledis_host ledis_port [True] + $ python redis_import.py redis_host redis_port redis_db ledis_host ledis_port -The option `True` means convert `set` to `zset` or not, if not, set it to `False`. -We will use the same db index as redis. That's to say, data in redis[0] will be transfer to ledisdb[0]. +We will use the same db index as redis. That's to say, data in redis[0] will be transfer to ledisdb[0]. But if redis db `index >= 16`, we will refuse to transfer, because ledisdb only support db `index < 16`. \ No newline at end of file diff --git a/tools/redis_import/redis_import.py b/tools/redis_import/redis_import.py index b3d0cfa..7a463ae 100644 --- a/tools/redis_import/redis_import.py +++ b/tools/redis_import/redis_import.py @@ -74,15 +74,6 @@ def copy_key(redis_client, ledis_client, key, convert=False): set_ttl(redis_client, ledis_client, key, k_type) entries += 1 - elif k_type == "set": - if convert: - print "Convert set %s to zset\n" % key - members = redis_client.smembers(key) - set_to_zset(ledis_client, key, members) - entries += 1 - else: - print "KEY %s of TYPE %s will not be converted to Zset" % (key, k_type) - else: print "KEY %s of TYPE %s is not supported by LedisDB." % (key, k_type) @@ -124,17 +115,10 @@ def copy(redis_client, ledis_client, count=1000, convert=False): print "%d keys, %d entries copied" % (total, entries) -def set_to_zset(ledis_client, key, members): - d = {} - for m in members: - d[m] = int(0) - ledis_client.zadd(key, **d) - - def usage(): usage = """ Usage: - python %s redis_host redis_port redis_db ledis_host ledis_port [True] + python %s redis_host redis_port redis_db ledis_host ledis_port """ print usage % os.path.basename(sys.argv[0]) @@ -158,8 +142,9 @@ def main(): convert = False if len(sys.argv) >= 6: (redis_host, redis_port, redis_db, ledis_host, ledis_port) = sys.argv[1:6] - if len(sys.argv) == 7 and sys.argv[-1] == "True" or sys.argv[-1] == "true": - convert = True + if int(redis_db) >= 16: + print redis_db + sys.exit("LedisDB only support 16 databases([0-15]") choice = raw_input("[y/N]").lower() if not get_prompt(choice): diff --git a/tools/redis_import/test.py b/tools/redis_import/test.py index f60ee7d..96cceeb 100644 --- a/tools/redis_import/test.py +++ b/tools/redis_import/test.py @@ -47,10 +47,6 @@ def random_zset(client, words, length=1000): client.zadd("zsetName", **d) -def random_set(client, words, length=1000): - client.sadd("setName", *words) - - def test(): words = get_words() print "Flush all redis data before insert new." @@ -68,13 +64,10 @@ def test(): random_zset(rds, words) print "random_zset done" - random_set(rds, words) - print "random_set done" lds.lclear("listName") lds.hclear("hashName") lds.zclear("zsetName") - lds.zclear("setName") copy(rds, lds, convert=True) # for all keys @@ -101,11 +94,6 @@ def test(): z2 = lds.zrange("zsetName", 0, -1, withscores=True) assert z1 == z2 - # fo set - assert set(rds.smembers("setName")) == set(lds.zrange("setName", 0, -1)) - for key in lds.zrange("setName", 0, -1): - assert int(lds.zscore("setName", key)) == 0 - def ledis_ttl(ledis_client, key, k_type): ttls = { @@ -113,7 +101,6 @@ def ledis_ttl(ledis_client, key, k_type): "list": lds.lttl, "hash": lds.httl, "zset": lds.zttl, - "set": lds.zttl } return ttls[k_type](key) From e50153964966eebbb8324ab0c207e1cd2259b881 Mon Sep 17 00:00:00 2001 From: holys Date: Tue, 12 Aug 2014 00:54:53 +0800 Subject: [PATCH 09/27] add HyperLevelDB support --- dev.sh | 11 ++ store/hyperleveldb.go | 9 + store/hyperleveldb/batch.go | 61 ++++++ store/hyperleveldb/cache.go | 20 ++ store/hyperleveldb/db.go | 259 +++++++++++++++++++++++++ store/hyperleveldb/filterpolicy.go | 21 ++ store/hyperleveldb/hyperleveldb_ext.cc | 88 +++++++++ store/hyperleveldb/hyperleveldb_ext.h | 40 ++++ store/hyperleveldb/iterator.go | 70 +++++++ store/hyperleveldb/options.go | 114 +++++++++++ store/hyperleveldb/util.go | 44 +++++ store/hyperleveldb_test.go | 31 +++ 12 files changed, 768 insertions(+) create mode 100644 store/hyperleveldb.go create mode 100644 store/hyperleveldb/batch.go create mode 100644 store/hyperleveldb/cache.go create mode 100644 store/hyperleveldb/db.go create mode 100644 store/hyperleveldb/filterpolicy.go create mode 100644 store/hyperleveldb/hyperleveldb_ext.cc create mode 100644 store/hyperleveldb/hyperleveldb_ext.h create mode 100644 store/hyperleveldb/iterator.go create mode 100644 store/hyperleveldb/options.go create mode 100644 store/hyperleveldb/util.go create mode 100644 store/hyperleveldb_test.go diff --git a/dev.sh b/dev.sh index 6d753f2..798ffab 100644 --- a/dev.sh +++ b/dev.sh @@ -13,6 +13,7 @@ fi SNAPPY_DIR=/usr/local/snappy LEVELDB_DIR=/usr/local/leveldb ROCKSDB_DIR=/usr/local/rocksdb +HYPERLEVELDB_DIR=/usr/local/hyperleveldb function add_path() { @@ -63,6 +64,16 @@ if [ -f $ROCKSDB_DIR/include/rocksdb/c.h ]; then GO_BUILD_TAGS="$GO_BUILD_TAGS rocksdb" fi +#check hyperleveldb +if [ -f $HYPERLEVELDB_DIR/include/hyperleveldb/c.h ]; then + CGO_CFLAGS="$CGO_CFLAGS -I$HYPERLEVELDB_DIR/include" + CGO_CXXFLAGS="$CGO_CXXFLAGS -I$HYPERLEVELDB_DIR/include" + CGO_LDFLAGS="$CGO_LDFLAGS -L$HYPERLEVELDB_DIR/lib -lhyperleveldb" + LD_LIBRARY_PATH=$(add_path $LD_LIBRARY_PATH $HYPERLEVELDB_DIR/lib) + DYLD_LIBRARY_PATH=$(add_path $DYLD_LIBRARY_PATH $HYPERLEVELDB_DIR/lib) + GO_BUILD_TAGS="$GO_BUILD_TAGS hyperleveldb" +fi + export CGO_CFLAGS export CGO_CXXFLAGS export CGO_LDFLAGS diff --git a/store/hyperleveldb.go b/store/hyperleveldb.go new file mode 100644 index 0000000..f26631b --- /dev/null +++ b/store/hyperleveldb.go @@ -0,0 +1,9 @@ +package store + +import ( + "github.com/siddontang/ledisdb/store/hyperleveldb" +) + +func init() { + Register(hyperleveldb.Store{}) +} diff --git a/store/hyperleveldb/batch.go b/store/hyperleveldb/batch.go new file mode 100644 index 0000000..149f0b4 --- /dev/null +++ b/store/hyperleveldb/batch.go @@ -0,0 +1,61 @@ +// +build hyperleveldb + +package hyperleveldb + +// #cgo LDFLAGS: -lhyperleveldb +// #include "hyperleveldb/c.h" +import "C" + +import ( + "unsafe" +) + +type WriteBatch struct { + db *DB + wbatch *C.leveldb_writebatch_t +} + +func (w *WriteBatch) Close() error { + C.leveldb_writebatch_destroy(w.wbatch) + w.wbatch = nil + + return nil +} + +func (w *WriteBatch) Put(key, value []byte) { + var k, v *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + if len(value) != 0 { + v = (*C.char)(unsafe.Pointer(&value[0])) + } + + lenk := len(key) + lenv := len(value) + + C.leveldb_writebatch_put(w.wbatch, k, C.size_t(lenk), v, C.size_t(lenv)) +} + +func (w *WriteBatch) Delete(key []byte) { + C.leveldb_writebatch_delete(w.wbatch, + (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) +} + +func (w *WriteBatch) Commit() error { + return w.commit(w.db.writeOpts) +} + +func (w *WriteBatch) Rollback() error { + C.leveldb_writebatch_clear(w.wbatch) + return nil +} + +func (w *WriteBatch) commit(wb *WriteOptions) error { + var errStr *C.char + C.leveldb_write(w.db.db, wb.Opt, w.wbatch, &errStr) + if errStr != nil { + return saveError(errStr) + } + return nil +} diff --git a/store/hyperleveldb/cache.go b/store/hyperleveldb/cache.go new file mode 100644 index 0000000..9b73d21 --- /dev/null +++ b/store/hyperleveldb/cache.go @@ -0,0 +1,20 @@ +// +build hyperleveldb + +package hyperleveldb + +// #cgo LDFLAGS: -lhyperleveldb +// #include +// #include "hyperleveldb/c.h" +import "C" + +type Cache struct { + Cache *C.leveldb_cache_t +} + +func NewLRUCache(capacity int) *Cache { + return &Cache{C.leveldb_cache_create_lru(C.size_t(capacity))} +} + +func (c *Cache) Close() { + C.leveldb_cache_destroy(c.Cache) +} diff --git a/store/hyperleveldb/db.go b/store/hyperleveldb/db.go new file mode 100644 index 0000000..946a2f6 --- /dev/null +++ b/store/hyperleveldb/db.go @@ -0,0 +1,259 @@ +// +build hyperleveldb + +// Package hyperleveldb is a wrapper for c++ hyperleveldb +package hyperleveldb + +/* +#cgo LDFLAGS: -lhyperleveldb +#include +#include "hyperleveldb_ext.h" +*/ +import "C" + +import ( + "github.com/siddontang/ledisdb/config" + "github.com/siddontang/ledisdb/store/driver" + "os" + "runtime" + "unsafe" +) + +const defaultFilterBits int = 10 + +type Store struct { +} + +func (s Store) String() string { + return "hyperleveldb" +} + +func (s Store) Open(path string, cfg *config.Config) (driver.IDB, error) { + if err := os.MkdirAll(path, os.ModePerm); err != nil { + return nil, err + } + + db := new(DB) + db.path = path + db.cfg = &cfg.LevelDB + + if err := db.open(); err != nil { + return nil, err + } + + return db, nil +} + +func (s Store) Repair(path string, cfg *config.Config) error { + db := new(DB) + db.cfg = &cfg.LevelDB + db.path = path + + err := db.open() + defer db.Close() + + //open ok, do not need repair + if err == nil { + return nil + } + + var errStr *C.char + ldbname := C.CString(path) + defer C.leveldb_free(unsafe.Pointer(ldbname)) + + C.leveldb_repair_db(db.opts.Opt, ldbname, &errStr) + if errStr != nil { + return saveError(errStr) + } + return nil +} + +type DB struct { + path string + + cfg *config.LevelDBConfig + + db *C.leveldb_t + + opts *Options + + //for default read and write options + readOpts *ReadOptions + writeOpts *WriteOptions + iteratorOpts *ReadOptions + + cache *Cache + + filter *FilterPolicy +} + +func (db *DB) open() error { + db.initOptions(db.cfg) + + var errStr *C.char + ldbname := C.CString(db.path) + defer C.leveldb_free(unsafe.Pointer(ldbname)) + + db.db = C.leveldb_open(db.opts.Opt, ldbname, &errStr) + if errStr != nil { + db.db = nil + return saveError(errStr) + } + return nil +} + +func (db *DB) initOptions(cfg *config.LevelDBConfig) { + opts := NewOptions() + + opts.SetCreateIfMissing(true) + + cfg.Adjust() + + db.cache = NewLRUCache(cfg.CacheSize) + opts.SetCache(db.cache) + + //we must use bloomfilter + db.filter = NewBloomFilter(defaultFilterBits) + opts.SetFilterPolicy(db.filter) + + if !cfg.Compression { + opts.SetCompression(NoCompression) + } else { + opts.SetCompression(SnappyCompression) + } + + opts.SetBlockSize(cfg.BlockSize) + + opts.SetWriteBufferSize(cfg.WriteBufferSize) + + opts.SetMaxOpenFiles(cfg.MaxOpenFiles) + + db.opts = opts + + db.readOpts = NewReadOptions() + db.writeOpts = NewWriteOptions() + + db.iteratorOpts = NewReadOptions() + db.iteratorOpts.SetFillCache(false) +} + +func (db *DB) Close() error { + if db.db != nil { + C.leveldb_close(db.db) + db.db = nil + } + + db.opts.Close() + + if db.cache != nil { + db.cache.Close() + } + + if db.filter != nil { + db.filter.Close() + } + + db.readOpts.Close() + db.writeOpts.Close() + db.iteratorOpts.Close() + + return nil +} + +func (db *DB) Put(key, value []byte) error { + return db.put(db.writeOpts, key, value) +} + +func (db *DB) Get(key []byte) ([]byte, error) { + return db.get(db.readOpts, key) +} + +func (db *DB) Delete(key []byte) error { + return db.delete(db.writeOpts, key) +} + +func (db *DB) NewWriteBatch() driver.IWriteBatch { + wb := &WriteBatch{ + db: db, + wbatch: C.leveldb_writebatch_create(), + } + + runtime.SetFinalizer(wb, func(w *WriteBatch) { + w.Close() + }) + return wb +} + +func (db *DB) NewIterator() driver.IIterator { + it := new(Iterator) + + it.it = C.leveldb_create_iterator(db.db, db.iteratorOpts.Opt) + + return it +} + +func (db *DB) put(wo *WriteOptions, key, value []byte) error { + var errStr *C.char + var k, v *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + if len(value) != 0 { + v = (*C.char)(unsafe.Pointer(&value[0])) + } + + lenk := len(key) + lenv := len(value) + C.leveldb_put( + db.db, wo.Opt, k, C.size_t(lenk), v, C.size_t(lenv), &errStr) + + if errStr != nil { + return saveError(errStr) + } + return nil +} + +func (db *DB) get(ro *ReadOptions, key []byte) ([]byte, error) { + var errStr *C.char + var vallen C.size_t + var k *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + + var value *C.char + + c := C.hyperleveldb_get_ext( + db.db, ro.Opt, k, C.size_t(len(key)), &value, &vallen, &errStr) + + if errStr != nil { + return nil, saveError(errStr) + } + + if value == nil { + return nil, nil + } + + defer C.hyperleveldb_get_free_ext(unsafe.Pointer(c)) + + return C.GoBytes(unsafe.Pointer(value), C.int(vallen)), nil +} + +func (db *DB) delete(wo *WriteOptions, key []byte) error { + var errStr *C.char + var k *C.char + if len(key) != 0 { + k = (*C.char)(unsafe.Pointer(&key[0])) + } + + C.leveldb_delete( + db.db, wo.Opt, k, C.size_t(len(key)), &errStr) + + if errStr != nil { + return saveError(errStr) + } + return nil +} + +func (db *DB) Begin() (driver.Tx, error) { + return nil, driver.ErrTxSupport +} diff --git a/store/hyperleveldb/filterpolicy.go b/store/hyperleveldb/filterpolicy.go new file mode 100644 index 0000000..1c8f126 --- /dev/null +++ b/store/hyperleveldb/filterpolicy.go @@ -0,0 +1,21 @@ +// +build hyperleveldb + +package hyperleveldb + +// #cgo LDFLAGS: -lhyperleveldb +// #include +// #include "hyperleveldb/c.h" +import "C" + +type FilterPolicy struct { + Policy *C.leveldb_filterpolicy_t +} + +func NewBloomFilter(bitsPerKey int) *FilterPolicy { + policy := C.leveldb_filterpolicy_create_bloom(C.int(bitsPerKey)) + return &FilterPolicy{policy} +} + +func (fp *FilterPolicy) Close() { + C.leveldb_filterpolicy_destroy(fp.Policy) +} diff --git a/store/hyperleveldb/hyperleveldb_ext.cc b/store/hyperleveldb/hyperleveldb_ext.cc new file mode 100644 index 0000000..dab687c --- /dev/null +++ b/store/hyperleveldb/hyperleveldb_ext.cc @@ -0,0 +1,88 @@ +// +build hyperleveldb + +#include "hyperleveldb_ext.h" + +#include +#include + +#include "hyperleveldb/db.h" + +using namespace leveldb; + +extern "C" { + +static bool SaveError(char** errptr, const Status& s) { + assert(errptr != NULL); + if (s.ok()) { + return false; + } else if (*errptr == NULL) { + *errptr = strdup(s.ToString().c_str()); + } else { + free(*errptr); + *errptr = strdup(s.ToString().c_str()); + } + return true; +} + +void* hyperleveldb_get_ext( + leveldb_t* db, + const leveldb_readoptions_t* options, + const char* key, size_t keylen, + char** valptr, + size_t* vallen, + char** errptr) { + + std::string *tmp = new(std::string); + + //very tricky, maybe changed with c++ leveldb upgrade + Status s = (*(DB**)db)->Get(*(ReadOptions*)options, Slice(key, keylen), tmp); + + if (s.ok()) { + *valptr = (char*)tmp->data(); + *vallen = tmp->size(); + } else { + delete(tmp); + tmp = NULL; + *valptr = NULL; + *vallen = 0; + if (!s.IsNotFound()) { + SaveError(errptr, s); + } + } + return tmp; +} + +void hyperleveldb_get_free_ext(void* context) { + std::string* s = (std::string*)context; + + delete(s); +} + + +unsigned char hyperleveldb_iter_seek_to_first_ext(leveldb_iterator_t* iter) { + leveldb_iter_seek_to_first(iter); + return leveldb_iter_valid(iter); +} + +unsigned char hyperleveldb_iter_seek_to_last_ext(leveldb_iterator_t* iter) { + leveldb_iter_seek_to_last(iter); + return leveldb_iter_valid(iter); +} + +unsigned char hyperleveldb_iter_seek_ext(leveldb_iterator_t* iter, const char* k, size_t klen) { + leveldb_iter_seek(iter, k, klen); + return leveldb_iter_valid(iter); +} + +unsigned char hyperleveldb_iter_next_ext(leveldb_iterator_t* iter) { + leveldb_iter_next(iter); + return leveldb_iter_valid(iter); +} + +unsigned char hyperleveldb_iter_prev_ext(leveldb_iterator_t* iter) { + leveldb_iter_prev(iter); + return leveldb_iter_valid(iter); +} + + +} \ No newline at end of file diff --git a/store/hyperleveldb/hyperleveldb_ext.h b/store/hyperleveldb/hyperleveldb_ext.h new file mode 100644 index 0000000..940a090 --- /dev/null +++ b/store/hyperleveldb/hyperleveldb_ext.h @@ -0,0 +1,40 @@ +// +build hyperleveldb + +#ifndef HYPERLEVELDB_EXT_H +#define HYPERLEVELDB_EXT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "hyperleveldb/c.h" + + +/* Returns NULL if not found. Otherwise stores the value in **valptr. + Stores the length of the value in *vallen. + Returns a context must be later to free*/ +extern void* hyperleveldb_get_ext( + leveldb_t* db, + const leveldb_readoptions_t* options, + const char* key, size_t keylen, + char** valptr, + size_t* vallen, + char** errptr); + +// Free context returns by hyperleveldb_get_ext +extern void hyperleveldb_get_free_ext(void* context); + + +// Below iterator functions like leveldb iterator but returns valid status for iterator +extern unsigned char hyperleveldb_iter_seek_to_first_ext(leveldb_iterator_t*); +extern unsigned char hyperleveldb_iter_seek_to_last_ext(leveldb_iterator_t*); +extern unsigned char hyperleveldb_iter_seek_ext(leveldb_iterator_t*, const char* k, size_t klen); +extern unsigned char hyperleveldb_iter_next_ext(leveldb_iterator_t*); +extern unsigned char hyperleveldb_iter_prev_ext(leveldb_iterator_t*); + + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/store/hyperleveldb/iterator.go b/store/hyperleveldb/iterator.go new file mode 100644 index 0000000..fc72ccb --- /dev/null +++ b/store/hyperleveldb/iterator.go @@ -0,0 +1,70 @@ +// +build hyperleveldb + +package hyperleveldb + +// #cgo LDFLAGS: -lhyperleveldb +// #include +// #include "hyperleveldb/c.h" +// #include "hyperleveldb_ext.h" +import "C" + +import ( + "unsafe" +) + +type Iterator struct { + it *C.leveldb_iterator_t + isValid C.uchar +} + +func (it *Iterator) Key() []byte { + var klen C.size_t + kdata := C.leveldb_iter_key(it.it, &klen) + if kdata == nil { + return nil + } + + return slice(unsafe.Pointer(kdata), int(C.int(klen))) +} + +func (it *Iterator) Value() []byte { + var vlen C.size_t + vdata := C.leveldb_iter_value(it.it, &vlen) + if vdata == nil { + return nil + } + + return slice(unsafe.Pointer(vdata), int(C.int(vlen))) +} + +func (it *Iterator) Close() error { + if it.it != nil { + C.leveldb_iter_destroy(it.it) + it.it = nil + } + return nil +} + +func (it *Iterator) Valid() bool { + return ucharToBool(it.isValid) +} + +func (it *Iterator) Next() { + it.isValid = C.hyperleveldb_iter_next_ext(it.it) +} + +func (it *Iterator) Prev() { + it.isValid = C.hyperleveldb_iter_prev_ext(it.it) +} + +func (it *Iterator) First() { + it.isValid = C.hyperleveldb_iter_seek_to_first_ext(it.it) +} + +func (it *Iterator) Last() { + it.isValid = C.hyperleveldb_iter_seek_to_last_ext(it.it) +} + +func (it *Iterator) Seek(key []byte) { + it.isValid = C.hyperleveldb_iter_seek_ext(it.it, (*C.char)(unsafe.Pointer(&key[0])), C.size_t(len(key))) +} diff --git a/store/hyperleveldb/options.go b/store/hyperleveldb/options.go new file mode 100644 index 0000000..09c9a02 --- /dev/null +++ b/store/hyperleveldb/options.go @@ -0,0 +1,114 @@ +// +build hyperleveldb + +package hyperleveldb + +// #cgo LDFLAGS: -lhyperleveldb +// #include "hyperleveldb/c.h" +import "C" + +type CompressionOpt int + +const ( + NoCompression = CompressionOpt(0) + SnappyCompression = CompressionOpt(1) +) + +type Options struct { + Opt *C.leveldb_options_t +} + +type ReadOptions struct { + Opt *C.leveldb_readoptions_t +} + +type WriteOptions struct { + Opt *C.leveldb_writeoptions_t +} + +func NewOptions() *Options { + opt := C.leveldb_options_create() + return &Options{opt} +} + +func NewReadOptions() *ReadOptions { + opt := C.leveldb_readoptions_create() + return &ReadOptions{opt} +} + +func NewWriteOptions() *WriteOptions { + opt := C.leveldb_writeoptions_create() + return &WriteOptions{opt} +} + +func (o *Options) Close() { + C.leveldb_options_destroy(o.Opt) +} + +func (o *Options) SetComparator(cmp *C.leveldb_comparator_t) { + C.leveldb_options_set_comparator(o.Opt, cmp) +} + +func (o *Options) SetErrorIfExists(error_if_exists bool) { + eie := boolToUchar(error_if_exists) + C.leveldb_options_set_error_if_exists(o.Opt, eie) +} + +func (o *Options) SetCache(cache *Cache) { + C.leveldb_options_set_cache(o.Opt, cache.Cache) +} + +func (o *Options) SetWriteBufferSize(s int) { + C.leveldb_options_set_write_buffer_size(o.Opt, C.size_t(s)) +} + +func (o *Options) SetParanoidChecks(pc bool) { + C.leveldb_options_set_paranoid_checks(o.Opt, boolToUchar(pc)) +} + +func (o *Options) SetMaxOpenFiles(n int) { + C.leveldb_options_set_max_open_files(o.Opt, C.int(n)) +} + +func (o *Options) SetBlockSize(s int) { + C.leveldb_options_set_block_size(o.Opt, C.size_t(s)) +} + +func (o *Options) SetBlockRestartInterval(n int) { + C.leveldb_options_set_block_restart_interval(o.Opt, C.int(n)) +} + +func (o *Options) SetCompression(t CompressionOpt) { + C.leveldb_options_set_compression(o.Opt, C.int(t)) +} + +func (o *Options) SetCreateIfMissing(b bool) { + C.leveldb_options_set_create_if_missing(o.Opt, boolToUchar(b)) +} + +func (o *Options) SetFilterPolicy(fp *FilterPolicy) { + var policy *C.leveldb_filterpolicy_t + if fp != nil { + policy = fp.Policy + } + C.leveldb_options_set_filter_policy(o.Opt, policy) +} + +func (ro *ReadOptions) Close() { + C.leveldb_readoptions_destroy(ro.Opt) +} + +func (ro *ReadOptions) SetVerifyChecksums(b bool) { + C.leveldb_readoptions_set_verify_checksums(ro.Opt, boolToUchar(b)) +} + +func (ro *ReadOptions) SetFillCache(b bool) { + C.leveldb_readoptions_set_fill_cache(ro.Opt, boolToUchar(b)) +} + +func (wo *WriteOptions) Close() { + C.leveldb_writeoptions_destroy(wo.Opt) +} + +func (wo *WriteOptions) SetSync(b bool) { + C.leveldb_writeoptions_set_sync(wo.Opt, boolToUchar(b)) +} diff --git a/store/hyperleveldb/util.go b/store/hyperleveldb/util.go new file mode 100644 index 0000000..5008e80 --- /dev/null +++ b/store/hyperleveldb/util.go @@ -0,0 +1,44 @@ +// +build hyperleveldb + +package hyperleveldb + +// #include "hyperleveldb/c.h" +import "C" +import ( + "fmt" + "reflect" + "unsafe" +) + +func boolToUchar(b bool) C.uchar { + uc := C.uchar(0) + if b { + uc = C.uchar(1) + } + return uc +} + +func ucharToBool(uc C.uchar) bool { + if uc == C.uchar(0) { + return false + } + return true +} + +func saveError(errStr *C.char) error { + if errStr != nil { + gs := C.GoString(errStr) + C.leveldb_free(unsafe.Pointer(errStr)) + return fmt.Errorf(gs) + } + return nil +} + +func slice(p unsafe.Pointer, n int) []byte { + var b []byte + pbyte := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + pbyte.Data = uintptr(p) + pbyte.Len = n + pbyte.Cap = n + return b +} diff --git a/store/hyperleveldb_test.go b/store/hyperleveldb_test.go new file mode 100644 index 0000000..51bef8f --- /dev/null +++ b/store/hyperleveldb_test.go @@ -0,0 +1,31 @@ +package store + +import ( + "github.com/siddontang/ledisdb/config" + "os" + "testing" +) + +func newTestHyperLevelDB() *DB { + cfg := new(config.Config) + cfg.DBName = "hyperleveldb" + cfg.DataDir = "/tmp/testdb" + + os.RemoveAll(getStorePath(cfg)) + + db, err := Open(cfg) + if err != nil { + println(err.Error()) + panic(err) + } + + return db +} + +func TestHyperLevelDB(t *testing.T) { + db := newTestHyperLevelDB() + + testStore(db, t) + + db.Close() +} From 0b69b5017d6d9e8f21d5c5a6cdcb3036124beadb Mon Sep 17 00:00:00 2001 From: holys Date: Tue, 12 Aug 2014 14:44:16 +0800 Subject: [PATCH 10/27] update readme, add hyperleveldb --- README.md | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 9dc0ddb..36f89d7 100644 --- a/README.md +++ b/README.md @@ -2,16 +2,16 @@ Ledisdb is a high performance NoSQL like Redis written by go. It supports some advanced data structure like kv, list, hash, zset, bitmap, and may be alternative for Redis. -LedisDB now supports multi databases as backend to store data, you can test and choose the proper one for you. +LedisDB now supports multiple databases as backend to store data, you can test and choose the proper one for you. ## Features + Rich advanced data structure: KV, List, Hash, ZSet, Bitmap. + Stores lots of data, over the memory limit. -+ Various backend database to use: LevelDB, goleveldb, LMDB, RocksDB, BoltDB. ++ Various backend database to use: LevelDB, goleveldb, LMDB, RocksDB, BoltDB, HyperLevelDB. + Supports expiration and ttl. + Redis clients, like redis-cli, are supported directly. -+ Multi client API supports, including Go, Python, Lua(Openresty). ++ Multiple client API supports, including Go, Python, Lua(Openresty), Node.js. + Easy to embed in your own Go application. + Restful API support, json/bson/msgpack output. + Replication to guarantee data safe. @@ -51,16 +51,29 @@ Create a workspace and checkout ledisdb source ## RocksDB support -+ Install rocksdb(shared_lib) and snappy first. ++ [Install rocksdb](https://github.com/facebook/rocksdb/blob/master/INSTALL.md)(`make shared_lib`) and snappy first. LedisDB has not supplied a simple script to install, maybe later. -+ Set ```ROCKSDB_DIR``` and ```SNAPPY_DIR``` to the actual install path in dev.sh. ++ Set ```ROCKSDB_DIR``` and ```SNAPPY_DIR``` to the actual install path in `dev.sh`. + ```make``` + + + +## HyperLevelDB support + ++ [Install hyperleveldb](https://github.com/rescrv/HyperLevelDB/blob/master/README) and snappy first. + + LedisDB has not supplied a simple script to install, maybe later. + ++ Set `HYPERLEVELDB` and `SNAPPY_DIR` to the actual install path in `dev.sh`. ++ `make` + + ## Choose store database -LedisDB now supports goleveldb, lmdb, leveldb, rocksdb, boltdb, it will choose goleveldb as default to store data if you not set. +LedisDB now supports goleveldb, lmdb, leveldb, rocksdb, boltdb, hyperleveldb. it will choose goleveldb as default to store data if you not set. Choosing a store database to use is very simple, you have two ways: From de98e7887eeff272631b8152aa6b82857b3ce6ec Mon Sep 17 00:00:00 2001 From: holys Date: Tue, 12 Aug 2014 14:49:02 +0800 Subject: [PATCH 11/27] update readme, add C/C++ --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 36f89d7..724ad21 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ LedisDB now supports multiple databases as backend to store data, you can test a + Various backend database to use: LevelDB, goleveldb, LMDB, RocksDB, BoltDB, HyperLevelDB. + Supports expiration and ttl. + Redis clients, like redis-cli, are supported directly. -+ Multiple client API supports, including Go, Python, Lua(Openresty), Node.js. ++ Multiple client API supports, including Go, Python, Lua(Openresty), C/C++, Node.js. + Easy to embed in your own Go application. + Restful API support, json/bson/msgpack output. + Replication to guarantee data safe. From 3927d2bd8fe3662ec7251fe240ddae8cf1bb24b2 Mon Sep 17 00:00:00 2001 From: wenyekui Date: Tue, 12 Aug 2014 18:06:02 +0800 Subject: [PATCH 12/27] add zunionstore & zintrestore interface --- ledis/t_zset.go | 158 +++++++++++++++++++++++++++++++++++++++++++ ledis/t_zset_test.go | 33 +++++++++ 2 files changed, 191 insertions(+) diff --git a/ledis/t_zset.go b/ledis/t_zset.go index 151f8eb..add519f 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -12,6 +12,10 @@ const ( MinScore int64 = -1<<63 + 1 MaxScore int64 = 1<<63 - 1 InvalidScore int64 = -1 << 63 + + AggregateSum byte = 0 + AggregateMin byte = 1 + AggregateMax byte = 2 ) type ScorePair struct { @@ -23,6 +27,9 @@ var errZSizeKey = errors.New("invalid zsize key") var errZSetKey = errors.New("invalid zset key") var errZScoreKey = errors.New("invalid zscore key") var errScoreOverflow = errors.New("zset score overflow") +var errInvalidAggregate = errors.New("invalid aggregate") +var errInvalidWeightNum = errors.New("invalid weight number") +var errInvalidSrcKeyNum = errors.New("invalid src key number") const ( zsetNScoreSep byte = '<' @@ -839,3 +846,154 @@ func (db *DB) ZPersist(key []byte) (int64, error) { err = t.Commit() return n, err } + +func getAggregateFunc(aggregate byte) func(int64, int64) int64 { + switch aggregate { + case AggregateSum: + return func(a int64, b int64) int64 { + return a + b + } + case AggregateMax: + return func(a int64, b int64) int64 { + if a > b { + return a + } + return b + } + case AggregateMin: + return func(a int64, b int64) int64 { + if a > b { + return b + } + return a + } + } + return nil +} + +func (db *DB) ZUnionStore(destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte) (int64, error) { + + var destMap = map[string]int64{} + aggregateFunc := getAggregateFunc(aggregate) + if aggregateFunc == nil { + return 0, errInvalidAggregate + } + if len(srcKeys) < 1 { + return 0, errInvalidSrcKeyNum + } + if weights != nil { + if len(srcKeys) != len(weights) { + return 0, errInvalidWeightNum + } + } else { + weights = make([]int64, len(srcKeys)) + for i := 0; i < len(weights); i++ { + weights[i] = 1 + } + } + + for i, key := range srcKeys { + scorePairs, err := db.ZRange(key, 0, -1) + if err != nil { + return 0, err + } + for _, pair := range scorePairs { + if score, ok := destMap[String(pair.Member)]; !ok { + destMap[String(pair.Member)] = pair.Score + } else { + destMap[String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i]) + } + } + } + + t := db.zsetTx + t.Lock() + defer t.Unlock() + + db.zDelete(t, destKey) + + var num int64 = 0 + for member, score := range destMap { + if err := checkZSetKMSize(destKey, []byte(member)); err != nil { + return 0, err + } + + if n, err := db.zSetItem(t, destKey, score, []byte(member)); err != nil { + return 0, err + } else if n == 0 { + //add new + num++ + } + } + + if _, err := db.zIncrSize(t, destKey, num); err != nil { + return 0, err + } + + //todo add binlog + err := t.Commit() + if err != nil { + return 0, err + } + return int64(len(destMap)), nil +} + +func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte) (int64, error) { + + var destMap = map[string]int64{} + aggregateFunc := getAggregateFunc(aggregate) + if aggregateFunc == nil { + return 0, errInvalidAggregate + } + if len(srcKeys) < 1 { + return 0, errInvalidSrcKeyNum + } + if weights != nil { + if len(srcKeys) != len(weights) { + return 0, errInvalidWeightNum + } + } else { + weights = make([]int64, len(srcKeys)) + for i := 0; i < len(weights); i++ { + weights[i] = 1 + } + } + + var keptMembers [][]byte + for i, key := range srcKeys { + scorePairs, err := db.ZRange(key, 0, -1) + if err != nil { + return 0, err + } + for _, pair := range scorePairs { + if score, ok := destMap[String(pair.Member)]; !ok { + destMap[String(pair.Member)] = pair.Score + } else { + keptMembers = append(keptMembers, pair.Member) + destMap[String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i]) + } + } + } + + t := db.zsetTx + t.Lock() + defer t.Unlock() + + db.zDelete(t, destKey) + + var num int64 = 0 + for _, member := range keptMembers { + score := destMap[String(member)] + if err := checkZSetKMSize(destKey, member); err != nil { + return 0, err + } + + if n, err := db.zSetItem(t, destKey, score, member); err != nil { + return 0, err + } else if n == 0 { + //add new + num++ + } + } + return int64(len(keptMembers)), nil +} diff --git a/ledis/t_zset_test.go b/ledis/t_zset_test.go index 74cf526..857f005 100644 --- a/ledis/t_zset_test.go +++ b/ledis/t_zset_test.go @@ -264,3 +264,36 @@ func TestZSetPersist(t *testing.T) { t.Fatal(n) } } + +func TestZUnionStore(t *testing.T) { + db := getTestDB() + key1 := []byte("key1") + key2 := []byte("key2") + + db.ZAdd(key1, ScorePair{1, []byte("one")}) + db.ZAdd(key1, ScorePair{1, []byte("two")}) + + db.ZAdd(key2, ScorePair{2, []byte("two")}) + db.ZAdd(key2, ScorePair{2, []byte("three")}) + + keys := [][]byte{key1, key2} + weights := []int64{1, 2} + + out := []byte("out") + n, err := db.ZUnionStore(out, keys, weights, AggregateSum) + if err != nil { + t.Fatal(err.Error()) + } + if n != 3 { + t.Fatal("invalid value ", n) + } + + v, err := db.ZScore(out, []byte("two")) + + if err != nil { + t.Fatal(err.Error()) + } + if v != 5 { + t.Fatal("invalid value ", v) + } +} From e1a2ebc27a36592673f226ee01d8730d436dd527 Mon Sep 17 00:00:00 2001 From: wenyekui Date: Wed, 13 Aug 2014 09:57:13 +0800 Subject: [PATCH 13/27] add unit test --- ledis/t_zset.go | 8 +++-- ledis/t_zset_test.go | 69 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/ledis/t_zset.go b/ledis/t_zset.go index add519f..e0e8e2a 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -931,8 +931,7 @@ func (db *DB) ZUnionStore(destKey []byte, srcKeys [][]byte, weights []int64, agg } //todo add binlog - err := t.Commit() - if err != nil { + if err := t.Commit(); err != nil { return 0, err } return int64(len(destMap)), nil @@ -995,5 +994,10 @@ func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, agg num++ } } + + //todo add binlog + if err := t.Commit(); err != nil { + return 0, err + } return int64(len(keptMembers)), nil } diff --git a/ledis/t_zset_test.go b/ledis/t_zset_test.go index 857f005..d36572a 100644 --- a/ledis/t_zset_test.go +++ b/ledis/t_zset_test.go @@ -296,4 +296,73 @@ func TestZUnionStore(t *testing.T) { if v != 5 { t.Fatal("invalid value ", v) } + + out = []byte("out") + n, err = db.ZUnionStore(out, keys, weights, AggregateMax) + if err != nil { + t.Fatal(err.Error()) + } + if n != 3 { + t.Fatal("invalid value ", n) + } + + v, err = db.ZScore(out, []byte("two")) + + if err != nil { + t.Fatal(err.Error()) + } + if v != 4 { + t.Fatal("invalid value ", v) + } +} + +func TestZInterStore(t *testing.T) { + db := getTestDB() + + key1 := []byte("key1") + key2 := []byte("key2") + + db.ZAdd(key1, ScorePair{1, []byte("one")}) + db.ZAdd(key1, ScorePair{1, []byte("two")}) + + db.ZAdd(key2, ScorePair{2, []byte("two")}) + db.ZAdd(key2, ScorePair{2, []byte("three")}) + + keys := [][]byte{key1, key2} + weights := []int64{1, 2} + out := []byte("out") + + n, err := db.ZInterStore(out, keys, weights, AggregateSum) + if err != nil { + t.Fatal(err.Error()) + } + if n != 1 { + t.Fatal("invalid value ", n) + } + v, err := db.ZScore(out, []byte("two")) + if err != nil { + t.Fatal(err.Error()) + } + if v != 5 { + t.Fatal("invalid value ", v) + } + + out = []byte("out") + n, err = db.ZInterStore(out, keys, weights, AggregateMin) + if err != nil { + t.Fatal(err.Error()) + } + if n != 1 { + t.Fatal("invalid value ", n) + } + + v, err = db.ZScore(out, []byte("two")) + + if err != nil { + t.Fatal(err.Error()) + } + if v != 1 { + t.Fatal("invalid value ", v) + } + } From 87fdc9125e6dda85e6c91e061bf5f31a7902869e Mon Sep 17 00:00:00 2001 From: wenyekui Date: Wed, 13 Aug 2014 10:26:35 +0800 Subject: [PATCH 14/27] add unit test --- ledis/t_zset.go | 3 +++ ledis/t_zset_test.go | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/ledis/t_zset.go b/ledis/t_zset.go index e0e8e2a..5d6bfb3 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -995,6 +995,9 @@ func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, agg } } + if _, err := db.zIncrSize(t, destKey, num); err != nil { + return 0, err + } //todo add binlog if err := t.Commit(); err != nil { return 0, err diff --git a/ledis/t_zset_test.go b/ledis/t_zset_test.go index d36572a..fc4f41f 100644 --- a/ledis/t_zset_test.go +++ b/ledis/t_zset_test.go @@ -314,6 +314,16 @@ func TestZUnionStore(t *testing.T) { if v != 4 { t.Fatal("invalid value ", v) } + + pairs, _ := db.ZRange(out, 0, -1) + n, err = db.ZCount(out, 0, 0XFFFE) + + if err != nil { + t.Fatal(err.Error()) + } + if n != 3 { + t.Fatal("invalid value ", v) + } } func TestZInterStore(t *testing.T) { @@ -365,4 +375,12 @@ func TestZInterStore(t *testing.T) { t.Fatal("invalid value ", v) } + n, err = db.ZCount(out, 0, 0XFFFF) + if err != nil { + t.Fatal(err.Error()) + } + if n != 1 { + t.Fatal("invalid value ", n) + } + } From cb908a859c794597a7364f15301693e62bc2338a Mon Sep 17 00:00:00 2001 From: wenyekui Date: Wed, 13 Aug 2014 11:59:43 +0800 Subject: [PATCH 15/27] add zunionstorecommand & zinterstorecommand in server pkg --- ledis/t_zset_test.go | 1 - server/cmd_zset.go | 117 ++++++++++++++++++++++++++++++++++++++++ server/cmd_zset_test.go | 45 ++++++++++++++++ 3 files changed, 162 insertions(+), 1 deletion(-) diff --git a/ledis/t_zset_test.go b/ledis/t_zset_test.go index fc4f41f..a772360 100644 --- a/ledis/t_zset_test.go +++ b/ledis/t_zset_test.go @@ -315,7 +315,6 @@ func TestZUnionStore(t *testing.T) { t.Fatal("invalid value ", v) } - pairs, _ := db.ZRange(out, 0, -1) n, err = db.ZCount(out, 0, 0XFFFE) if err != nil { diff --git a/server/cmd_zset.go b/server/cmd_zset.go index e540b32..f8117fc 100644 --- a/server/cmd_zset.go +++ b/server/cmd_zset.go @@ -520,6 +520,120 @@ func zpersistCommand(req *requestContext) error { return nil } +func zparseZsetoptStore(args [][]byte) (destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte, err error) { + destKey = args[0] + nKeys, err := strconv.Atoi(ledis.String(args[1])) + if err != nil { + err = ErrValue + return + } + args = args[2:] + if len(args) < nKeys { + err = ErrSyntax + return + } + + srcKeys = args[:nKeys] + + args = args[nKeys:] + + var weightsFlag = false + var aggregateFlag = false + + for len(args) > 0 { + if strings.ToLower(ledis.String(args[0])) == "weights" { + if weightsFlag { + err = ErrSyntax + return + } + + args = args[1:] + if len(args) < nKeys { + err = ErrSyntax + return + } + + weights = make([]int64, nKeys) + for i, arg := range args[:nKeys] { + if weights[i], err = ledis.StrInt64(arg, nil); err != nil { + err = ErrValue + return + } + } + args = args[nKeys:] + + weightsFlag = true + + } else if strings.ToLower(ledis.String(args[0])) == "aggregate" { + if aggregateFlag { + err = ErrSyntax + return + } + if len(args) < 2 { + err = ErrSyntax + return + } + + if strings.ToLower(ledis.String(args[1])) == "sum" { + aggregate = ledis.AggregateSum + } else if strings.ToLower(ledis.String(args[1])) == "min" { + aggregate = ledis.AggregateMin + } else if strings.ToLower(ledis.String(args[1])) == "max" { + aggregate = ledis.AggregateMax + } else { + err = ErrSyntax + return + } + args = args[2:] + aggregateFlag = true + } else { + err = ErrSyntax + return + } + } + if !aggregateFlag { + aggregate = ledis.AggregateSum + } + return +} + +func zunionstoreCommand(req *requestContext) error { + args := req.args + if len(args) < 2 { + return ErrCmdParams + } + + destKey, srcKeys, weights, aggregate, err := zparseZsetoptStore(args) + if err != nil { + return err + } + if n, err := req.db.ZUnionStore(destKey, srcKeys, weights, aggregate); err != nil { + return err + } else { + req.resp.writeInteger(n) + } + + return nil +} + +func zinterstoreCommand(req *requestContext) error { + args := req.args + if len(args) < 2 { + return ErrCmdParams + } + + destKey, srcKeys, weights, aggregate, err := zparseZsetoptStore(args) + if err != nil { + return err + } + if n, err := req.db.ZInterStore(destKey, srcKeys, weights, aggregate); err != nil { + return err + } else { + req.resp.writeInteger(n) + } + return nil +} + func init() { register("zadd", zaddCommand) register("zcard", zcardCommand) @@ -536,6 +650,9 @@ func init() { register("zrevrangebyscore", zrevrangebyscoreCommand) register("zscore", zscoreCommand) + register("zunionstore", zunionstoreCommand) + register("zinterstore", zinterstoreCommand) + //ledisdb special command register("zclear", zclearCommand) diff --git a/server/cmd_zset_test.go b/server/cmd_zset_test.go index d9b1272..06cd538 100644 --- a/server/cmd_zset_test.go +++ b/server/cmd_zset_test.go @@ -599,3 +599,48 @@ func TestZsetErrorParams(t *testing.T) { } } + +func TestZUnionStore(t *testing.T) { + c := getTestConn() + defer c.Close() + + if _, err := c.Do("zadd", "k1", "1", "one"); err != nil { + t.Fatal(err.Error()) + } + + if _, err := c.Do("zadd", "k1", "2", "two"); err != nil { + t.Fatal(err.Error()) + } + + if _, err := c.Do("zadd", "k2", "1", "two"); err != nil { + t.Fatal(err.Error()) + } + + if _, err := c.Do("zadd", "k2", "2", "three"); err != nil { + t.Fatal(err.Error()) + } + + if n, err := ledis.Int64(c.Do("zunionstore", "out", "2", "k1", "k2", "weights", "1", "2")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 3 { + t.Fatal("invalid value ", n) + } + } + + if n, err := ledis.Int64(c.Do("zunionstore", "out", "2", "k1", "k2", "weights", "1", "2", "aggregate", "min")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 3 { + t.Fatal("invalid value ", n) + } + } + + if n, err := ledis.Int64(c.Do("zunionstore", "out", "2", "k1", "k2", "aggregate", "max")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 3 { + t.Fatal("invalid value ", n) + } + } +} From 396030a06f11b91212b898fb4a0fa8140b05c4a4 Mon Sep 17 00:00:00 2001 From: wenyekui Date: Wed, 13 Aug 2014 14:23:27 +0800 Subject: [PATCH 16/27] add unit test --- server/cmd_zset_test.go | 61 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/server/cmd_zset_test.go b/server/cmd_zset_test.go index 06cd538..e6a6a70 100644 --- a/server/cmd_zset_test.go +++ b/server/cmd_zset_test.go @@ -643,4 +643,65 @@ func TestZUnionStore(t *testing.T) { t.Fatal("invalid value ", n) } } + + if n, err := ledis.Int64(c.Do("zscore", "out", "two")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 2 { + t.Fatal("invalid value ", n) + } + } +} + +func TestZInterStore(t *testing.T) { + c := getTestConn() + defer c.Close() + + if _, err := c.Do("zadd", "k1", "1", "one"); err != nil { + t.Fatal(err.Error()) + } + + if _, err := c.Do("zadd", "k1", "2", "two"); err != nil { + t.Fatal(err.Error()) + } + + if _, err := c.Do("zadd", "k2", "1", "two"); err != nil { + t.Fatal(err.Error()) + } + + if _, err := c.Do("zadd", "k2", "2", "three"); err != nil { + t.Fatal(err.Error()) + } + + if n, err := ledis.Int64(c.Do("zinterstore", "out", "2", "k1", "k2", "weights", "1", "2")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 1 { + t.Fatal("invalid value ", n) + } + } + + if n, err := ledis.Int64(c.Do("zinterstore", "out", "2", "k1", "k2", "aggregate", "min", "weights", "1", "2")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 1 { + t.Fatal("invalid value ", n) + } + } + + if n, err := ledis.Int64(c.Do("zinterstore", "out", "2", "k1", "k2", "aggregate", "sum")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 1 { + t.Fatal("invalid value ", n) + } + } + + if n, err := ledis.Int64(c.Do("zscore", "out", "two")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 3 { + t.Fatal("invalid value ", n) + } + } } From 48e99a1693961a374f9ffbea5d7e8b1b5d2117fb Mon Sep 17 00:00:00 2001 From: wenyekui Date: Wed, 13 Aug 2014 15:38:59 +0800 Subject: [PATCH 17/27] modify unit test --- ledis/t_zset.go | 4 ++-- ledis/t_zset_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ledis/t_zset.go b/ledis/t_zset.go index 5d6bfb3..964361e 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -899,7 +899,7 @@ func (db *DB) ZUnionStore(destKey []byte, srcKeys [][]byte, weights []int64, agg } for _, pair := range scorePairs { if score, ok := destMap[String(pair.Member)]; !ok { - destMap[String(pair.Member)] = pair.Score + destMap[String(pair.Member)] = pair.Score * weights[i] } else { destMap[String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i]) } @@ -966,7 +966,7 @@ func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, agg } for _, pair := range scorePairs { if score, ok := destMap[String(pair.Member)]; !ok { - destMap[String(pair.Member)] = pair.Score + destMap[String(pair.Member)] = pair.Score * weights[i] } else { keptMembers = append(keptMembers, pair.Member) destMap[String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i]) diff --git a/ledis/t_zset_test.go b/ledis/t_zset_test.go index a772360..a2232d3 100644 --- a/ledis/t_zset_test.go +++ b/ledis/t_zset_test.go @@ -338,7 +338,7 @@ func TestZInterStore(t *testing.T) { db.ZAdd(key2, ScorePair{2, []byte("three")}) keys := [][]byte{key1, key2} - weights := []int64{1, 2} + weights := []int64{2, 3} out := []byte("out") n, err := db.ZInterStore(out, keys, weights, AggregateSum) @@ -352,7 +352,7 @@ func TestZInterStore(t *testing.T) { if err != nil { t.Fatal(err.Error()) } - if v != 5 { + if v != 8 { t.Fatal("invalid value ", v) } @@ -370,7 +370,7 @@ func TestZInterStore(t *testing.T) { if err != nil { t.Fatal(err.Error()) } - if v != 1 { + if v != 2 { t.Fatal("invalid value ", v) } From 64773d26c5bd4407f6d388bedb72b0bd149ca53e Mon Sep 17 00:00:00 2001 From: wenyekui Date: Wed, 13 Aug 2014 16:18:14 +0800 Subject: [PATCH 18/27] add zunionstore & zinterstore for node.js client --- client/nodejs/example.js | 15 ++++++++++++++- client/nodejs/ledis/lib/commands.js | 2 ++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/client/nodejs/example.js b/client/nodejs/example.js index f28c50a..101a106 100644 --- a/client/nodejs/example.js +++ b/client/nodejs/example.js @@ -35,4 +35,17 @@ client.bget("bit key 3", function(err, result){ } }); -client.quit(); +//test zunionstore & zinterstore +client.zadd("zset1", 1, "one") +client.zadd("zset1", 2, "two") + +client.zadd("zset2", 1, "one") +client.zadd("zset2", 2, "two") +client.zadd("zset2", 3, "three") + +client.zunionstore("out", 2, "zset1", "zset2", "weights", 2, 3, ledis.print) +client.zrange("out", 0, -1, "withscores", ledis.print) + +client.zinterstore("out", 2, "zset1", "zset2", "weights", 2, 3, ledis.print) +client.zrange("out", 0, -1, "withscores", ledis.print) +client.quit() diff --git a/client/nodejs/ledis/lib/commands.js b/client/nodejs/ledis/lib/commands.js index 8e5a524..e814d23 100644 --- a/client/nodejs/ledis/lib/commands.js +++ b/client/nodejs/ledis/lib/commands.js @@ -83,6 +83,8 @@ module.exports = [ "zrevrank", "zrevrangebyscore", "zscore", + "zunionstore", + "zinterstore", "zclear", From 5b49c67ad9d4afa74b992cffc66dc68de51a0f53 Mon Sep 17 00:00:00 2001 From: wenyekui Date: Wed, 13 Aug 2014 16:28:59 +0800 Subject: [PATCH 19/27] update commands.json & commands.md for zunionstore & zinterstore --- doc/commands.json | 13 +++++++- doc/commands.md | 82 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/doc/commands.json b/doc/commands.json index 8af2c90..c51f304 100644 --- a/doc/commands.json +++ b/doc/commands.json @@ -414,5 +414,16 @@ "arguments": "key", "group": "ZSet", "readonly": true + }, + "ZUNIONSTORE":{ + "arguments": "destkey numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]", + "group": "ZSet", + "readonly": false + }, + + "ZINTERSTORE":{ + "arguments": "destkey numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]", + "group": "ZSet", + "readonly": false } -} \ No newline at end of file +} diff --git a/doc/commands.md b/doc/commands.md index 2157cbd..b4855e7 100644 --- a/doc/commands.md +++ b/doc/commands.md @@ -79,6 +79,11 @@ Table of Contents - [ZEXPIREAT key timestamp](#zexpireat-key-timestamp) - [ZTTL key](#zttl-key) - [ZPERSIST key](#zpersist-key) + - [ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX] +](#zunionstore-destination-numkeys-key-key--weights-weight-weight--aggregate-summinmax) + - [ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX] +](#zinterstore-destination-numkeys-key-key--weights-weight-weight--aggregate-summinmax) + - [Bitmap](#bitmap) - [BGET key](#bget-key) @@ -1629,6 +1634,83 @@ ledis> ZTTL mset (integer) -1 ``` +### ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX] + +Computes the union of numkeys sorted sets given by the specified keys, and stores the result in destination. It is mandatory to provide the number of input keys (numkeys) before passing the input keys and the other (optional) arguments. + +By default, the resulting score of an element is the sum of its scores in the sorted sets where it exists. +Using the WEIGHTS option, it is possible to specify a multiplication factor for each input sorted set. This means that the score of every element in every input sorted set is multiplied by this factor before being passed to the aggregation function. When WEIGHTS is not given, the multiplication factors default to 1. + +With the AGGREGATE option, it is possible to specify how the results of the union are aggregated. This option defaults to SUM, where the score of an element is summed across the inputs where it exists. When this option is set to either MIN or MAX, the resulting set will contain the minimum or maximum score of an element across the inputs where it exists. + +If destination already exists, it is overwritten. + + +**Return value** + +int64: the number of elements in the resulting sorted set at destination. + +**Examples** + +``` +ledis> ZADD zset1 1 "one" +(interger) 1 +ledis> ZADD zset1 2 "two" +(interger) 1 +ledis> ZADD zset2 1 "one" +(interger) 1 +ledis> ZADD zset2 2 "two" +(interger) 1 +ledis> ZADD zset2 3 "three" +(interger) 1 +ledis> ZUNIONSTORE out 2 zset1 zset2 WEIGHTS 2 3 +(interger) 3 +ledis> ZRANGE out 0 -1 WITHSCORES +1) "one" +2) "5" +3) "three" +4) "9" +5) "two" +6) "10" +``` + +### ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX] + +Computes the intersection of numkeys sorted sets given by the specified keys, and stores the result in destination. It is mandatory to provide the number of input keys (numkeys) before passing the input keys and the other (optional) arguments. + +By default, the resulting score of an element is the sum of its scores in the sorted sets where it exists. Because intersection requires an element to be a member of every given sorted set, this results in the score of every element in the resulting sorted set to be equal to the number of input sorted sets. + +For a description of the `WEIGHTS` and `AGGREGATE` options, see [ZUNIONSTORE](#zunionstore-destination-numkeys-key-key--weights-weight-weight--aggregate-summinmax). + +If destination already exists, it is overwritten. + + + +**Return value** + +int64: the number of elements in the resulting sorted set at destination. + +**Examples** + +``` +ledis> ZADD zset1 1 "one" +(interger) 1 +ledis> ZADD zset1 2 "two" +(interger) 1 +ledis> ZADD zset2 1 "one" +(interger) 1 +ledis> ZADD zset2 2 "two" +(interger) 1 +ledis> ZADD zset2 3 "three" +(interger) 1 +ledis> ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3 +(interger) 3 +ledis> ZRANGE out 0 -1 WITHSCORES +1) "one" +2) "5" +3) "two" +4) "10" +``` ## Bitmap From 4d6b42671bd5f391c255873106c65999a12da175 Mon Sep 17 00:00:00 2001 From: wenyekui Date: Thu, 14 Aug 2014 10:37:27 +0800 Subject: [PATCH 20/27] fix misspelling --- server/client_http.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/client_http.go b/server/client_http.go index 6707e24..16f3b9a 100644 --- a/server/client_http.go +++ b/server/client_http.go @@ -19,7 +19,7 @@ var allowedContentTypes = map[string]struct{}{ "bson": struct{}{}, "msgpack": struct{}{}, } -var unsopportedCommands = map[string]struct{}{ +var unsupportedCommands = map[string]struct{}{ "slaveof": struct{}{}, "fullsync": struct{}{}, "sync": struct{}{}, @@ -87,7 +87,7 @@ func (c *httpClient) makeRequest(app *App, r *http.Request, w http.ResponseWrite } req.cmd = strings.ToLower(cmd) - if _, ok := unsopportedCommands[req.cmd]; ok { + if _, ok := unsupportedCommands[req.cmd]; ok { return nil, fmt.Errorf("unsupported command: '%s'", cmd) } @@ -210,7 +210,7 @@ func (w *httpWriter) writeScorePairArray(lst []ledis.ScorePair, withScores bool) } func (w *httpWriter) writeBulkFrom(n int64, rb io.Reader) { - w.writeError(fmt.Errorf("unsuport")) + w.writeError(fmt.Errorf("unsupport")) } func (w *httpWriter) flush() { From 94b3dac3e797aa599de471a09715c2358170b4b2 Mon Sep 17 00:00:00 2001 From: wenyekui Date: Thu, 14 Aug 2014 15:54:47 +0800 Subject: [PATCH 21/27] bug fix: zinterstore --- ledis/t_zset.go | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/ledis/t_zset.go b/ledis/t_zset.go index 964361e..972c872 100644 --- a/ledis/t_zset.go +++ b/ledis/t_zset.go @@ -939,7 +939,6 @@ func (db *DB) ZUnionStore(destKey []byte, srcKeys [][]byte, weights []int64, agg func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte) (int64, error) { - var destMap = map[string]int64{} aggregateFunc := getAggregateFunc(aggregate) if aggregateFunc == nil { return 0, errInvalidAggregate @@ -958,20 +957,27 @@ func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, agg } } - var keptMembers [][]byte - for i, key := range srcKeys { + var destMap = map[string]int64{} + scorePairs, err := db.ZRange(srcKeys[0], 0, -1) + if err != nil { + return 0, err + } + for _, pair := range scorePairs { + destMap[String(pair.Member)] = pair.Score * weights[0] + } + + for i, key := range srcKeys[1:] { scorePairs, err := db.ZRange(key, 0, -1) if err != nil { return 0, err } + tmpMap := map[string]int64{} for _, pair := range scorePairs { - if score, ok := destMap[String(pair.Member)]; !ok { - destMap[String(pair.Member)] = pair.Score * weights[i] - } else { - keptMembers = append(keptMembers, pair.Member) - destMap[String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i]) + if score, ok := destMap[String(pair.Member)]; ok { + tmpMap[String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i+1]) } } + destMap = tmpMap } t := db.zsetTx @@ -981,13 +987,12 @@ func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, agg db.zDelete(t, destKey) var num int64 = 0 - for _, member := range keptMembers { - score := destMap[String(member)] - if err := checkZSetKMSize(destKey, member); err != nil { + for member, score := range destMap { + if err := checkZSetKMSize(destKey, []byte(member)); err != nil { return 0, err } - if n, err := db.zSetItem(t, destKey, score, member); err != nil { + if n, err := db.zSetItem(t, destKey, score, []byte(member)); err != nil { return 0, err } else if n == 0 { //add new @@ -1002,5 +1007,5 @@ func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, agg if err := t.Commit(); err != nil { return 0, err } - return int64(len(keptMembers)), nil + return int64(len(destMap)), nil } From 34f1795a5f8a374e8b8db64d679a1e835633e717 Mon Sep 17 00:00:00 2001 From: wenyekui Date: Thu, 14 Aug 2014 16:10:07 +0800 Subject: [PATCH 22/27] modify unit test --- server/cmd_zset_test.go | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/server/cmd_zset_test.go b/server/cmd_zset_test.go index e6a6a70..8c74bdc 100644 --- a/server/cmd_zset_test.go +++ b/server/cmd_zset_test.go @@ -704,4 +704,36 @@ func TestZInterStore(t *testing.T) { t.Fatal("invalid value ", n) } } + + if _, err := c.Do("zadd", "k3", "3", "three"); err != nil { + t.Fatal(err.Error()) + } + + if n, err := ledis.Int64(c.Do("zinterstore", "out", "3", "k1", "k2", "k3", "aggregate", "sum")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 0 { + t.Fatal("invalid value ", n) + } + } + + if _, err := c.Do("zadd", "k3", "3", "two"); err != nil { + t.Fatal(err.Error()) + } + + if n, err := ledis.Int64(c.Do("zinterstore", "out", "3", "k1", "k2", "k3", "aggregate", "sum", "weights", "3", "2", "2")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 1 { + t.Fatal("invalid value ", n) + } + } + + if n, err := ledis.Int64(c.Do("zscore", "out", "two")); err != nil { + t.Fatal(err.Error()) + } else { + if n != 14 { + t.Fatal("invalid value ", n) + } + } } From a3f402c6bdda987f0d648bab575237476d69677c Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 15 Aug 2014 10:46:48 +0800 Subject: [PATCH 23/27] lmdv add nosync config, fix hyperleveldb build bug --- config/config.go | 3 ++- config/config.json | 3 ++- config/config.toml | 1 + etc/ledis.conf | 1 + store/hyperleveldb.go | 2 ++ store/hyperleveldb_test.go | 2 ++ store/mdb/mdb.go | 9 ++++++++- 7 files changed, 18 insertions(+), 3 deletions(-) diff --git a/config/config.go b/config/config.go index 85ec414..0939afb 100644 --- a/config/config.go +++ b/config/config.go @@ -34,7 +34,8 @@ type LevelDBConfig struct { } type LMDBConfig struct { - MapSize int `toml:"map_size" json:"map_size"` + MapSize int `toml:"map_size" json:"map_size"` + NoSync bool `toml:"nosync" json:"nosync"` } type BinLogConfig struct { diff --git a/config/config.json b/config/config.json index 82bc668..2710e0a 100644 --- a/config/config.json +++ b/config/config.json @@ -14,7 +14,8 @@ }, "lmdb" : { - "map_size" : 524288000 + "map_size" : 524288000, + "nosync" : true }, "access_log" : "" diff --git a/config/config.toml b/config/config.toml index 6ef9a6c..573db9a 100644 --- a/config/config.toml +++ b/config/config.toml @@ -34,6 +34,7 @@ max_open_files = 1024 [lmdb] map_size = 524288000 +nosync = true [binlog] max_file_size = 0 diff --git a/etc/ledis.conf b/etc/ledis.conf index 8d92919..2097f65 100644 --- a/etc/ledis.conf +++ b/etc/ledis.conf @@ -36,6 +36,7 @@ max_open_files = 1024 [lmdb] map_size = 524288000 +nosync = true [binlog] # Set either size or num to 0 to disable binlog diff --git a/store/hyperleveldb.go b/store/hyperleveldb.go index f26631b..6ef70f1 100644 --- a/store/hyperleveldb.go +++ b/store/hyperleveldb.go @@ -1,3 +1,5 @@ +// +build hyperleveldb + package store import ( diff --git a/store/hyperleveldb_test.go b/store/hyperleveldb_test.go index 51bef8f..2d3a25b 100644 --- a/store/hyperleveldb_test.go +++ b/store/hyperleveldb_test.go @@ -1,3 +1,5 @@ +// +build hyperleveldb + package store import ( diff --git a/store/mdb/mdb.go b/store/mdb/mdb.go index 5a16463..ad767dc 100644 --- a/store/mdb/mdb.go +++ b/store/mdb/mdb.go @@ -23,6 +23,8 @@ type MDB struct { func (s Store) Open(path string, c *config.Config) (driver.IDB, error) { mapSize := c.LMDB.MapSize + noSync := c.LMDB.NoSync + if mapSize <= 0 { mapSize = 500 * 1024 * 1024 } @@ -48,7 +50,12 @@ func (s Store) Open(path string, c *config.Config) (driver.IDB, error) { } } - err = env.Open(path, mdb.NOSYNC|mdb.NOMETASYNC|mdb.WRITEMAP|mdb.MAPASYNC|mdb.CREATE, 0755) + var flags uint = mdb.CREATE + if noSync { + flags |= mdb.NOSYNC | mdb.NOMETASYNC | mdb.WRITEMAP | mdb.MAPASYNC + } + + err = env.Open(path, flags, 0755) if err != nil { return MDB{}, err } From ed9f1e9ac86f696c7552eb978b4f7a98d0516b87 Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 15 Aug 2014 11:42:18 +0800 Subject: [PATCH 24/27] config test bugfix --- config/config_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/config/config_test.go b/config/config_test.go index 14620ec..943c513 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -18,6 +18,7 @@ func TestConfig(t *testing.T) { dstCfg.LevelDB.CacheSize = 524288000 dstCfg.LevelDB.MaxOpenFiles = 1024 dstCfg.LMDB.MapSize = 524288000 + dstCfg.LMDB.NoSync = true cfg, err := NewConfigWithFile("./config.toml") if err != nil { From 6c9d38fab762c0523418b7da44292df1edb231a8 Mon Sep 17 00:00:00 2001 From: wenyekui Date: Fri, 15 Aug 2014 11:32:26 +0800 Subject: [PATCH 25/27] bug fix: encode bitmap meta key --- ledis/t_bit.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ledis/t_bit.go b/ledis/t_bit.go index 37b788d..bc208a0 100644 --- a/ledis/t_bit.go +++ b/ledis/t_bit.go @@ -121,7 +121,7 @@ func (db *DB) bEncodeMetaKey(key []byte) []byte { mk[0] = db.index mk[1] = BitMetaType - copy(mk, key) + copy(mk[2:], key) return mk } From 57b5b33ebb9a9e4e9587d8f2be40e942ea02cdac Mon Sep 17 00:00:00 2001 From: silentsai Date: Fri, 15 Aug 2014 15:42:25 +0800 Subject: [PATCH 26/27] fix expire fail while target key length of 1 bit --- ledis/t_bit.go | 15 ++++++++++----- ledis/t_bit_test.go | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/ledis/t_bit.go b/ledis/t_bit.go index bc208a0..f7edddd 100644 --- a/ledis/t_bit.go +++ b/ledis/t_bit.go @@ -231,15 +231,20 @@ func (db *DB) bSetMeta(t *tx, key []byte, tailSeq uint32, tailOff uint32) { } func (db *DB) bUpdateMeta(t *tx, key []byte, seq uint32, off uint32) (tailSeq uint32, tailOff uint32, err error) { - var ts, to int32 - if ts, to, err = db.bGetMeta(key); err != nil { + var tseq, toff int32 + var update bool = false + + if tseq, toff, err = db.bGetMeta(key); err != nil { return + } else if tseq < 0 { + update = true } else { - tailSeq = uint32(MaxInt32(ts, 0)) - tailOff = uint32(MaxInt32(to, 0)) + tailSeq = uint32(MaxInt32(tseq, 0)) + tailOff = uint32(MaxInt32(toff, 0)) + update = (seq > tailSeq || (seq == tailSeq && off > tailOff)) } - if seq > tailSeq || (seq == tailSeq && off > tailOff) { + if update { db.bSetMeta(t, key, seq, off) tailSeq = seq tailOff = off diff --git a/ledis/t_bit_test.go b/ledis/t_bit_test.go index a356539..9103523 100644 --- a/ledis/t_bit_test.go +++ b/ledis/t_bit_test.go @@ -41,6 +41,7 @@ func TestBinary(t *testing.T) { testOpXor(t) testOpNot(t) testMSetBit(t) + testBitExpire(t) } func testSimple(t *testing.T) { @@ -518,3 +519,20 @@ func testMSetBit(t *testing.T) { return } + +func testBitExpire(t *testing.T) { + db := getTestDB() + db.FlushAll() + + key := []byte("test_b_ttl") + + db.BSetBit(key, 0, 1) + + if res, err := db.BExpire(key, 100); res != 1 || err != nil { + t.Fatal(false) + } + + if ttl, err := db.BTTL(key); ttl != 100 || err != nil { + t.Fatal(false) + } +} From 1b22019e3dce70d9bdd6e3f0a08011b0b482d400 Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 15 Aug 2014 16:22:47 +0800 Subject: [PATCH 27/27] lmdv not support windows --- store/mdb.go | 1 + store/mdb_test.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/store/mdb.go b/store/mdb.go index 97ccf38..1e097f1 100644 --- a/store/mdb.go +++ b/store/mdb.go @@ -1,3 +1,4 @@ +// +build !windows package store import ( diff --git a/store/mdb_test.go b/store/mdb_test.go index cfada26..4cd7a6b 100644 --- a/store/mdb_test.go +++ b/store/mdb_test.go @@ -1,3 +1,5 @@ +// +build !windows + package store import (