diff --git a/cluster_pipeline.go b/cluster_pipeline.go index 6d55265..883c770 100644 --- a/cluster_pipeline.go +++ b/cluster_pipeline.go @@ -27,6 +27,16 @@ func (c *ClusterClient) Pipeline() *ClusterPipeline { return pipe } +func (c *ClusterClient) Pipelined(fn func(*ClusterPipeline) error) ([]Cmder, error) { + pipe := c.Pipeline() + if err := fn(pipe); err != nil { + return nil, err + } + cmds, err := pipe.Exec() + _ = pipe.Close() + return cmds, err +} + func (pipe *ClusterPipeline) process(cmd Cmder) { pipe.cmds = append(pipe.cmds, cmd) } diff --git a/cluster_test.go b/cluster_test.go index 6a04212..ff091a2 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -191,6 +191,7 @@ var _ = Describe("Cluster", func() { {"", 5176}, {string([]byte{83, 153, 134, 118, 229, 214, 244, 75, 140, 37, 215, 215}), 5463}, } + // Empty keys receive random slot. rand.Seed(100) for _, test := range tests { @@ -343,34 +344,6 @@ var _ = Describe("Cluster", func() { }, "5s").Should(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"})) }) - It("should perform multi-pipelines", func() { - slot := hashtag.Slot("A") - Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) - - pipe := client.Pipeline() - defer pipe.Close() - - keys := []string{"A", "B", "C", "D", "E", "F", "G"} - for i, key := range keys { - pipe.Set(key, key+"_value", 0) - pipe.Expire(key, time.Duration(i+1)*time.Hour) - } - for _, key := range keys { - pipe.Get(key) - pipe.TTL(key) - } - - cmds, err := pipe.Exec() - Expect(err).NotTo(HaveOccurred()) - Expect(cmds).To(HaveLen(28)) - Expect(cmds[14].(*redis.StringCmd).Val()).To(Equal("A_value")) - Expect(cmds[15].(*redis.DurationCmd).Val()).To(BeNumerically("~", 1*time.Hour, time.Second)) - Expect(cmds[20].(*redis.StringCmd).Val()).To(Equal("D_value")) - Expect(cmds[21].(*redis.DurationCmd).Val()).To(BeNumerically("~", 4*time.Hour, time.Second)) - Expect(cmds[26].(*redis.StringCmd).Val()).To(Equal("G_value")) - Expect(cmds[27].(*redis.DurationCmd).Val()).To(BeNumerically("~", 7*time.Hour, time.Second)) - }) - It("should return error when there are no attempts left", func() { Expect(client.Close()).NotTo(HaveOccurred()) client = cluster.clusterClient(&redis.ClusterOptions{ @@ -428,6 +401,73 @@ var _ = Describe("Cluster", func() { Expect(n).To(Equal(int64(100))) }) }) + + Describe("pipeline", func() { + var client *redis.ClusterClient + + BeforeEach(func() { + client = cluster.clusterClient(nil) + }) + + AfterEach(func() { + for _, client := range cluster.masters() { + Expect(client.FlushDb().Err()).NotTo(HaveOccurred()) + } + Expect(client.Close()).NotTo(HaveOccurred()) + }) + + It("performs multi-pipelines", func() { + slot := hashtag.Slot("A") + Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) + + pipe := client.Pipeline() + defer pipe.Close() + + keys := []string{"A", "B", "C", "D", "E", "F", "G"} + for i, key := range keys { + pipe.Set(key, key+"_value", 0) + pipe.Expire(key, time.Duration(i+1)*time.Hour) + } + for _, key := range keys { + pipe.Get(key) + pipe.TTL(key) + } + + cmds, err := pipe.Exec() + Expect(err).NotTo(HaveOccurred()) + Expect(cmds).To(HaveLen(28)) + Expect(cmds[14].(*redis.StringCmd).Val()).To(Equal("A_value")) + Expect(cmds[15].(*redis.DurationCmd).Val()).To(BeNumerically("~", 1*time.Hour, time.Second)) + Expect(cmds[20].(*redis.StringCmd).Val()).To(Equal("D_value")) + Expect(cmds[21].(*redis.DurationCmd).Val()).To(BeNumerically("~", 4*time.Hour, time.Second)) + Expect(cmds[26].(*redis.StringCmd).Val()).To(Equal("G_value")) + Expect(cmds[27].(*redis.DurationCmd).Val()).To(BeNumerically("~", 7*time.Hour, time.Second)) + }) + + It("works with missing keys", func() { + Expect(client.Set("A", "A_value", 0).Err()).NotTo(HaveOccurred()) + Expect(client.Set("C", "C_value", 0).Err()).NotTo(HaveOccurred()) + + var a, b, c *redis.StringCmd + cmds, err := client.Pipelined(func(pipe *redis.ClusterPipeline) error { + a = pipe.Get("A") + b = pipe.Get("B") + c = pipe.Get("C") + return nil + }) + Expect(err).To(Equal(redis.Nil)) + Expect(cmds).To(HaveLen(3)) + + Expect(a.Err()).NotTo(HaveOccurred()) + Expect(a.Val()).To(Equal("A_value")) + + Expect(b.Err()).To(Equal(redis.Nil)) + Expect(b.Val()).To(Equal("")) + + Expect(c.Err()).NotTo(HaveOccurred()) + Expect(c.Val()).To(Equal("C_value")) + }) + }) }) //------------------------------------------------------------------------------