From 9079a66323abc637fe1c48575ee0e2c9337a6400 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Wed, 16 Dec 2015 16:11:52 +0200 Subject: [PATCH] cluster: add Watch support. --- cluster.go | 11 ++++++++ cluster_pipeline.go | 5 ++-- cluster_test.go | 45 ++++++++++++++++++++++++++++++++ multi.go | 4 +-- multi_test.go | 63 ++++++++++++++++++++++++++++++--------------- 5 files changed, 103 insertions(+), 25 deletions(-) diff --git a/cluster.go b/cluster.go index ab06890..43bfc8d 100644 --- a/cluster.go +++ b/cluster.go @@ -44,6 +44,17 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { return client } +// Watch creates new transaction and marks the keys to be watched +// for conditional execution of a transaction. +func (c *ClusterClient) Watch(keys ...string) (*Multi, error) { + addr := c.slotMasterAddr(hashSlot(keys[0])) + client, err := c.getClient(addr) + if err != nil { + return nil, err + } + return client.Watch(keys...) +} + // Close closes the cluster client, releasing any open resources. // // It is rare to Close a ClusterClient, as the ClusterClient is meant diff --git a/cluster_pipeline.go b/cluster_pipeline.go index 3c93bbf..eb5cd2d 100644 --- a/cluster_pipeline.go +++ b/cluster_pipeline.go @@ -4,9 +4,10 @@ package redis type ClusterPipeline struct { commandable - cmds []Cmder cluster *ClusterClient - closed bool + + cmds []Cmder + closed bool } // Pipeline creates a new pipeline which is able to execute commands diff --git a/cluster_test.go b/cluster_test.go index d875735..d409dcb 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -5,7 +5,9 @@ import ( "math/rand" "net" "reflect" + "strconv" "strings" + "sync" "testing" "time" @@ -317,6 +319,49 @@ var _ = Describe("Cluster", func() { Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("MOVED")) }) + + It("should Watch", func() { + var incr func(string) error + + // Transactionally increments key using GET and SET commands. + incr = func(key string) error { + tx, err := client.Watch(key) + if err != nil { + return err + } + defer tx.Close() + + n, err := tx.Get(key).Int64() + if err != nil && err != redis.Nil { + return err + } + + _, err = tx.Exec(func() error { + tx.Set(key, strconv.FormatInt(n+1, 10), 0) + return nil + }) + if err == redis.TxFailedErr { + return incr(key) + } + return err + } + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + err := incr("key") + Expect(err).NotTo(HaveOccurred()) + }() + } + wg.Wait() + + n, err := client.Get("key").Int64() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(100))) + }) }) }) diff --git a/multi.go b/multi.go index bb4fafe..9b9df75 100644 --- a/multi.go +++ b/multi.go @@ -23,8 +23,8 @@ type Multi struct { closed bool } -// Watch marks the keys to be watched for conditional execution -// of a transaction. +// Watch creates new transaction and marks the keys to be watched +// for conditional execution of a transaction. func (c *Client) Watch(keys ...string) (*Multi, error) { tx := c.Multi() if err := tx.Watch(keys...).Err(); err != nil { diff --git a/multi_test.go b/multi_test.go index c95c703..1e6f360 100644 --- a/multi_test.go +++ b/multi_test.go @@ -1,6 +1,9 @@ package redis_test import ( + "strconv" + "sync" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -21,29 +24,47 @@ var _ = Describe("Multi", func() { Expect(client.Close()).NotTo(HaveOccurred()) }) - It("should exec", func() { - multi := client.Multi() - defer func() { - Expect(multi.Close()).NotTo(HaveOccurred()) - }() + It("should Watch", func() { + var incr func(string) error - var ( - set *redis.StatusCmd - get *redis.StringCmd - ) - cmds, err := multi.Exec(func() error { - set = multi.Set("key", "hello", 0) - get = multi.Get("key") - return nil - }) + // Transactionally increments key using GET and SET commands. + incr = func(key string) error { + tx, err := client.Watch(key) + if err != nil { + return err + } + defer tx.Close() + + n, err := tx.Get(key).Int64() + if err != nil && err != redis.Nil { + return err + } + + _, err = tx.Exec(func() error { + tx.Set(key, strconv.FormatInt(n+1, 10), 0) + return nil + }) + if err == redis.TxFailedErr { + return incr(key) + } + return err + } + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + err := incr("key") + Expect(err).NotTo(HaveOccurred()) + }() + } + wg.Wait() + + n, err := client.Get("key").Int64() Expect(err).NotTo(HaveOccurred()) - Expect(cmds).To(HaveLen(2)) - - Expect(set.Err()).NotTo(HaveOccurred()) - Expect(set.Val()).To(Equal("OK")) - - Expect(get.Err()).NotTo(HaveOccurred()) - Expect(get.Val()).To(Equal("hello")) + Expect(n).To(Equal(int64(100))) }) It("should discard", func() {