From 7285c50bfcf49e0423943e414b3c33dc35496356 Mon Sep 17 00:00:00 2001 From: holys Date: Mon, 25 Aug 2014 17:06:14 +0800 Subject: [PATCH 1/2] add tx support for lua(openresty) --- client/openresty/ledis.lua | 7 ++- client/openresty/tx_test.lua | 96 ++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 client/openresty/tx_test.lua diff --git a/client/openresty/ledis.lua b/client/openresty/ledis.lua index aa2335a..3bb0d02 100644 --- a/client/openresty/ledis.lua +++ b/client/openresty/ledis.lua @@ -137,7 +137,12 @@ local commands = { --[[server]] "ping", "echo", - "select" + "select", + + -- [[transaction]] + "begin", + "commit", + "rollback" } diff --git a/client/openresty/tx_test.lua b/client/openresty/tx_test.lua new file mode 100644 index 0000000..069ddf9 --- /dev/null +++ b/client/openresty/tx_test.lua @@ -0,0 +1,96 @@ +local ledis = require "ledis" +local lds = ledis:new() + +lds:set_timeout(1000) + + + + +-- connect +local ok, err = lds:connect("127.0.0.1", "6380") +if not ok then + ngx.say("failed to connect:", err) + return +end + +lds:del("tx") + +-- transaction + +ok, err = lds:set("tx", "a") +if not ok then + ngx.say("failed to execute set in tx: ", err) + return +end + +ngx.say("SET should be OK <=>", ok) + +res, err = lds:get("tx") +if not res then + ngx.say("failed to execute get in tx: ", err) + return +end + +ngx.say("GET should be a <=>", res) + + + +ok, err = lds:begin() +if not ok then + ngx.say("failed to run begin: ", err) + return +end + +ngx.say("BEGIN should be OK <=>", ok) + +ok, err = lds:set("tx", "b") +if not ok then + ngx.say("failed to execute set in tx: ", err) + return +end + +ngx.say("SET should be OK <=>", ok) + + +res, err = lds:get("tx") +if not res then + ngx.say("failed to execute get in tx: ", err) + return +end + +ngx.say("GET should be b <=>", res) + +ok, err = lds:rollback() +if not ok then + ngx.say("failed to rollback", err) + return +end +ngx.say("ROLLBACK should be OK <=>", ok) + +res, err = lds:get("tx") +if not res then + ngx.say("failed to execute get in tx: ", err) + return +end + +ngx.say("GET should be a <=>", res) + + +lds:begin() +lds:set("tx", "c") +lds:commit() +res, err = lds:get("tx") +if not res then + ngx.say("failed to execute get in tx: ", err) + return +end + +ngx.say("GET should be c <=>", res) + + +local ok, err = lds:close() +if not ok then + ngx.say("failed to close: ", err) + return +end +ngx.say("close success") From 63d5c9858029c685a4b33788b071d88607b1c279 Mon Sep 17 00:00:00 2001 From: holys Date: Tue, 26 Aug 2014 10:56:29 +0800 Subject: [PATCH 2/2] add tx support for ledis-py --- client/ledis-py/ledis/client.py | 65 ++++++++++++++++++++++++++++- client/ledis-py/ledis/exceptions.py | 7 ++++ client/ledis-py/tests/test_tx.py | 38 +++++++++++++++++ 3 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 client/ledis-py/tests/test_tx.py diff --git a/client/ledis-py/ledis/client.py b/client/ledis-py/ledis/client.py index 4593a70..2e8a3f7 100644 --- a/client/ledis-py/ledis/client.py +++ b/client/ledis-py/ledis/client.py @@ -1,6 +1,7 @@ from __future__ import with_statement import datetime import time as mod_time +from itertools import chain, starmap from ledis._compat import (b, izip, imap, iteritems, basestring, long, nativestr, bytes) from ledis.connection import ConnectionPool, UnixDomainSocketConnection @@ -8,6 +9,8 @@ from ledis.exceptions import ( ConnectionError, DataError, LedisError, + ResponseError, + TxNotBeginError ) SYM_EMPTY = b('') @@ -169,6 +172,26 @@ class Ledis(object): def set_response_callback(self, command, callback): "Set a custom Response Callback" self.response_callbacks[command] = callback + + + # def pipeline(self, transaction=True, shard_hint=None): + # """ + # Return a new pipeline object that can queue multiple commands for + # later execution. ``transaction`` indicates whether all commands + # should be executed atomically. Apart from making a group of operations + # atomic, pipelines are useful for reducing the back-and-forth overhead + # between the client and server. + # """ + # return StrictPipeline( + # self.connection_pool, + # self.response_callbacks, + # transaction, + # shard_hint) + + def tx(self): + return Transaction( + self.connection_pool, + self.response_callbacks) #### COMMAND EXECUTION AND PROTOCOL PARSING #### @@ -868,4 +891,44 @@ class Ledis(object): def bpersist(self, name): "Removes an expiration on name" - return self.execute_command('BPERSIST', name) \ No newline at end of file + return self.execute_command('BPERSIST', name) + + +class Transaction(Ledis): + def __init__(self, connection_pool, response_callbacks): + self.connection_pool = connection_pool + self.response_callbacks = response_callbacks + self.connection = None + + def execute_command(self, *args, **options): + "Execute a command and return a parsed response" + command_name = args[0] + + connection = self.connection + if self.connection is None: + raise TxNotBeginError + + try: + connection.send_command(*args) + return self.parse_response(connection, command_name, **options) + except ConnectionError: + connection.disconnect() + connection.send_command(*args) + return self.parse_response(connection, command_name, **options) + + def begin(self): + self.connection = self.connection_pool.get_connection('begin') + return self.execute_command("BEGIN") + + def commit(self): + res = self.execute_command("COMMIT") + self.connection_pool.release(self.connection) + self.connection = None + return res + + def rollback(self): + res = self.execute_command("ROLLBACK") + self.connection_pool.release(self.connection) + self.connection = None + return res + diff --git a/client/ledis-py/ledis/exceptions.py b/client/ledis-py/ledis/exceptions.py index 91bada4..9150db6 100644 --- a/client/ledis-py/ledis/exceptions.py +++ b/client/ledis-py/ledis/exceptions.py @@ -16,6 +16,10 @@ class BusyLoadingError(ConnectionError): pass +class TimeoutError(LedisError): + pass + + class InvalidResponse(ServerError): pass @@ -30,3 +34,6 @@ class DataError(LedisError): class ExecAbortError(ResponseError): pass + +class TxNotBeginError(LedisError): + pass \ No newline at end of file diff --git a/client/ledis-py/tests/test_tx.py b/client/ledis-py/tests/test_tx.py new file mode 100644 index 0000000..b589dc7 --- /dev/null +++ b/client/ledis-py/tests/test_tx.py @@ -0,0 +1,38 @@ +import unittest +import sys +sys.path.append("..") + +import ledis + + +class TestTx(unittest.TestCase): + def setUp(self): + self.l = ledis.Ledis(port=6380) + + def tearDown(self): + self.l.delete("a") + + def test_commit(self): + tx = self.l.tx() + self.l.set("a", "no-tx") + assert self.l.get("a") == "no-tx" + tx.begin() + tx.set("a", "tx") + assert self.l.get("a") == "no-tx" + assert tx.get("a") == "tx" + + tx.commit() + assert self.l.get("a") == "tx" + + def test_rollback(self): + tx = self.l.tx() + self.l.set("a", "no-tx") + assert self.l.get("a") == "no-tx" + + tx.begin() + tx.set("a", "tx") + assert tx.get("a") == "tx" + assert self.l.get("a") == "no-tx" + + tx.rollback() + assert self.l.get("a") == "no-tx" \ No newline at end of file