diff --git a/cluster.go b/cluster.go index cc8b300..90fd4ce 100644 --- a/cluster.go +++ b/cluster.go @@ -12,6 +12,8 @@ import ( "gopkg.in/redis.v5/internal/pool" ) +var errClusterNoNodes = errors.RedisError("redis: cluster has no nodes") + // ClusterOptions are used to configure a cluster client and should be // passed to NewClusterClient. type ClusterOptions struct { @@ -155,14 +157,14 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo { func (c *ClusterClient) getNodes() map[string]*clusterNode { var nodes map[string]*clusterNode + c.mu.RLock() if !c.closed { nodes = make(map[string]*clusterNode, len(c.nodes)) - c.mu.RLock() for addr, node := range c.nodes { nodes[addr] = node } - c.mu.RUnlock() } + c.mu.RUnlock() return nodes } @@ -261,6 +263,9 @@ func (c *ClusterClient) randomNode() (*clusterNode, error) { return nil, pool.ErrClosed } + if len(addrs) == 0 { + return nil, errClusterNoNodes + } n := rand.Intn(len(addrs)) node, err := c.nodeByAddr(addrs[n]) diff --git a/cluster_test.go b/cluster_test.go index 367a6e3..9639cf8 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -301,6 +301,37 @@ var _ = Describe("ClusterClient", func() { Expect(client.Close()).NotTo(HaveOccurred()) }) + It("distributes keys", func() { + for i := 0; i < 100; i++ { + err := client.Set(fmt.Sprintf("key%d", i), "value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + } + + wanted := []string{"keys=31", "keys=29", "keys=40"} + for i, master := range cluster.masters() { + Expect(master.Info().Val()).To(ContainSubstring(wanted[i])) + } + }) + + It("distributes keys when using EVAL", func() { + script := redis.NewScript(` + local r = redis.call('SET', KEYS[1], ARGV[1]) + return r + `) + + var key string + for i := 0; i < 100; i++ { + key = fmt.Sprintf("key%d", i) + err := script.Run(client, []string{key}, "value").Err() + Expect(err).NotTo(HaveOccurred()) + } + + wanted := []string{"keys=31", "keys=29", "keys=40"} + for i, master := range cluster.masters() { + Expect(master.Info().Val()).To(ContainSubstring(wanted[i])) + } + }) + It("supports Watch", func() { var incr func(string) error @@ -342,60 +373,62 @@ var _ = Describe("ClusterClient", func() { Expect(n).To(Equal(int64(100))) }) - It("supports pipeline", func() { - slot := hashtag.Slot("A") - Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) + Describe("pipeline", func() { + It("follows redirects", func() { + slot := hashtag.Slot("A") + Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) - pipe := client.Pipeline() - defer pipe.Close() + pipe := client.Pipeline() + defer pipe.Close() - keys := []string{"A", "B", "C", "D", "E", "F", "G"} + keys := []string{"A", "B", "C", "D", "E", "F", "G"} - for i, key := range keys { - pipe.Set(key, key+"_value", 0) - pipe.Expire(key, time.Duration(i+1)*time.Hour) - } - cmds, err := pipe.Exec() - Expect(err).NotTo(HaveOccurred()) - Expect(cmds).To(HaveLen(14)) + for i, key := range keys { + pipe.Set(key, key+"_value", 0) + pipe.Expire(key, time.Duration(i+1)*time.Hour) + } + cmds, err := pipe.Exec() + Expect(err).NotTo(HaveOccurred()) + Expect(cmds).To(HaveLen(14)) - for _, key := range keys { - pipe.Get(key) - pipe.TTL(key) - } - cmds, err = pipe.Exec() - Expect(err).NotTo(HaveOccurred()) - Expect(cmds).To(HaveLen(14)) - Expect(cmds[0].(*redis.StringCmd).Val()).To(Equal("A_value")) - Expect(cmds[1].(*redis.DurationCmd).Val()).To(BeNumerically("~", 1*time.Hour, time.Second)) - Expect(cmds[6].(*redis.StringCmd).Val()).To(Equal("D_value")) - Expect(cmds[7].(*redis.DurationCmd).Val()).To(BeNumerically("~", 4*time.Hour, time.Second)) - Expect(cmds[12].(*redis.StringCmd).Val()).To(Equal("G_value")) - Expect(cmds[13].(*redis.DurationCmd).Val()).To(BeNumerically("~", 7*time.Hour, time.Second)) - }) - - It("supports pipeline with missing keys", func() { - Expect(client.Set("A", "A_value", 0).Err()).NotTo(HaveOccurred()) - Expect(client.Set("C", "C_value", 0).Err()).NotTo(HaveOccurred()) - - var a, b, c *redis.StringCmd - cmds, err := client.Pipelined(func(pipe *redis.Pipeline) error { - a = pipe.Get("A") - b = pipe.Get("B") - c = pipe.Get("C") - return nil + for _, key := range keys { + pipe.Get(key) + pipe.TTL(key) + } + cmds, err = pipe.Exec() + Expect(err).NotTo(HaveOccurred()) + Expect(cmds).To(HaveLen(14)) + Expect(cmds[0].(*redis.StringCmd).Val()).To(Equal("A_value")) + Expect(cmds[1].(*redis.DurationCmd).Val()).To(BeNumerically("~", 1*time.Hour, time.Second)) + Expect(cmds[6].(*redis.StringCmd).Val()).To(Equal("D_value")) + Expect(cmds[7].(*redis.DurationCmd).Val()).To(BeNumerically("~", 4*time.Hour, time.Second)) + Expect(cmds[12].(*redis.StringCmd).Val()).To(Equal("G_value")) + Expect(cmds[13].(*redis.DurationCmd).Val()).To(BeNumerically("~", 7*time.Hour, time.Second)) }) - Expect(err).To(Equal(redis.Nil)) - Expect(cmds).To(HaveLen(3)) - Expect(a.Err()).NotTo(HaveOccurred()) - Expect(a.Val()).To(Equal("A_value")) + It("works with missing keys", func() { + Expect(client.Set("A", "A_value", 0).Err()).NotTo(HaveOccurred()) + Expect(client.Set("C", "C_value", 0).Err()).NotTo(HaveOccurred()) - Expect(b.Err()).To(Equal(redis.Nil)) - Expect(b.Val()).To(Equal("")) + var a, b, c *redis.StringCmd + cmds, err := client.Pipelined(func(pipe *redis.Pipeline) error { + a = pipe.Get("A") + b = pipe.Get("B") + c = pipe.Get("C") + return nil + }) + Expect(err).To(Equal(redis.Nil)) + Expect(cmds).To(HaveLen(3)) - Expect(c.Err()).NotTo(HaveOccurred()) - Expect(c.Val()).To(Equal("C_value")) + Expect(a.Err()).NotTo(HaveOccurred()) + Expect(a.Val()).To(Equal("A_value")) + + Expect(b.Err()).To(Equal(redis.Nil)) + Expect(b.Val()).To(Equal("")) + + Expect(c.Err()).NotTo(HaveOccurred()) + Expect(c.Val()).To(Equal("C_value")) + }) }) It("calls fn for every master node", func() { @@ -451,6 +484,25 @@ var _ = Describe("ClusterClient", func() { describeClusterClient() }) + Describe("ClusterClient without nodes", func() { + BeforeEach(func() { + client = redis.NewClusterClient(&redis.ClusterOptions{}) + }) + + It("returns an error", func() { + err := client.Ping().Err() + Expect(err).To(MatchError("redis: cluster has no nodes")) + }) + + It("pipeline returns an error", func() { + _, err := client.Pipelined(func(pipe *redis.Pipeline) error { + pipe.Ping() + return nil + }) + Expect(err).To(MatchError("redis: cluster has no nodes")) + }) + }) + Describe("ClusterClient without valid nodes", func() { BeforeEach(func() { client = redis.NewClusterClient(&redis.ClusterOptions{ @@ -462,6 +514,14 @@ var _ = Describe("ClusterClient", func() { err := client.Ping().Err() Expect(err).To(MatchError("ERR This instance has cluster support disabled")) }) + + It("pipeline returns an error", func() { + _, err := client.Pipelined(func(pipe *redis.Pipeline) error { + pipe.Ping() + return nil + }) + Expect(err).To(MatchError("ERR This instance has cluster support disabled")) + }) }) }) diff --git a/ring_test.go b/ring_test.go index 5e9d16f..1c1ae69 100644 --- a/ring_test.go +++ b/ring_test.go @@ -38,7 +38,7 @@ var _ = Describe("Redis Ring", func() { Expect(ring.Close()).NotTo(HaveOccurred()) }) - It("uses both shards", func() { + It("distributes keys", func() { setRingKeys() // Both shards should have some keys now. @@ -46,6 +46,23 @@ var _ = Describe("Redis Ring", func() { Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43")) }) + It("distributes keys when using EVAL", func() { + script := redis.NewScript(` + local r = redis.call('SET', KEYS[1], ARGV[1]) + return r + `) + + var key string + for i := 0; i < 100; i++ { + key = fmt.Sprintf("key%d", i) + err := script.Run(ring, []string{key}, "value").Err() + Expect(err).NotTo(HaveOccurred()) + } + + Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=57")) + Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43")) + }) + It("uses single shard when one of the shards is down", func() { // Stop ringShard2. Expect(ringShard2.Close()).NotTo(HaveOccurred()) @@ -89,34 +106,8 @@ var _ = Describe("Redis Ring", func() { Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100")) }) - It("supports eval key search", func() { - script := redis.NewScript(` - local r = redis.call('SET', KEYS[1], ARGV[1]) - return r - `) - - var key string - for i := 0; i < 100; i++ { - key = fmt.Sprintf("key{%d}", i) - err := script.Run(ring, []string{key}, "value").Err() - Expect(err).NotTo(HaveOccurred()) - } - - Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=52")) - Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=48")) - }) - - Describe("pipelining", func() { - It("returns an error when all shards are down", func() { - ring := redis.NewRing(&redis.RingOptions{}) - _, err := ring.Pipelined(func(pipe *redis.Pipeline) error { - pipe.Ping() - return nil - }) - Expect(err).To(MatchError("redis: all ring shards are down")) - }) - - It("uses both shards", func() { + Describe("pipeline", func() { + It("distributes keys", func() { pipe := ring.Pipeline() for i := 0; i < 100; i++ { err := pipe.Set(fmt.Sprintf("key%d", i), "value", 0).Err() @@ -175,3 +166,28 @@ var _ = Describe("Redis Ring", func() { }) }) }) + +var _ = Describe("empty Redis Ring", func() { + var ring *redis.Ring + + BeforeEach(func() { + ring = redis.NewRing(&redis.RingOptions{}) + }) + + AfterEach(func() { + Expect(ring.Close()).NotTo(HaveOccurred()) + }) + + It("returns an error", func() { + err := ring.Ping().Err() + Expect(err).To(MatchError("redis: all ring shards are down")) + }) + + It("pipeline returns an error", func() { + _, err := ring.Pipelined(func(pipe *redis.Pipeline) error { + pipe.Ping() + return nil + }) + Expect(err).To(MatchError("redis: all ring shards are down")) + }) +})