mirror of https://github.com/go-redis/redis.git
Merge pull request #316 from go-redis/feature/tweak-tx-api
Tweak transaction API.
This commit is contained in:
commit
46cb31d37d
|
@ -59,15 +59,13 @@ func (c *ClusterClient) getClients() map[string]*Client {
|
||||||
return clients
|
return clients
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch creates new transaction and marks the keys to be watched
|
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
|
||||||
// for conditional execution of a transaction.
|
|
||||||
func (c *ClusterClient) Watch(keys ...string) (*Tx, error) {
|
|
||||||
addr := c.slotMasterAddr(hashtag.Slot(keys[0]))
|
addr := c.slotMasterAddr(hashtag.Slot(keys[0]))
|
||||||
client, err := c.getClient(addr)
|
client, err := c.getClient(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
return client.Watch(keys...)
|
return client.Watch(fn, keys...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PoolStats returns accumulated connection pool stats.
|
// PoolStats returns accumulated connection pool stats.
|
||||||
|
|
|
@ -383,21 +383,18 @@ var _ = Describe("Cluster", func() {
|
||||||
|
|
||||||
// Transactionally increments key using GET and SET commands.
|
// Transactionally increments key using GET and SET commands.
|
||||||
incr = func(key string) error {
|
incr = func(key string) error {
|
||||||
tx, err := client.Watch(key)
|
err := client.Watch(func(tx *redis.Tx) error {
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer tx.Close()
|
|
||||||
|
|
||||||
n, err := tx.Get(key).Int64()
|
n, err := tx.Get(key).Int64()
|
||||||
if err != nil && err != redis.Nil {
|
if err != nil && err != redis.Nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(func() error {
|
_, err = tx.MultiExec(func() error {
|
||||||
tx.Set(key, strconv.FormatInt(n+1, 10), 0)
|
tx.Set(key, strconv.FormatInt(n+1, 10), 0)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
return err
|
||||||
|
}, key)
|
||||||
if err == redis.TxFailedErr {
|
if err == redis.TxFailedErr {
|
||||||
return incr(key)
|
return incr(key)
|
||||||
}
|
}
|
||||||
|
|
|
@ -184,21 +184,18 @@ func ExampleClient_Watch() {
|
||||||
|
|
||||||
// Transactionally increments key using GET and SET commands.
|
// Transactionally increments key using GET and SET commands.
|
||||||
incr = func(key string) error {
|
incr = func(key string) error {
|
||||||
tx, err := client.Watch(key)
|
err := client.Watch(func(tx *redis.Tx) error {
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer tx.Close()
|
|
||||||
|
|
||||||
n, err := tx.Get(key).Int64()
|
n, err := tx.Get(key).Int64()
|
||||||
if err != nil && err != redis.Nil {
|
if err != nil && err != redis.Nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(func() error {
|
_, err = tx.MultiExec(func() error {
|
||||||
tx.Set(key, strconv.FormatInt(n+1, 10), 0)
|
tx.Set(key, strconv.FormatInt(n+1, 10), 0)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
return err
|
||||||
|
}, key)
|
||||||
if err == redis.TxFailedErr {
|
if err == redis.TxFailedErr {
|
||||||
return incr(key)
|
return incr(key)
|
||||||
}
|
}
|
||||||
|
|
11
pool_test.go
11
pool_test.go
|
@ -37,18 +37,19 @@ var _ = Describe("pool", func() {
|
||||||
perform(1000, func(id int) {
|
perform(1000, func(id int) {
|
||||||
var ping *redis.StatusCmd
|
var ping *redis.StatusCmd
|
||||||
|
|
||||||
tx, err := client.Watch()
|
err := client.Watch(func(tx *redis.Tx) error {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
cmds, err := tx.MultiExec(func() error {
|
||||||
|
|
||||||
cmds, err := tx.Exec(func() error {
|
|
||||||
ping = tx.Ping()
|
ping = tx.Ping()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(cmds).To(HaveLen(1))
|
Expect(cmds).To(HaveLen(1))
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
Expect(ping.Err()).NotTo(HaveOccurred())
|
Expect(ping.Err()).NotTo(HaveOccurred())
|
||||||
Expect(ping.Val()).To(Equal("PONG"))
|
Expect(ping.Val()).To(Equal("PONG"))
|
||||||
Expect(tx.Close()).NotTo(HaveOccurred())
|
|
||||||
})
|
})
|
||||||
|
|
||||||
pool := client.Pool()
|
pool := client.Pool()
|
||||||
|
|
14
race_test.go
14
race_test.go
|
@ -215,9 +215,7 @@ var _ = Describe("races", func() {
|
||||||
|
|
||||||
perform(C, func(id int) {
|
perform(C, func(id int) {
|
||||||
for i := 0; i < N; i++ {
|
for i := 0; i < N; i++ {
|
||||||
tx, err := client.Watch("key")
|
err := client.Watch(func(tx *redis.Tx) error {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
val, err := tx.Get("key").Result()
|
val, err := tx.Get("key").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(val).NotTo(Equal(redis.Nil))
|
Expect(val).NotTo(Equal(redis.Nil))
|
||||||
|
@ -225,20 +223,18 @@ var _ = Describe("races", func() {
|
||||||
num, err := strconv.ParseInt(val, 10, 64)
|
num, err := strconv.ParseInt(val, 10, 64)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
cmds, err := tx.Exec(func() error {
|
cmds, err := tx.MultiExec(func() error {
|
||||||
tx.Set("key", strconv.FormatInt(num+1, 10), 0)
|
tx.Set("key", strconv.FormatInt(num+1, 10), 0)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
Expect(cmds).To(HaveLen(1))
|
||||||
|
return err
|
||||||
|
}, "key")
|
||||||
if err == redis.TxFailedErr {
|
if err == redis.TxFailedErr {
|
||||||
i--
|
i--
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(cmds).To(HaveLen(1))
|
|
||||||
Expect(cmds[0].Err()).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
err = tx.Close()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -65,16 +65,15 @@ var _ = Describe("Client", func() {
|
||||||
Expect(client.Ping().Err()).NotTo(HaveOccurred())
|
Expect(client.Ping().Err()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should close multi without closing the client", func() {
|
It("should close Tx without closing the client", func() {
|
||||||
tx, err := client.Watch()
|
err := client.Watch(func(tx *redis.Tx) error {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
_, err := tx.MultiExec(func() error {
|
||||||
Expect(tx.Close()).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
_, err = tx.Exec(func() error {
|
|
||||||
tx.Ping()
|
tx.Ping()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
Expect(err).To(MatchError("redis: client is closed"))
|
return err
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
Expect(client.Ping().Err()).NotTo(HaveOccurred())
|
Expect(client.Ping().Err()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
@ -96,13 +95,6 @@ var _ = Describe("Client", func() {
|
||||||
Expect(pubsub.Close()).NotTo(HaveOccurred())
|
Expect(pubsub.Close()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should close multi when client is closed", func() {
|
|
||||||
tx, err := client.Watch()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(client.Close()).NotTo(HaveOccurred())
|
|
||||||
Expect(tx.Close()).NotTo(HaveOccurred())
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should close pipeline when client is closed", func() {
|
It("should close pipeline when client is closed", func() {
|
||||||
pipeline := client.Pipeline()
|
pipeline := client.Pipeline()
|
||||||
Expect(client.Close()).NotTo(HaveOccurred())
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
|
|
27
tx.go
27
tx.go
|
@ -34,17 +34,19 @@ func (c *Client) newTx() *Tx {
|
||||||
return tx
|
return tx
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch creates new transaction and marks the keys to be watched
|
func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
|
||||||
// for conditional execution of a transaction.
|
|
||||||
func (c *Client) Watch(keys ...string) (*Tx, error) {
|
|
||||||
tx := c.newTx()
|
tx := c.newTx()
|
||||||
if len(keys) > 0 {
|
if len(keys) > 0 {
|
||||||
if err := tx.Watch(keys...).Err(); err != nil {
|
if err := tx.Watch(keys...).Err(); err != nil {
|
||||||
tx.Close()
|
tx.close()
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return tx, nil
|
retErr := fn(tx)
|
||||||
|
if err := tx.close(); err != nil && retErr == nil {
|
||||||
|
retErr = err
|
||||||
|
}
|
||||||
|
return retErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tx *Tx) process(cmd Cmder) {
|
func (tx *Tx) process(cmd Cmder) {
|
||||||
|
@ -55,8 +57,11 @@ func (tx *Tx) process(cmd Cmder) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the transaction, releasing any open resources.
|
// close closes the transaction, releasing any open resources.
|
||||||
func (tx *Tx) Close() error {
|
func (tx *Tx) close() error {
|
||||||
|
if tx.closed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
tx.closed = true
|
tx.closed = true
|
||||||
if err := tx.Unwatch().Err(); err != nil {
|
if err := tx.Unwatch().Err(); err != nil {
|
||||||
internal.Logf("Unwatch failed: %s", err)
|
internal.Logf("Unwatch failed: %s", err)
|
||||||
|
@ -98,7 +103,7 @@ func (tx *Tx) Discard() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exec executes all previously queued commands in a transaction
|
// MultiExec executes all previously queued commands in a transaction
|
||||||
// and restores the connection state to normal.
|
// and restores the connection state to normal.
|
||||||
//
|
//
|
||||||
// When using WATCH, EXEC will execute commands only if the watched keys
|
// When using WATCH, EXEC will execute commands only if the watched keys
|
||||||
|
@ -107,13 +112,13 @@ func (tx *Tx) Discard() error {
|
||||||
// Exec always returns list of commands. If transaction fails
|
// Exec always returns list of commands. If transaction fails
|
||||||
// TxFailedErr is returned. Otherwise Exec returns error of the first
|
// TxFailedErr is returned. Otherwise Exec returns error of the first
|
||||||
// failed command or nil.
|
// failed command or nil.
|
||||||
func (tx *Tx) Exec(f func() error) ([]Cmder, error) {
|
func (tx *Tx) MultiExec(fn func() error) ([]Cmder, error) {
|
||||||
if tx.closed {
|
if tx.closed {
|
||||||
return nil, pool.ErrClosed
|
return nil, pool.ErrClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
tx.cmds = []Cmder{NewStatusCmd("MULTI")}
|
tx.cmds = []Cmder{NewStatusCmd("MULTI")}
|
||||||
if err := f(); err != nil {
|
if err := fn(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
tx.cmds = append(tx.cmds, NewSliceCmd("EXEC"))
|
tx.cmds = append(tx.cmds, NewSliceCmd("EXEC"))
|
||||||
|
|
109
tx_test.go
109
tx_test.go
|
@ -27,21 +27,18 @@ var _ = Describe("Tx", func() {
|
||||||
|
|
||||||
// Transactionally increments key using GET and SET commands.
|
// Transactionally increments key using GET and SET commands.
|
||||||
incr = func(key string) error {
|
incr = func(key string) error {
|
||||||
tx, err := client.Watch(key)
|
err := client.Watch(func(tx *redis.Tx) error {
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer tx.Close()
|
|
||||||
|
|
||||||
n, err := tx.Get(key).Int64()
|
n, err := tx.Get(key).Int64()
|
||||||
if err != nil && err != redis.Nil {
|
if err != nil && err != redis.Nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(func() error {
|
_, err = tx.MultiExec(func() error {
|
||||||
tx.Set(key, strconv.FormatInt(n+1, 10), 0)
|
tx.Set(key, strconv.FormatInt(n+1, 10), 0)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
return err
|
||||||
|
}, key)
|
||||||
if err == redis.TxFailedErr {
|
if err == redis.TxFailedErr {
|
||||||
return incr(key)
|
return incr(key)
|
||||||
}
|
}
|
||||||
|
@ -67,13 +64,8 @@ var _ = Describe("Tx", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should discard", func() {
|
It("should discard", func() {
|
||||||
tx, err := client.Watch("key1", "key2")
|
err := client.Watch(func(tx *redis.Tx) error {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
cmds, err := tx.MultiExec(func() error {
|
||||||
defer func() {
|
|
||||||
Expect(tx.Close()).NotTo(HaveOccurred())
|
|
||||||
}()
|
|
||||||
|
|
||||||
cmds, err := tx.Exec(func() error {
|
|
||||||
tx.Set("key1", "hello1", 0)
|
tx.Set("key1", "hello1", 0)
|
||||||
tx.Discard()
|
tx.Discard()
|
||||||
tx.Set("key2", "hello2", 0)
|
tx.Set("key2", "hello2", 0)
|
||||||
|
@ -81,6 +73,9 @@ var _ = Describe("Tx", func() {
|
||||||
})
|
})
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(cmds).To(HaveLen(1))
|
Expect(cmds).To(HaveLen(1))
|
||||||
|
return err
|
||||||
|
}, "key1", "key2")
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
get := client.Get("key1")
|
get := client.Get("key1")
|
||||||
Expect(get.Err()).To(Equal(redis.Nil))
|
Expect(get.Err()).To(Equal(redis.Nil))
|
||||||
|
@ -92,43 +87,41 @@ var _ = Describe("Tx", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should exec empty", func() {
|
It("should exec empty", func() {
|
||||||
tx, err := client.Watch()
|
err := client.Watch(func(tx *redis.Tx) error {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
cmds, err := tx.MultiExec(func() error { return nil })
|
||||||
defer func() {
|
|
||||||
Expect(tx.Close()).NotTo(HaveOccurred())
|
|
||||||
}()
|
|
||||||
|
|
||||||
cmds, err := tx.Exec(func() error { return nil })
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(cmds).To(HaveLen(0))
|
Expect(cmds).To(HaveLen(0))
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
ping := tx.Ping()
|
v, err := client.Ping().Result()
|
||||||
Expect(ping.Err()).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(ping.Val()).To(Equal("PONG"))
|
Expect(v).To(Equal("PONG"))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should exec bulks", func() {
|
It("should exec bulks", func() {
|
||||||
tx, err := client.Watch()
|
const N = 20000
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
defer func() {
|
|
||||||
Expect(tx.Close()).NotTo(HaveOccurred())
|
|
||||||
}()
|
|
||||||
|
|
||||||
cmds, err := tx.Exec(func() error {
|
err := client.Watch(func(tx *redis.Tx) error {
|
||||||
for i := int64(0); i < 20000; i++ {
|
cmds, err := tx.MultiExec(func() error {
|
||||||
|
for i := 0; i < N; i++ {
|
||||||
tx.Incr("key")
|
tx.Incr("key")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(len(cmds)).To(Equal(20000))
|
Expect(len(cmds)).To(Equal(N))
|
||||||
for _, cmd := range cmds {
|
for _, cmd := range cmds {
|
||||||
Expect(cmd.Err()).NotTo(HaveOccurred())
|
Expect(cmd.Err()).NotTo(HaveOccurred())
|
||||||
}
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
get := client.Get("key")
|
num, err := client.Get("key").Int64()
|
||||||
Expect(get.Err()).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(get.Val()).To(Equal("20000"))
|
Expect(num).To(Equal(int64(N)))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should recover from bad connection", func() {
|
It("should recover from bad connection", func() {
|
||||||
|
@ -140,22 +133,21 @@ var _ = Describe("Tx", func() {
|
||||||
err = client.Pool().Put(cn)
|
err = client.Pool().Put(cn)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
tx, err := client.Watch()
|
do := func() error {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
err := client.Watch(func(tx *redis.Tx) error {
|
||||||
defer func() {
|
_, err := tx.MultiExec(func() error {
|
||||||
Expect(tx.Close()).NotTo(HaveOccurred())
|
|
||||||
}()
|
|
||||||
|
|
||||||
_, err = tx.Exec(func() error {
|
|
||||||
tx.Ping()
|
tx.Ping()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = do()
|
||||||
Expect(err).To(MatchError("bad connection"))
|
Expect(err).To(MatchError("bad connection"))
|
||||||
|
|
||||||
_, err = tx.Exec(func() error {
|
err = do()
|
||||||
tx.Ping()
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -168,21 +160,20 @@ var _ = Describe("Tx", func() {
|
||||||
err = client.Pool().Put(cn)
|
err = client.Pool().Put(cn)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
{
|
do := func() error {
|
||||||
tx, err := client.Watch("key")
|
err := client.Watch(func(tx *redis.Tx) error {
|
||||||
|
_, err := tx.MultiExec(func() error {
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}, "key")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = do()
|
||||||
Expect(err).To(MatchError("bad connection"))
|
Expect(err).To(MatchError("bad connection"))
|
||||||
Expect(tx).To(BeNil())
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
err = do()
|
||||||
tx, err := client.Watch("key")
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
err = tx.Ping().Err()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
err = tx.Close()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue