forked from mirror/ledisdb
Merge branch 'client-feature' into develop
This commit is contained in:
commit
c00047cc6b
|
@ -1,6 +1,7 @@
|
|||
from __future__ import with_statement
|
||||
import datetime
|
||||
import time as mod_time
|
||||
from itertools import chain, starmap
|
||||
from ledis._compat import (b, izip, imap, iteritems,
|
||||
basestring, long, nativestr, bytes)
|
||||
from ledis.connection import ConnectionPool, UnixDomainSocketConnection
|
||||
|
@ -8,6 +9,8 @@ from ledis.exceptions import (
|
|||
ConnectionError,
|
||||
DataError,
|
||||
LedisError,
|
||||
ResponseError,
|
||||
TxNotBeginError
|
||||
)
|
||||
|
||||
SYM_EMPTY = b('')
|
||||
|
@ -169,6 +172,26 @@ class Ledis(object):
|
|||
def set_response_callback(self, command, callback):
|
||||
"Set a custom Response Callback"
|
||||
self.response_callbacks[command] = callback
|
||||
|
||||
|
||||
# def pipeline(self, transaction=True, shard_hint=None):
|
||||
# """
|
||||
# Return a new pipeline object that can queue multiple commands for
|
||||
# later execution. ``transaction`` indicates whether all commands
|
||||
# should be executed atomically. Apart from making a group of operations
|
||||
# atomic, pipelines are useful for reducing the back-and-forth overhead
|
||||
# between the client and server.
|
||||
# """
|
||||
# return StrictPipeline(
|
||||
# self.connection_pool,
|
||||
# self.response_callbacks,
|
||||
# transaction,
|
||||
# shard_hint)
|
||||
|
||||
def tx(self):
|
||||
return Transaction(
|
||||
self.connection_pool,
|
||||
self.response_callbacks)
|
||||
|
||||
#### COMMAND EXECUTION AND PROTOCOL PARSING ####
|
||||
|
||||
|
@ -868,4 +891,44 @@ class Ledis(object):
|
|||
|
||||
def bpersist(self, name):
|
||||
"Removes an expiration on name"
|
||||
return self.execute_command('BPERSIST', name)
|
||||
return self.execute_command('BPERSIST', name)
|
||||
|
||||
|
||||
class Transaction(Ledis):
|
||||
def __init__(self, connection_pool, response_callbacks):
|
||||
self.connection_pool = connection_pool
|
||||
self.response_callbacks = response_callbacks
|
||||
self.connection = None
|
||||
|
||||
def execute_command(self, *args, **options):
|
||||
"Execute a command and return a parsed response"
|
||||
command_name = args[0]
|
||||
|
||||
connection = self.connection
|
||||
if self.connection is None:
|
||||
raise TxNotBeginError
|
||||
|
||||
try:
|
||||
connection.send_command(*args)
|
||||
return self.parse_response(connection, command_name, **options)
|
||||
except ConnectionError:
|
||||
connection.disconnect()
|
||||
connection.send_command(*args)
|
||||
return self.parse_response(connection, command_name, **options)
|
||||
|
||||
def begin(self):
|
||||
self.connection = self.connection_pool.get_connection('begin')
|
||||
return self.execute_command("BEGIN")
|
||||
|
||||
def commit(self):
|
||||
res = self.execute_command("COMMIT")
|
||||
self.connection_pool.release(self.connection)
|
||||
self.connection = None
|
||||
return res
|
||||
|
||||
def rollback(self):
|
||||
res = self.execute_command("ROLLBACK")
|
||||
self.connection_pool.release(self.connection)
|
||||
self.connection = None
|
||||
return res
|
||||
|
||||
|
|
|
@ -16,6 +16,10 @@ class BusyLoadingError(ConnectionError):
|
|||
pass
|
||||
|
||||
|
||||
class TimeoutError(LedisError):
|
||||
pass
|
||||
|
||||
|
||||
class InvalidResponse(ServerError):
|
||||
pass
|
||||
|
||||
|
@ -30,3 +34,6 @@ class DataError(LedisError):
|
|||
|
||||
class ExecAbortError(ResponseError):
|
||||
pass
|
||||
|
||||
class TxNotBeginError(LedisError):
|
||||
pass
|
|
@ -0,0 +1,38 @@
|
|||
import unittest
|
||||
import sys
|
||||
sys.path.append("..")
|
||||
|
||||
import ledis
|
||||
|
||||
|
||||
class TestTx(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.l = ledis.Ledis(port=6380)
|
||||
|
||||
def tearDown(self):
|
||||
self.l.delete("a")
|
||||
|
||||
def test_commit(self):
|
||||
tx = self.l.tx()
|
||||
self.l.set("a", "no-tx")
|
||||
assert self.l.get("a") == "no-tx"
|
||||
tx.begin()
|
||||
tx.set("a", "tx")
|
||||
assert self.l.get("a") == "no-tx"
|
||||
assert tx.get("a") == "tx"
|
||||
|
||||
tx.commit()
|
||||
assert self.l.get("a") == "tx"
|
||||
|
||||
def test_rollback(self):
|
||||
tx = self.l.tx()
|
||||
self.l.set("a", "no-tx")
|
||||
assert self.l.get("a") == "no-tx"
|
||||
|
||||
tx.begin()
|
||||
tx.set("a", "tx")
|
||||
assert tx.get("a") == "tx"
|
||||
assert self.l.get("a") == "no-tx"
|
||||
|
||||
tx.rollback()
|
||||
assert self.l.get("a") == "no-tx"
|
|
@ -137,7 +137,12 @@ local commands = {
|
|||
--[[server]]
|
||||
"ping",
|
||||
"echo",
|
||||
"select"
|
||||
"select",
|
||||
|
||||
-- [[transaction]]
|
||||
"begin",
|
||||
"commit",
|
||||
"rollback"
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
local ledis = require "ledis"
|
||||
local lds = ledis:new()
|
||||
|
||||
lds:set_timeout(1000)
|
||||
|
||||
|
||||
|
||||
|
||||
-- connect
|
||||
local ok, err = lds:connect("127.0.0.1", "6380")
|
||||
if not ok then
|
||||
ngx.say("failed to connect:", err)
|
||||
return
|
||||
end
|
||||
|
||||
lds:del("tx")
|
||||
|
||||
-- transaction
|
||||
|
||||
ok, err = lds:set("tx", "a")
|
||||
if not ok then
|
||||
ngx.say("failed to execute set in tx: ", err)
|
||||
return
|
||||
end
|
||||
|
||||
ngx.say("SET should be OK <=>", ok)
|
||||
|
||||
res, err = lds:get("tx")
|
||||
if not res then
|
||||
ngx.say("failed to execute get in tx: ", err)
|
||||
return
|
||||
end
|
||||
|
||||
ngx.say("GET should be a <=>", res)
|
||||
|
||||
|
||||
|
||||
ok, err = lds:begin()
|
||||
if not ok then
|
||||
ngx.say("failed to run begin: ", err)
|
||||
return
|
||||
end
|
||||
|
||||
ngx.say("BEGIN should be OK <=>", ok)
|
||||
|
||||
ok, err = lds:set("tx", "b")
|
||||
if not ok then
|
||||
ngx.say("failed to execute set in tx: ", err)
|
||||
return
|
||||
end
|
||||
|
||||
ngx.say("SET should be OK <=>", ok)
|
||||
|
||||
|
||||
res, err = lds:get("tx")
|
||||
if not res then
|
||||
ngx.say("failed to execute get in tx: ", err)
|
||||
return
|
||||
end
|
||||
|
||||
ngx.say("GET should be b <=>", res)
|
||||
|
||||
ok, err = lds:rollback()
|
||||
if not ok then
|
||||
ngx.say("failed to rollback", err)
|
||||
return
|
||||
end
|
||||
ngx.say("ROLLBACK should be OK <=>", ok)
|
||||
|
||||
res, err = lds:get("tx")
|
||||
if not res then
|
||||
ngx.say("failed to execute get in tx: ", err)
|
||||
return
|
||||
end
|
||||
|
||||
ngx.say("GET should be a <=>", res)
|
||||
|
||||
|
||||
lds:begin()
|
||||
lds:set("tx", "c")
|
||||
lds:commit()
|
||||
res, err = lds:get("tx")
|
||||
if not res then
|
||||
ngx.say("failed to execute get in tx: ", err)
|
||||
return
|
||||
end
|
||||
|
||||
ngx.say("GET should be c <=>", res)
|
||||
|
||||
|
||||
local ok, err = lds:close()
|
||||
if not ok then
|
||||
ngx.say("failed to close: ", err)
|
||||
return
|
||||
end
|
||||
ngx.say("close success")
|
Loading…
Reference in New Issue