ledisdb/tools/redis_import/test.py

127 lines
2.7 KiB
Python
Raw Normal View History

2014-08-08 06:40:52 +04:00
#coding: utf-8
import random, string
import redis
2014-08-08 14:33:45 +04:00
import ledis
2014-08-08 06:40:52 +04:00
2014-08-08 14:33:45 +04:00
from redis_import import copy, scan, set_ttl
# Module-level clients shared by every helper and test below.
# Redis side (the source passed to copy()): created with no arguments, so the
# client library's default connection settings apply.
rds = redis.Redis()
# LedisDB side (the destination passed to copy()): connects on port 6380.
lds = ledis.Ledis(port=6380)
2014-08-08 06:40:52 +04:00
def random_word(words, length):
    """Return ``length`` random picks from ``words`` concatenated into one string."""
    picks = [random.choice(words) for _ in range(length)]
    return ''.join(picks)
def get_words(word_file="/usr/share/dict/words", limit=1000):
    """Return the first ``limit`` lines of a word-list file.

    Args:
        word_file: path of a text file with one word per line. Defaults to
            the system dictionary the original code hard-coded.
        limit: maximum number of words to return (default 1000).

    Returns:
        A list of at most ``limit`` strings, in file order, without line
        terminators.
    """
    # The context manager closes the handle even if reading fails; the
    # previous version left the file open.
    with open(word_file) as handle:
        words = handle.read().splitlines()
    return words[:limit]
2014-08-08 06:40:52 +04:00
def get_mapping(words, length=1000):
    """Return a dict assigning each word a random integer in ``[1, length]``.

    A word that occurs more than once keeps the value drawn for its last
    occurrence.
    """
    return {word: random.randint(1, length) for word in words}
2014-08-08 14:33:45 +04:00
def random_string(client, words, length=1000):
    """Write one key per word, each with a random integer value, via ``client.mset``.

    ``length`` is the upper bound handed to ``get_mapping`` for the values.
    """
    client.mset(get_mapping(words, length))
2014-08-08 14:33:45 +04:00
def random_hash(client, words, length=1000):
    """Fill the hash ``hashName`` with word -> random integer fields.

    ``length`` is the upper bound handed to ``get_mapping`` for the values.
    """
    fields = get_mapping(words, length)
    client.hmset("hashName", fields)
2014-08-08 14:33:45 +04:00
def random_list(client, words, length=1000):
    """Push every word onto the list ``listName`` with a single ``lpush`` call.

    ``length`` is part of the signature but is not used by this function.
    """
    list_key = "listName"
    client.lpush(list_key, *words)
2014-08-08 14:33:45 +04:00
def random_zset(client, words, length=1000):
    """Add every word to the sorted set ``zsetName`` with a random integer score.

    The word -> score mapping from ``get_mapping`` is passed to
    ``client.zadd`` as keyword arguments.
    """
    scores = get_mapping(words, length)
    client.zadd("zsetName", **scores)
def random_set(client, words, length=1000):
    """Add every word to the set ``setName`` with a single ``sadd`` call.

    ``length`` is part of the signature but is not used by this function.
    """
    set_key = "setName"
    client.sadd(set_key, *words)
2014-08-08 06:40:52 +04:00
def test():
    """Fill Redis with random data, copy it into LedisDB and compare the two.

    Steps:
      1. Flush Redis and insert one random data set per type (string, hash,
         list, zset, set) built from the word list.
      2. Run ``copy(rds, lds, convert=True)``.
      3. Check that every Redis string key exists in LedisDB, and that the
         list, every hash and the sorted set read back identically from
         both servers.

    Raises:
        AssertionError: if any comparison fails.
    """
    words = get_words()
    print("Flush all redis data before insert new.")
    rds.flushall()

    for fill in (random_string, random_hash, random_list, random_zset,
                 random_set):
        fill(rds, words)
        print("%s done" % fill.__name__)

    copy(rds, lds, convert=True)

    # scan() is unpacked as a (keys, total) pair, the same way test_ttl()
    # consumes it; iterating the un-unpacked result would visit the key list
    # and the count instead of the individual keys.
    keys, _total = scan(rds, 1000)

    # Ask Redis for each key's type once and reuse it for all checks below.
    key_types = dict((key, rds.type(key)) for key in keys)

    # Strings: every string key must exist on the LedisDB side.
    missing = [key for key in keys
               if key_types[key] == "string" and not lds.exists(key)]
    for key in missing:
        print(key)
        print("String data not consistent")
    # Previously this condition was only printed, so the test could not fail
    # on missing string keys.
    assert not missing, "%d string key(s) missing in LedisDB" % len(missing)

    # List: same elements in the same order.
    assert rds.lrange("listName", 0, -1) == lds.lrange("listName", 0, -1)

    # Hashes: whole mapping plus sorted keys and sorted values.
    for key in keys:
        if key_types[key] != "hash":
            continue
        assert rds.hgetall(key) == lds.hgetall(key)
        assert sorted(rds.hkeys(key)) == sorted(lds.hkeys(key))
        assert sorted(rds.hvals(key)) == sorted(lds.hvals(key))

    # Sorted set: same members and scores in the same order.
    z1 = rds.zrange("zsetName", 0, -1, withscores=True)
    z2 = lds.zrange("zsetName", 0, -1, withscores=True)
    assert z1 == z2

    # NOTE(review): the data written by random_set ("setName") is not
    # compared here.
2014-08-08 14:33:45 +04:00
def ledis_ttl(ledis_client, key, k_type):
    """Return the TTL of ``key`` using the TTL command matching its type.

    Args:
        ledis_client: client object providing ``ttl``, ``lttl``, ``httl``,
            ``zttl`` and ``sttl`` methods.
        key: name of the key to query.
        k_type: one of ``"string"``, ``"list"``, ``"hash"``, ``"zset"`` or
            ``"set"``.

    Returns:
        Whatever the selected TTL method of ``ledis_client`` returns.

    Raises:
        KeyError: if ``k_type`` is not one of the supported type names.
    """
    ttl_commands = {
        "string": "ttl",
        "list": "lttl",
        "hash": "httl",
        "zset": "zttl",
        "set": "sttl",
    }
    # Dispatch on the client that was passed in. The previous version ignored
    # ``ledis_client`` and always queried the module-level ``lds``.
    command = getattr(ledis_client, ttl_commands[k_type])
    return command(key)
def test_ttl():
    """Give every Redis key a one-day expiry, mirror it with ``set_ttl`` and
    assert that LedisDB then reports a positive TTL for the same key.
    """
    one_day = 60 * 60 * 24
    keys, _total = scan(rds, 1000)
    for key in keys:
        k_type = rds.type(key)
        rds.expire(key, one_day)
        set_ttl(rds, lds, key, k_type)
        # Only compare when Redis itself reports a truthy TTL.
        if not rds.ttl(key):
            continue
        assert ledis_ttl(lds, key, k_type) > 0
2014-08-08 06:40:52 +04:00
if __name__ == "__main__":
    # Flush the current database on the Redis side first, then on LedisDB.
    for client in (rds, lds):
        client.flushdb()
    # Run the data-consistency test, then the TTL test.
    for case in (test, test_ttl):
        case()
    print("Test passed.")