remove ledis unsupported feature code, update testcases

This commit is contained in:
holys 2014-07-01 21:25:59 +08:00
parent 38de79a67f
commit 20ebfeafc2
5 changed files with 322 additions and 363 deletions

View File

@ -64,12 +64,6 @@ def int_or_none(response):
return int(response)
def float_or_none(response):
if response is None:
return None
return float(response)
class Ledis(object):
"""
Implementation of the Redis protocol.
@ -82,19 +76,20 @@ class Ledis(object):
"""
RESPONSE_CALLBACKS = dict_merge(
string_keys_to_dict(
'EXISTS EXPIRE EXPIREAT HEXISTS HMSET SETNX',
'EXISTS EXPIRE EXPIREAT HEXISTS HMSET SETNX '
'PERSIST HPERSIST LPERSIST ZPERSIST',
bool
),
string_keys_to_dict(
'DECRBY DEL HDEL HLEN INCRBY LLEN '
'ZADD ZCARD ZREM ZREMRANGEBYRANK ZREMRANGEBYSCORE',
'ZADD ZCARD ZREM ZREMRANGEBYRANK ZREMRANGEBYSCORE'
'LMCLEAR HMCLEAR ZMCLEAR',
int
),
string_keys_to_dict(
'LPUSH RPUSH',
lambda r: isinstance(r, long) and r or nativestr(r) == 'OK'
),
string_keys_to_dict('ZSCORE ZINCRBY', float_or_none),
string_keys_to_dict(
'MSET SELECT SLAVEOF',
lambda r: nativestr(r) == 'OK'
@ -103,7 +98,7 @@ class Ledis(object):
'ZRANGE ZRANGEBYSCORE ZREVRANGE ZREVRANGEBYSCORE',
zset_score_pairs
),
string_keys_to_dict('ZRANK ZREVRANK', int_or_none),
string_keys_to_dict('ZRANK ZREVRANK ZSCORE ZINCRBY', int_or_none),
{
'HGETALL': lambda r: r and pairs_to_dict(r) or {},
'PING': lambda r: nativestr(r) == 'PONG',
@ -113,11 +108,11 @@ class Ledis(object):
@classmethod
def from_url(cls, url, db=None, **kwargs):
"""
Return a Redis client object configured from the given URL.
Return a Ledis client object configured from the given URL.
For example::
redis://[:password]@localhost:6379/0
redis://[:password]@localhost:6380/0
unix://[:password]@/path/to/socket.sock?db=0
There are several ways to specify a database number. The parse function
@ -136,15 +131,14 @@ class Ledis(object):
connection_pool = ConnectionPool.from_url(url, db=db, **kwargs)
return cls(connection_pool=connection_pool)
def __init__(self, host='localhost', port=6379,
db=0, password=None, socket_timeout=None,
def __init__(self, host='localhost', port=6380,
db=0, socket_timeout=None,
connection_pool=None, charset='utf-8',
errors='strict', decode_responses=False,
unix_socket_path=None):
if not connection_pool:
kwargs = {
'db': db,
'password': password,
'socket_timeout': socket_timeout,
'encoding': charset,
'encoding_errors': errors,
@ -337,38 +331,11 @@ class Ledis(object):
items.extend(pair)
return self.execute_command('MSET', *items)
#TODO: remove ex, px, nx, xx params.
def set(self, name, value, ex=None, px=None, nx=False, xx=False):
def set(self, name, value):
"""
Set the value at key ``name`` to ``value``
``ex`` sets an expire flag on key ``name`` for ``ex`` seconds.
``px`` sets an expire flag on key ``name`` for ``px`` milliseconds.
``nx`` if set to True, set the value at key ``name`` to ``value`` if it
does not already exist.
``xx`` if set to True, set the value at key ``name`` to ``value`` if it
already exists.
Set a key.
"""
pieces = [name, value]
if ex:
pieces.append('EX')
if isinstance(ex, datetime.timedelta):
ex = ex.seconds + ex.days * 24 * 3600
pieces.append(ex)
if px:
pieces.append('PX')
if isinstance(px, datetime.timedelta):
ms = int(px.microseconds / 1000)
px = (px.seconds + px.days * 24 * 3600) * 1000 + ms
pieces.append(px)
if nx:
pieces.append('NX')
if xx:
pieces.append('XX')
return self.execute_command('SET', *pieces)
def setnx(self, name, value):
@ -383,8 +350,6 @@ class Ledis(object):
return self.execute_command('PERSIST', name)
#### LIST COMMANDS ####
def lindex(self, name, index):
"""
@ -457,7 +422,6 @@ class Ledis(object):
return self.execute_command('LPERSIST', name)
#### SORTED SET COMMANDS ####
def zadd(self, name, *args, **kwargs):
"""
@ -492,8 +456,7 @@ class Ledis(object):
"Increment the score of ``value`` in sorted set ``name`` by ``amount``"
return self.execute_command('ZINCRBY', name, amount, value)
def zrange(self, name, start, end, desc=False, withscores=False,
score_cast_func=float):
def zrange(self, name, start, end, desc=False, withscores=False):
"""
Return a range of values from sorted set ``name`` between
``start`` and ``end`` sorted in ascending order.
@ -514,7 +477,7 @@ class Ledis(object):
if withscores:
pieces.append('withscores')
options = {
'withscores': withscores, 'score_cast_func': score_cast_func}
'withscores': withscores, 'score_cast_func': int}
return self.execute_command(*pieces, **options)
def zrangebyscore(self, name, min, max, start=None, num=None,
@ -570,8 +533,7 @@ class Ledis(object):
"""
return self.execute_command('ZREMRANGEBYSCORE', name, min, max)
def zrevrange(self, name, start, num, withscores=False,
score_cast_func=float):
def zrevrange(self, name, start, num, withscores=False):
"""
Return a range of values from sorted set ``name`` between
``start`` and ``num`` sorted in descending order.
@ -587,11 +549,11 @@ class Ledis(object):
if withscores:
pieces.append('withscores')
options = {
'withscores': withscores, 'score_cast_func': score_cast_func}
'withscores': withscores, 'score_cast_func': int}
return self.execute_command(*pieces, **options)
def zrevrangebyscore(self, name, min, max, start=None, num=None,
withscores=False, score_cast_func=float):
withscores=False):
"""
Return a range of values from the sorted set ``name`` with scores
between ``min`` and ``max`` in descending order.
@ -613,7 +575,7 @@ class Ledis(object):
if withscores:
pieces.append('withscores')
options = {
'withscores': withscores, 'score_cast_func': score_cast_func}
'withscores': withscores, 'score_cast_func': int}
return self.execute_command(*pieces, **options)
def zrevrank(self, name, value):

View File

@ -1,5 +1,5 @@
# coding: utf-8
# Test Cases for list commands
# Test Cases for hash commands
import unittest
import sys
@ -11,133 +11,135 @@ from ledis._compat import b, iteritems, itervalues
from ledis import ResponseError
l = ledis.Ledis(port=6380)
def current_time():
return datetime.datetime.now()
class TestCmdHash(unittest.TestCase):
def setUp(self):
self.l = ledis.Ledis(port=6666)
pass
def tearDown(self):
self.l.hmclear('myhash', 'a')
l.hmclear('myhash', 'a')
def test_hdel(self):
self.l.hset('myhash', 'field1', 'foo')
assert self.l.hdel('myhash', 'field1') == 1
assert self.l.hdel('myhash', 'field1') == 0
assert self.l.hdel('myhash', 'field1', 'field2') == 0
l.hset('myhash', 'field1', 'foo')
assert l.hdel('myhash', 'field1') == 1
assert l.hdel('myhash', 'field1') == 0
assert l.hdel('myhash', 'field1', 'field2') == 0
def test_hexists(self):
self.l.hset('myhash', 'field1', 'foo')
self.l.hdel('myhash', 'field2')
assert self.l.hexists('myhash', 'field1') == 1
assert self.l.hexists('myhash', 'field2') == 0
l.hset('myhash', 'field1', 'foo')
l.hdel('myhash', 'field2')
assert l.hexists('myhash', 'field1') == 1
assert l.hexists('myhash', 'field2') == 0
def test_hget(self):
self.l.hset('myhash', 'field1', 'foo')
assert self.l.hget('myhash', 'field1') == 'foo'
self.assertIsNone(self.l.hget('myhash', 'field2'))
l.hset('myhash', 'field1', 'foo')
assert l.hget('myhash', 'field1') == 'foo'
assert (l.hget('myhash', 'field2')) is None
def test_hgetall(self):
h = {'field1': 'foo', 'field2': 'bar'}
self.l.hmset('myhash', h)
assert self.l.hgetall('myhash') == h
l.hmset('myhash', h)
assert l.hgetall('myhash') == h
def test_hincrby(self):
assert self.l.hincrby('myhash', 'field1') == 1
self.l.hclear('myhash')
assert self.l.hincrby('myhash', 'field1', 1) == 1
assert self.l.hincrby('myhash', 'field1', 5) == 6
assert self.l.hincrby('myhash', 'field1', -10) == -4
assert l.hincrby('myhash', 'field1') == 1
l.hclear('myhash')
assert l.hincrby('myhash', 'field1', 1) == 1
assert l.hincrby('myhash', 'field1', 5) == 6
assert l.hincrby('myhash', 'field1', -10) == -4
def test_hkeys(self):
h = {'field1': 'foo', 'field2': 'bar'}
self.l.hmset('myhash', h)
assert self.l.hkeys('myhash') == ['field1', 'field2']
l.hmset('myhash', h)
assert l.hkeys('myhash') == ['field1', 'field2']
def test_hlen(self):
self.l.hset('myhash', 'field1', 'foo')
assert self.l.hlen('myhash') == 1
self.l.hset('myhash', 'field2', 'bar')
assert self.l.hlen('myhash') == 2
l.hset('myhash', 'field1', 'foo')
assert l.hlen('myhash') == 1
l.hset('myhash', 'field2', 'bar')
assert l.hlen('myhash') == 2
def test_hmget(self):
assert self.l.hmset('myhash', {'a': '1', 'b': '2', 'c': '3'})
assert self.l.hmget('myhash', 'a', 'b', 'c') == ['1', '2', '3']
assert l.hmset('myhash', {'a': '1', 'b': '2', 'c': '3'})
assert l.hmget('myhash', 'a', 'b', 'c') == ['1', '2', '3']
def test_hmset(self):
h = {'a': '1', 'b': '2', 'c': '3'}
assert self.l.hmset('myhash', h)
assert self.l.hgetall('myhash') == h
assert l.hmset('myhash', h)
assert l.hgetall('myhash') == h
def test_hset(self):
self.l.hclear('myhash')
assert int(self.l.hset('myhash', 'field1', 'foo')) == 1
assert self.l.hset('myhash', 'field1', 'foo') == 0
l.hclear('myhash')
assert int(l.hset('myhash', 'field1', 'foo')) == 1
assert l.hset('myhash', 'field1', 'foo') == 0
def test_hvals(self):
h = {'a': '1', 'b': '2', 'c': '3'}
self.l.hmset('myhash', h)
l.hmset('myhash', h)
local_vals = list(itervalues(h))
remote_vals = self.l.hvals('myhash')
remote_vals = l.hvals('myhash')
assert sorted(local_vals) == sorted(remote_vals)
def test_hclear(self):
h = {'a': '1', 'b': '2', 'c': '3'}
self.l.hmset('myhash', h)
assert self.l.hclear('myhash') == 3
assert self.l.hclear('myhash') == 0
l.hmset('myhash', h)
assert l.hclear('myhash') == 3
assert l.hclear('myhash') == 0
def test_hmclear(self):
h = {'a': '1', 'b': '2', 'c': '3'}
self.l.hmset('myhash1', h)
self.l.hmset('myhash2', h)
assert self.l.hmclear('myhash1', 'myhash2') == 2
l.hmset('myhash1', h)
l.hmset('myhash2', h)
assert l.hmclear('myhash1', 'myhash2') == 2
def test_hexpire(self):
assert self.l.hexpire('myhash', 100) == 0
self.l.hset('myhash', 'field1', 'foo')
assert self.l.hexpire('myhash', 100) == 1
assert self.l.httl('myhash') <= 100
assert l.hexpire('myhash', 100) == 0
l.hset('myhash', 'field1', 'foo')
assert l.hexpire('myhash', 100) == 1
assert l.httl('myhash') <= 100
def test_hexpireat_datetime(self):
expire_at = current_time() + datetime.timedelta(minutes=1)
self.l.hset('a', 'f', 'foo')
assert self.l.hexpireat('a', expire_at)
assert 0 < self.l.httl('a') <= 61
l.hset('a', 'f', 'foo')
assert l.hexpireat('a', expire_at)
assert 0 < l.httl('a') <= 61
def test_hexpireat_unixtime(self):
expire_at = current_time() + datetime.timedelta(minutes=1)
self.l.hset('a', 'f', 'foo')
l.hset('a', 'f', 'foo')
expire_at_seconds = int(time.mktime(expire_at.timetuple()))
assert self.l.hexpireat('a', expire_at_seconds)
assert 0 < self.l.httl('a') <= 61
assert l.hexpireat('a', expire_at_seconds)
assert 0 < l.httl('a') <= 61
def test_zexpireat_no_key(self):
expire_at = current_time() + datetime.timedelta(minutes=1)
assert not self.l.hexpireat('a', expire_at)
assert not l.hexpireat('a', expire_at)
def test_hexpireat(self):
assert self.l.hexpireat('myhash', 1577808000) == 0
self.l.hset('myhash', 'field1', 'foo')
assert self.l.hexpireat('myhash', 1577808000) == 1
assert l.hexpireat('myhash', 1577808000) == 0
l.hset('myhash', 'field1', 'foo')
assert l.hexpireat('myhash', 1577808000) == 1
def test_httl(self):
self.l.hset('myhash', 'field1', 'foo')
assert self.l.hexpire('myhash', 100)
assert self.l.httl('myhash') <= 100
l.hset('myhash', 'field1', 'foo')
assert l.hexpire('myhash', 100)
assert l.httl('myhash') <= 100
def test_hpersist(self):
self.l.hset('myhash', 'field1', 'foo')
self.l.hexpire('myhash', 100)
assert self.l.httl('myhash') <= 100
assert self.l.hpersist('myhash')
assert self.l.httl('myhash') == -1
l.hset('myhash', 'field1', 'foo')
l.hexpire('myhash', 100)
assert l.httl('myhash') <= 100
assert l.hpersist('myhash')
assert l.httl('myhash') == -1

View File

@ -9,6 +9,8 @@ sys.path.append('..')
import ledis
from ledis._compat import b, iteritems
l = ledis.Ledis(port=6380)
def current_time():
return datetime.datetime.now()
@ -16,141 +18,141 @@ def current_time():
class TestCmdKv(unittest.TestCase):
def setUp(self):
self.l = ledis.Ledis(port=6666)
pass
def tearDown(self):
self.l.delete('a', 'b', 'c', 'non_exist_key')
l.delete('a', 'b', 'c', 'non_exist_key')
def test_decr(self):
assert self.l.delete('a') == 1
assert self.l.decr('a') == -1
assert self.l['a'] == b('-1')
assert self.l.decr('a') == -2
assert self.l['a'] == b('-2')
assert self.l.decr('a', amount=5) == -7
assert self.l['a'] == b('-7')
assert l.delete('a') == 1
assert l.decr('a') == -1
assert l['a'] == b('-1')
assert l.decr('a') == -2
assert l['a'] == b('-2')
assert l.decr('a', amount=5) == -7
assert l['a'] == b('-7')
#FIXME: how to test exception?
# self.l.set('b', '234293482390480948029348230948')
# self.assertRaises(ResponseError, self.l.delete('b'))
# l.set('b', '234293482390480948029348230948')
# self.assertRaises(ResponseError, l.delete('b'))
def test_decrby(self):
assert self.l.delete('a') == 1
assert self.l.decrby('a') == -1
assert self.l['a'] == b('-1')
assert self.l.decrby('a') == -2
assert self.l['a'] == b('-2')
assert self.l.decrby('a', amount=5) == -7
assert self.l['a'] == b('-7')
assert l.delete('a') == 1
assert l.decrby('a') == -1
assert l['a'] == b('-1')
assert l.decrby('a') == -2
assert l['a'] == b('-2')
assert l.decrby('a', amount=5) == -7
assert l['a'] == b('-7')
def test_del(self):
assert self.l.delete('a') == 1
assert self.l.delete('a', 'b', 'c') == 3
assert l.delete('a') == 1
assert l.delete('a', 'b', 'c') == 3
def test_exists(self):
self.l.delete('a', 'non_exist_key')
self.l.set('a', 'hello')
self.assertTrue(self.l.exists('a'))
self.assertFalse(self.l.exists('non_exist_key'))
l.delete('a', 'non_exist_key')
l.set('a', 'hello')
assert (l.exists('a'))
assert not (l.exists('non_exist_key'))
def test_get(self):
self.l.set('a', 'hello')
assert self.l.get('a') == 'hello'
self.l.set('b', '中文')
assert self.l.get('b') == '中文'
self.l.delete('non_exist_key')
self.assertIsNone(self.l.get('non_exist_key'))
l.set('a', 'hello')
assert l.get('a') == 'hello'
l.set('b', '中文')
assert l.get('b') == '中文'
l.delete('non_exist_key')
assert (l.get('non_exist_key')) is None
def test_getset(self):
self.l.set('a', 'hello')
assert self.l.getset('a', 'world') == 'hello'
assert self.l.get('a') == 'world'
self.l.delete('non_exist_key')
self.assertIsNone(self.l.getset('non_exist_key', 'non'))
l.set('a', 'hello')
assert l.getset('a', 'world') == 'hello'
assert l.get('a') == 'world'
l.delete('non_exist_key')
assert (l.getset('non_exist_key', 'non')) is None
def test_incr(self):
self.l.delete('non_exist_key')
assert self.l.incr('non_exist_key') == 1
self.l.set('a', 100)
assert self.l.incr('a') == 101
l.delete('non_exist_key')
assert l.incr('non_exist_key') == 1
l.set('a', 100)
assert l.incr('a') == 101
def test_incrby(self):
self.l.delete('a')
assert self.l.incrby('a', 100) == 100
l.delete('a')
assert l.incrby('a', 100) == 100
self.l.set('a', 100)
assert self.l.incrby('a', 100) == 200
assert self.l.incrby('a', amount=100) == 300
l.set('a', 100)
assert l.incrby('a', 100) == 200
assert l.incrby('a', amount=100) == 300
def test_mget(self):
self.l.set('a', 'hello')
self.l.set('b', 'world')
self.l.delete('non_exist_key')
assert self.l.mget('a', 'b', 'non_exist_key') == ['hello', 'world', None]
self.l.delete('a', 'b')
assert self.l.mget(['a', 'b']) == [None, None]
l.set('a', 'hello')
l.set('b', 'world')
l.delete('non_exist_key')
assert l.mget('a', 'b', 'non_exist_key') == ['hello', 'world', None]
l.delete('a', 'b')
assert l.mget(['a', 'b']) == [None, None]
def test_mset(self):
d = {'a': b('1'), 'b': b('2'), 'c': b('3')}
assert self.l.mset(**d)
assert l.mset(**d)
for k, v in iteritems(d):
assert self.l[k] == v
assert l[k] == v
def test_set(self):
self.assertTrue(self.l.set('a', 100))
assert (l.set('a', 100))
def test_setnx(self):
self.l.delete('a')
assert self.l.setnx('a', '1')
assert self.l['a'] == b('1')
assert not self.l.setnx('a', '2')
assert self.l['a'] == b('1')
l.delete('a')
assert l.setnx('a', '1')
assert l['a'] == b('1')
assert not l.setnx('a', '2')
assert l['a'] == b('1')
def test_ttl(self):
assert self.l.set('a', 'hello')
assert self.l.expire('a', 100)
assert self.l.ttl('a') <= 100
self.l.delete('a')
assert self.l.ttl('a') == -1
self.l.set('a', 'hello')
assert self.l.ttl('a') == -1
assert l.set('a', 'hello')
assert l.expire('a', 100)
assert l.ttl('a') <= 100
l.delete('a')
assert l.ttl('a') == -1
l.set('a', 'hello')
assert l.ttl('a') == -1
def test_persist(self):
assert self.l.set('a', 'hello')
assert self.l.expire('a', 100)
assert self.l.ttl('a') <= 100
assert self.l.persist('a')
self.l.delete('non_exist_key')
assert not self.l.persist('non_exist_key')
assert l.set('a', 'hello')
assert l.expire('a', 100)
assert l.ttl('a') <= 100
assert l.persist('a')
l.delete('non_exist_key')
assert not l.persist('non_exist_key')
def test_expire(self):
assert not self.l.expire('a', 100)
assert not l.expire('a', 100)
self.l.set('a', 'hello')
self.assertTrue(self.l.expire('a', 100))
self.l.delete('a')
self.assertFalse(self.l.expire('a', 100))
l.set('a', 'hello')
assert (l.expire('a', 100))
l.delete('a')
assert not (l.expire('a', 100))
def test_expireat_datetime(self):
expire_at = current_time() + datetime.timedelta(minutes=1)
self.l.set('a', '1')
assert self.l.expireat('a', expire_at)
assert 0 < self.l.ttl('a') <= 61
l.set('a', '1')
assert l.expireat('a', expire_at)
assert 0 < l.ttl('a') <= 61
def test_expireat_unixtime(self):
expire_at = current_time() + datetime.timedelta(minutes=1)
self.l.set('a', '1')
l.set('a', '1')
expire_at_seconds = int(time.mktime(expire_at.timetuple()))
assert self.l.expireat('a', expire_at_seconds)
assert 0 < self.l.ttl('a') <= 61
assert l.expireat('a', expire_at_seconds)
assert 0 < l.ttl('a') <= 61
def test_expireat_no_key(self):
expire_at = current_time() + datetime.timedelta(minutes=1)
assert not self.l.expireat('a', expire_at)
assert not l.expireat('a', expire_at)
def test_expireat(self):
self.l.set('a', 'hello')
self.assertTrue(self.l.expireat('a', 1577808000)) # time is 2020.1.1
self.l.delete('a')
self.assertFalse(self.l.expireat('a', 1577808000))
l.set('a', 'hello')
assert (l.expireat('a', 1577808000)) # time is 2020.1.1
l.delete('a')
assert not(l.expireat('a', 1577808000))

View File

@ -7,6 +7,9 @@ import sys
sys.path.append('..')
import ledis
from ledis._compat import b
l = ledis.Ledis(port=6380)
def current_time():
@ -15,92 +18,92 @@ def current_time():
class TestCmdList(unittest.TestCase):
def setUp(self):
self.l = ledis.Ledis(port=6666)
pass
def tearDown(self):
self.l.lmclear('mylist', 'mylist1', 'mylist2')
l.lmclear('mylist', 'mylist1', 'mylist2')
def test_lindex(self):
self.l.rpush('mylist', '1', '2', '3')
assert self.l.lindex('mylist', 0) == '1'
assert self.l.lindex('mylist', 1) == '2'
assert self.l.lindex('mylist', 2) == '3'
l.rpush('mylist', '1', '2', '3')
assert l.lindex('mylist', 0) == b('1')
assert l.lindex('mylist', 1) == b('2')
assert l.lindex('mylist', 2) == b('3')
def test_llen(self):
self.l.rpush('mylist', '1', '2', '3')
assert self.l.llen('mylist') == 3
l.rpush('mylist', '1', '2', '3')
assert l.llen('mylist') == 3
def test_lpop(self):
self.l.rpush('mylist', '1', '2', '3')
assert self.l.lpop('mylist') == '1'
assert self.l.lpop('mylist') == '2'
assert self.l.lpop('mylist') == '3'
assert self.l.lpop('mylist') is None
l.rpush('mylist', '1', '2', '3')
assert l.lpop('mylist') == b('1')
assert l.lpop('mylist') == b('2')
assert l.lpop('mylist') == b('3')
assert l.lpop('mylist') is None
def test_lpush(self):
assert self.l.lpush('mylist', '1') == 1
assert self.l.lpush('mylist', '2') == 2
assert self.l.lpush('mylist', '3', '4', '5') == 5
assert self.l.lrange('mylist', 0, -1) == ['5', '4', '3', '2', '1']
assert l.lpush('mylist', '1') == 1
assert l.lpush('mylist', '2') == 2
assert l.lpush('mylist', '3', '4', '5') == 5
assert l.lrange('mylist', 0, -1) == ['5', '4', '3', '2', '1']
def test_lrange(self):
self.l.rpush('mylist', '1', '2', '3', '4', '5')
assert self.l.lrange('mylist', 0, 2) == ['1', '2', '3']
assert self.l.lrange('mylist', 2, 10) == ['3', '4', '5']
assert self.l.lrange('mylist', 0, -1) == ['1', '2', '3', '4', '5']
l.rpush('mylist', '1', '2', '3', '4', '5')
assert l.lrange('mylist', 0, 2) == ['1', '2', '3']
assert l.lrange('mylist', 2, 10) == ['3', '4', '5']
assert l.lrange('mylist', 0, -1) == ['1', '2', '3', '4', '5']
def test_rpush(self):
assert self.l.rpush('mylist', '1') == 1
assert self.l.rpush('mylist', '2') == 2
assert self.l.rpush('mylist', '3', '4') == 4
assert self.l.lrange('mylist', 0, -1) == ['1', '2', '3', '4']
assert l.rpush('mylist', '1') == 1
assert l.rpush('mylist', '2') == 2
assert l.rpush('mylist', '3', '4') == 4
assert l.lrange('mylist', 0, -1) == ['1', '2', '3', '4']
def test_rpop(self):
self.l.rpush('mylist', '1', '2', '3')
assert self.l.rpop('mylist') == '3'
assert self.l.rpop('mylist') == '2'
assert self.l.rpop('mylist') == '1'
assert self.l.rpop('mylist') is None
l.rpush('mylist', '1', '2', '3')
assert l.rpop('mylist') == b('3')
assert l.rpop('mylist') == b('2')
assert l.rpop('mylist') == b('1')
assert l.rpop('mylist') is None
def test_lclear(self):
self.l.rpush('mylist', '1', '2', '3')
assert self.l.lclear('mylist') == 3
assert self.l.lclear('mylist') == 0
l.rpush('mylist', '1', '2', '3')
assert l.lclear('mylist') == 3
assert l.lclear('mylist') == 0
def test_lmclear(self):
self.l.rpush('mylist1', '1', '2', '3')
self.l.rpush('mylist2', '1', '2', '3')
assert self.l.lmclear('mylist1', 'mylist2') == 2
l.rpush('mylist1', '1', '2', '3')
l.rpush('mylist2', '1', '2', '3')
assert l.lmclear('mylist1', 'mylist2') == 2
def test_lexpire(self):
assert not self.l.lexpire('mylist', 100)
self.l.rpush('mylist', '1')
assert self.l.lexpire('mylist', 100)
assert 0 < self.l.lttl('mylist') <= 100
assert self.l.lpersist('mylist')
assert self.l.lttl('mylist') == -1
assert not l.lexpire('mylist', 100)
l.rpush('mylist', '1')
assert l.lexpire('mylist', 100)
assert 0 < l.lttl('mylist') <= 100
assert l.lpersist('mylist')
assert l.lttl('mylist') == -1
def test_lexpireat_datetime(self):
expire_at = current_time() + datetime.timedelta(minutes=1)
self.l.rpush('mylist', '1')
assert self.l.lexpireat('mylist', expire_at)
assert 0 < self.l.lttl('mylist') <= 61
l.rpush('mylist', '1')
assert l.lexpireat('mylist', expire_at)
assert 0 < l.lttl('mylist') <= 61
def test_lexpireat_unixtime(self):
expire_at = current_time() + datetime.timedelta(minutes=1)
self.l.rpush('mylist', '1')
l.rpush('mylist', '1')
expire_at_seconds = int(time.mktime(expire_at.timetuple()))
assert self.l.lexpireat('mylist', expire_at_seconds)
assert self.l.lttl('mylist') <= 61
assert l.lexpireat('mylist', expire_at_seconds)
assert l.lttl('mylist') <= 61
def test_lexpireat_no_key(self):
expire_at = current_time() + datetime.timedelta(minutes=1)
assert not self.l.lexpireat('mylist', expire_at)
assert not l.lexpireat('mylist', expire_at)
def test_lttl_and_lpersist(self):
self.l.rpush('mylist', '1')
self.l.lexpire('mylist', 100)
assert 0 < self.l.lttl('mylist') <= 100
assert self.l.lpersist('mylist')
assert self.l.lttl('mylist') == -1
l.rpush('mylist', '1')
l.lexpire('mylist', 100)
assert 0 < l.lttl('mylist') <= 100
assert l.lpersist('mylist')
assert l.lttl('mylist') == -1

View File

@ -1,5 +1,5 @@
# coding: utf-8
# Test Cases for list commands
# Test Cases for zset commands
import unittest
import sys
@ -10,171 +10,161 @@ import ledis
from ledis._compat import b, iteritems
from ledis import ResponseError
l = ledis.Ledis(port=6380)
def current_time():
return datetime.datetime.now()
class TestCmdZset(unittest.TestCase):
def setUp(self):
self.l = ledis.Ledis(port=6666)
pass
def tearDown(self):
self.l.zclear('a')
l.zclear('a')
def test_zadd(self):
self.l.zadd('a', a1=1, a2=2, a3=3)
assert self.l.zrange('a', 0, -1) == ['a1', 'a2', 'a3']
l.zadd('a', a1=1, a2=2, a3=3)
assert l.zrange('a', 0, -1) == [b('a1'), b('a2'), b('a3')]
def test_zcard(self):
self.l.zadd('a', a1=1, a2=2, a3=3)
assert self.l.zcard('a') == 3
l.zadd('a', a1=1, a2=2, a3=3)
assert l.zcard('a') == 3
def test_zcount(self):
self.l.zadd('a', a1=1, a2=2, a3=3)
assert self.l.zcount('a', '-inf', '+inf') == 3
assert self.l.zcount('a', 1, 2) == 2
assert self.l.zcount('a', 10, 20) == 0
l.zadd('a', a1=1, a2=2, a3=3)
assert l.zcount('a', '-inf', '+inf') == 3
assert l.zcount('a', 1, 2) == 2
assert l.zcount('a', 10, 20) == 0
def test_zincrby(self):
self.l.zadd('a', a1=1, a2=2, a3=3)
assert self.l.zincrby('a', 'a2') == 3.0
assert self.l.zincrby('a', 'a3', amount=5) == 8.0
assert self.l.zscore('a', 'a2') == 3.0
assert self.l.zscore('a', 'a3') == 8.0
l.zadd('a', a1=1, a2=2, a3=3)
assert l.zincrby('a', 'a2') == 3
assert l.zincrby('a', 'a3', amount=5) == 8
assert l.zscore('a', 'a2') == 3
assert l.zscore('a', 'a3') == 8
def test_zrange(self):
self.l.zadd('a', a1=1, a2=2, a3=3)
assert self.l.zrange('a', 0, 1) == ['a1', 'a2']
assert self.l.zrange('a', 2, 3) == ['a3']
l.zadd('a', a1=1, a2=2, a3=3)
assert l.zrange('a', 0, 1) == [b('a1'), b('a2')]
assert l.zrange('a', 2, 3) == [b('a3')]
#withscores
assert self.l.zrange('a', 0, 1, withscores=True) == \
[('a1', 1.0), ('a2', 2.0)]
assert self.l.zrange('a', 2, 3, withscores=True) == \
[('a3', 3.0)]
assert l.zrange('a', 0, 1, withscores=True) == \
[(b('a1'), 1), (b('a2'), 2)]
assert l.zrange('a', 2, 3, withscores=True) == \
[(b('a3'), 3)]
def test_zrangebyscore(self):
self.l.zadd('a', a1=1, a2=2, a3=3, a4=4, a5=5)
assert self.l.zrangebyscore('a', 2, 4) == ['a2', 'a3', 'a4']
l.zadd('a', a1=1, a2=2, a3=3, a4=4, a5=5)
assert l.zrangebyscore('a', 2, 4) == [b('a2'), b('a3'), b('a4')]
# slicing with start/num
assert self.l.zrangebyscore('a', 2, 4, start=1, num=2) == \
['a3', 'a4']
assert l.zrangebyscore('a', 2, 4, start=1, num=2) == \
[b('a3'), b('a4')]
# withscores
assert self.l.zrangebyscore('a', 2, 4, withscores=True) == \
[('a2', 2.0), ('a3', 3.0), ('a4', 4.0)]
# custom score function
assert self.l.zrangebyscore('a', 2, 4, withscores=True,
score_cast_func=int) == \
assert l.zrangebyscore('a', 2, 4, withscores=True) == \
[('a2', 2), ('a3', 3), ('a4', 4)]
def test_rank(self):
self.l.zadd('a', a1=1, a2=2, a3=3, a4=4, a5=5)
assert self.l.zrank('a', 'a1') == 0
assert self.l.zrank('a', 'a3') == 2
assert self.l.zrank('a', 'a6') is None
l.zadd('a', a1=1, a2=2, a3=3, a4=4, a5=5)
assert l.zrank('a', 'a1') == 0
assert l.zrank('a', 'a3') == 2
assert l.zrank('a', 'a6') is None
def test_zrem(self):
self.l.zadd('a', a1=1, a2=2, a3=3)
assert self.l.zrem('a', 'a2') == 1
assert self.l.zrange('a', 0, -1) == ['a1', 'a3']
assert self.l.zrem('a', 'b') == 0
assert self.l.zrange('a', 0, -1) == ['a1', 'a3']
l.zadd('a', a1=1, a2=2, a3=3)
assert l.zrem('a', 'a2') == 1
assert l.zrange('a', 0, -1) == [b('a1'), b('a3')]
assert l.zrem('a', 'b') == 0
assert l.zrange('a', 0, -1) == [b('a1'), b('a3')]
# multiple keys
self.l.zadd('a', a1=1, a2=2, a3=3)
assert self.l.zrem('a', 'a1', 'a2') == 2
assert self.l.zrange('a', 0, -1) == ['a3']
l.zadd('a', a1=1, a2=2, a3=3)
assert l.zrem('a', 'a1', 'a2') == 2
assert l.zrange('a', 0, -1) == [b('a3')]
def test_zremrangebyrank(self):
self.l.zadd('a', a1=1, a2=2, a3=3, a4=4, a5=5)
assert self.l.zremrangebyrank('a', 1, 3) == 3
assert self.l.zrange('a', 0, -1) == ['a1', 'a5']
l.zadd('a', a1=1, a2=2, a3=3, a4=4, a5=5)
assert l.zremrangebyrank('a', 1, 3) == 3
assert l.zrange('a', 0, -1) == [b('a1'), b('a5')]
def test_zremrangebyscore(self):
self.l.zadd('a', a1=1, a2=2, a3=3, a4=4, a5=5)
assert self.l.zremrangebyscore('a', 2, 4) == 3
assert self.l.zrange('a', 0, -1) == ['a1', 'a5']
assert self.l.zremrangebyscore('a', 2, 4) == 0
assert self.l.zrange('a', 0, -1) == ['a1', 'a5']
l.zadd('a', a1=1, a2=2, a3=3, a4=4, a5=5)
assert l.zremrangebyscore('a', 2, 4) == 3
assert l.zrange('a', 0, -1) == [b('a1'), b('a5')]
assert l.zremrangebyscore('a', 2, 4) == 0
assert l.zrange('a', 0, -1) == [b('a1'), b('a5')]
def test_zrevrange(self):
self.l.zadd('a', a1=1, a2=2, a3=3)
assert self.l.zrevrange('a', 0, 1) == ['a3', 'a2']
assert self.l.zrevrange('a', 1, 2) == ['a2', 'a1']
l.zadd('a', a1=1, a2=2, a3=3)
assert l.zrevrange('a', 0, 1) == [b('a3'), b('a2')]
assert l.zrevrange('a', 1, 2) == [b('a2'), b('a1')]
def test_zrevrank(self):
self.l.zadd('a', a1=1, a2=2, a3=3, a4=4, a5=5)
assert self.l.zrevrank('a', 'a1') == 4
assert self.l.zrevrank('a', 'a2') == 3
assert self.l.zrevrank('a', 'a6') is None
l.zadd('a', a1=1, a2=2, a3=3, a4=4, a5=5)
assert l.zrevrank('a', 'a1') == 4
assert l.zrevrank('a', 'a2') == 3
assert l.zrevrank('a', 'a6') is None
def test_zrevrangebyscore(self):
self.l.zadd('a', a1=1, a2=2, a3=3, a4=4, a5=5)
assert self.l.zrevrangebyscore('a', 4, 2) == ['a4', 'a3', 'a2']
l.zadd('a', a1=1, a2=2, a3=3, a4=4, a5=5)
assert l.zrevrangebyscore('a', 4, 2) == [b('a4'), b('a3'), b('a2')]
# slicing with start/num
assert self.l.zrevrangebyscore('a', 4, 2, start=1, num=2) == \
['a3', 'a2']
assert l.zrevrangebyscore('a', 4, 2, start=1, num=2) == \
[b('a3'), b('a2')]
# withscores
assert self.l.zrevrangebyscore('a', 4, 2, withscores=True) == \
[('a4', 4.0), ('a3', 3.0), ('a2', 2.0)]
# custom score function
assert self.l.zrevrangebyscore('a', 4, 2, withscores=True,
score_cast_func=int) == \
[('a4', 4), ('a3', 3), ('a2', 2)]
assert l.zrevrangebyscore('a', 4, 2, withscores=True) == \
[(b('a4'), 4), (b('a3'), 3), (b('a2'), 2)]
def test_zscore(self):
self.l.zadd('a', a1=1, a2=2, a3=3)
assert self.l.zscore('a', 'a1') == 1.0
assert self.l.zscore('a', 'a2') == 2.0
assert self.l.zscore('a', 'a4') is None
l.zadd('a', a1=1, a2=2, a3=3)
assert l.zscore('a', 'a1') == 1
assert l.zscore('a', 'a2') == 2
assert l.zscore('a', 'a4') is None
def test_zclear(self):
self.l.zadd('a', a1=1, a2=2, a3=3)
assert self.l.zclear('a') == 3
assert self.l.zclear('a') == 0
l.zadd('a', a1=1, a2=2, a3=3)
assert l.zclear('a') == 3
assert l.zclear('a') == 0
def test_zmclear(self):
self.l.zadd('a', a1=1, a2=2, a3=3)
self.l.zadd('b', b1=1, b2=2, b3=3)
assert self.l.lmclear('a', 'b') == 2
assert self.l.lmclear('c', 'd') == 2
l.zadd('a', a1=1, a2=2, a3=3)
l.zadd('b', b1=1, b2=2, b3=3)
assert l.lmclear('a', 'b') == 2
assert l.lmclear('c', 'd') == 2
def test_zexpire(self):
assert not self.l.zexpire('a', 100)
self.l.zadd('a', a1=1, a2=2, a3=3)
assert self.l.zexpire('a', 100)
assert 0 < self.l.zttl('a') <= 100
assert self.l.zpersist('a')
assert self.l.zttl('a') == -1
assert not l.zexpire('a', 100)
l.zadd('a', a1=1, a2=2, a3=3)
assert l.zexpire('a', 100)
assert 0 < l.zttl('a') <= 100
assert l.zpersist('a')
assert l.zttl('a') == -1
def test_zexpireat_datetime(self):
expire_at = current_time() + datetime.timedelta(minutes=1)
self.l.zadd('a', a1=1)
assert self.l.zexpireat('a', expire_at)
assert 0 < self.l.zttl('a') <= 61
l.zadd('a', a1=1)
assert l.zexpireat('a', expire_at)
assert 0 < l.zttl('a') <= 61
def test_zexpireat_unixtime(self):
expire_at = current_time() + datetime.timedelta(minutes=1)
self.l.zadd('a', a1=1)
l.zadd('a', a1=1)
expire_at_seconds = int(time.mktime(expire_at.timetuple()))
assert self.l.zexpireat('a', expire_at_seconds)
assert 0 < self.l.zttl('a') <= 61
assert l.zexpireat('a', expire_at_seconds)
assert 0 < l.zttl('a') <= 61
def test_zexpireat_no_key(self):
expire_at = current_time() + datetime.timedelta(minutes=1)
assert not self.l.zexpireat('a', expire_at)
assert not l.zexpireat('a', expire_at)
def test_zttl_and_zpersist(self):
self.l.zadd('a', a1=1)
self.l.zexpire('a', 100)
assert 0 < self.l.zttl('a') <= 100
assert self.l.zpersist('a')
assert self.l.zttl('a') == -1
l.zadd('a', a1=1)
l.zexpire('a', 100)
assert 0 < l.zttl('a') <= 100
assert l.zpersist('a')
assert l.zttl('a') == -1