diff --git a/cluster.go b/cluster.go index 78ad22ca..e6fc2a3e 100644 --- a/cluster.go +++ b/cluster.go @@ -193,16 +193,19 @@ func (n *clusterNode) Close() error { } func (n *clusterNode) updateLatency() { - const probes = 10 + const numProbe = 10 + var dur uint64 + + for i := 0; i < numProbe; i++ { + time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond) - var latency uint32 - for i := 0; i < probes; i++ { start := time.Now() n.Client.Ping(context.TODO()) - probe := uint32(time.Since(start) / time.Microsecond) - latency = (latency + probe) / 2 + dur += uint64(time.Since(start) / time.Microsecond) } - atomic.StoreUint32(&n.latency, latency) + + latency := float64(dur) / float64(numProbe) + atomic.StoreUint32(&n.latency, uint32(latency+0.5)) } func (n *clusterNode) Latency() time.Duration { @@ -323,6 +326,9 @@ func (c *clusterNodes) GC(generation uint32) { for addr, node := range c.nodes { if node.Generation() >= generation { c.activeAddrs = append(c.activeAddrs, addr) + if c.opt.RouteByLatency { + go node.updateLatency() + } continue } @@ -553,8 +559,6 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) { } func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) { - const threshold = time.Millisecond - nodes := c.slotNodes(slot) if len(nodes) == 0 { return c.nodes.Random() @@ -565,13 +569,14 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) { if n.Failing() { continue } - if node == nil || node.Latency()-n.Latency() > threshold { + if node == nil || n.Latency() < node.Latency() { node = n } } if node != nil { return node, nil } + // If all nodes are failing - return random node return c.nodes.Random() } diff --git a/cluster_test.go b/cluster_test.go index c1130233..48c45b51 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -522,6 +522,175 @@ var _ = Describe("ClusterClient", func() { err := pubsub.Ping(ctx) Expect(err).NotTo(HaveOccurred()) }) + } + + Describe("ClusterClient", func() { + BeforeEach(func() { + opt = redisClusterOptions() + client = cluster.newClusterClient(ctx, opt) + + err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error { + return master.FlushDB(ctx).Err() + }) + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + _ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error { + return master.FlushDB(ctx).Err() + }) + Expect(client.Close()).NotTo(HaveOccurred()) + }) + + It("returns pool stats", func() { + stats := client.PoolStats() + Expect(stats).To(BeAssignableToTypeOf(&redis.PoolStats{})) + }) + + It("returns an error when there are no attempts left", func() { + opt := redisClusterOptions() + opt.MaxRedirects = -1 + client := cluster.newClusterClient(ctx, opt) + + Eventually(func() error { + return client.SwapNodes(ctx, "A") + }, 30*time.Second).ShouldNot(HaveOccurred()) + + err := client.Get(ctx, "A").Err() + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("MOVED")) + + Expect(client.Close()).NotTo(HaveOccurred()) + }) + + It("calls fn for every master node", func() { + for i := 0; i < 10; i++ { + Expect(client.Set(ctx, strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred()) + } + + err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error { + return master.FlushDB(ctx).Err() + }) + Expect(err).NotTo(HaveOccurred()) + + size, err := client.DBSize(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(size).To(Equal(int64(0))) + }) + + It("should CLUSTER SLOTS", func() { + res, err := client.ClusterSlots(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(HaveLen(3)) + + wanted := []redis.ClusterSlot{{ + Start: 0, + End: 4999, + Nodes: []redis.ClusterNode{{ + ID: "", + Addr: "127.0.0.1:8220", + }, { + ID: "", + Addr: "127.0.0.1:8223", + }}, + }, { + Start: 5000, + End: 9999, + Nodes: []redis.ClusterNode{{ + ID: "", + Addr: "127.0.0.1:8221", + }, { + ID: "", + Addr: "127.0.0.1:8224", + }}, + }, { + Start: 10000, + End: 16383, + Nodes: []redis.ClusterNode{{ + ID: "", + Addr: "127.0.0.1:8222", + }, { + ID: "", + Addr: "127.0.0.1:8225", + }}, + }} + Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred()) + }) + + It("should CLUSTER NODES", func() { + res, err := client.ClusterNodes(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(res)).To(BeNumerically(">", 400)) + }) + + It("should CLUSTER INFO", func() { + res, err := client.ClusterInfo(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(ContainSubstring("cluster_known_nodes:6")) + }) + + It("should CLUSTER KEYSLOT", func() { + hashSlot, err := client.ClusterKeySlot(ctx, "somekey").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey")))) + }) + + It("should CLUSTER GETKEYSINSLOT", func() { + keys, err := client.ClusterGetKeysInSlot(ctx, hashtag.Slot("somekey"), 1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(keys)).To(Equal(0)) + }) + + It("should CLUSTER COUNT-FAILURE-REPORTS", func() { + n, err := client.ClusterCountFailureReports(ctx, cluster.nodeIDs[0]).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(0))) + }) + + It("should CLUSTER COUNTKEYSINSLOT", func() { + n, err := client.ClusterCountKeysInSlot(ctx, 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(0))) + }) + + It("should CLUSTER SAVECONFIG", func() { + res, err := client.ClusterSaveConfig(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal("OK")) + }) + + It("should CLUSTER SLAVES", func() { + nodesList, err := client.ClusterSlaves(ctx, cluster.nodeIDs[0]).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(nodesList).Should(ContainElement(ContainSubstring("slave"))) + Expect(nodesList).Should(HaveLen(1)) + }) + + It("should RANDOMKEY", func() { + const nkeys = 100 + + for i := 0; i < nkeys; i++ { + err := client.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + } + + var keys []string + addKey := func(key string) { + for _, k := range keys { + if k == key { + return + } + } + keys = append(keys, key) + } + + for i := 0; i < nkeys*10; i++ { + key := client.RandomKey(ctx).Val() + addKey(key) + } + + Expect(len(keys)).To(BeNumerically("~", nkeys, nkeys/10)) + }) It("supports Process hook", func() { err := client.Ping(ctx).Err() @@ -694,175 +863,6 @@ var _ = Describe("ClusterClient", func() { "cluster.AfterProcessPipeline", })) }) - } - - Describe("ClusterClient", func() { - BeforeEach(func() { - opt = redisClusterOptions() - client = cluster.newClusterClient(ctx, opt) - - err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error { - return master.FlushDB(ctx).Err() - }) - Expect(err).NotTo(HaveOccurred()) - }) - - AfterEach(func() { - _ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error { - return master.FlushDB(ctx).Err() - }) - Expect(client.Close()).NotTo(HaveOccurred()) - }) - - It("returns pool stats", func() { - stats := client.PoolStats() - Expect(stats).To(BeAssignableToTypeOf(&redis.PoolStats{})) - }) - - It("returns an error when there are no attempts left", func() { - opt := redisClusterOptions() - opt.MaxRedirects = -1 - client := cluster.newClusterClient(ctx, opt) - - Eventually(func() error { - return client.SwapNodes(ctx, "A") - }, 30*time.Second).ShouldNot(HaveOccurred()) - - err := client.Get(ctx, "A").Err() - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("MOVED")) - - Expect(client.Close()).NotTo(HaveOccurred()) - }) - - It("calls fn for every master node", func() { - for i := 0; i < 10; i++ { - Expect(client.Set(ctx, strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred()) - } - - err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error { - return master.FlushDB(ctx).Err() - }) - Expect(err).NotTo(HaveOccurred()) - - size, err := client.DBSize(ctx).Result() - Expect(err).NotTo(HaveOccurred()) - Expect(size).To(Equal(int64(0))) - }) - - It("should CLUSTER SLOTS", func() { - res, err := client.ClusterSlots(ctx).Result() - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(HaveLen(3)) - - wanted := []redis.ClusterSlot{{ - Start: 0, - End: 4999, - Nodes: []redis.ClusterNode{{ - ID: "", - Addr: "127.0.0.1:8220", - }, { - ID: "", - Addr: "127.0.0.1:8223", - }}, - }, { - Start: 5000, - End: 9999, - Nodes: []redis.ClusterNode{{ - ID: "", - Addr: "127.0.0.1:8221", - }, { - ID: "", - Addr: "127.0.0.1:8224", - }}, - }, { - Start: 10000, - End: 16383, - Nodes: []redis.ClusterNode{{ - ID: "", - Addr: "127.0.0.1:8222", - }, { - ID: "", - Addr: "127.0.0.1:8225", - }}, - }} - Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred()) - }) - - It("should CLUSTER NODES", func() { - res, err := client.ClusterNodes(ctx).Result() - Expect(err).NotTo(HaveOccurred()) - Expect(len(res)).To(BeNumerically(">", 400)) - }) - - It("should CLUSTER INFO", func() { - res, err := client.ClusterInfo(ctx).Result() - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(ContainSubstring("cluster_known_nodes:6")) - }) - - It("should CLUSTER KEYSLOT", func() { - hashSlot, err := client.ClusterKeySlot(ctx, "somekey").Result() - Expect(err).NotTo(HaveOccurred()) - Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey")))) - }) - - It("should CLUSTER GETKEYSINSLOT", func() { - keys, err := client.ClusterGetKeysInSlot(ctx, hashtag.Slot("somekey"), 1).Result() - Expect(err).NotTo(HaveOccurred()) - Expect(len(keys)).To(Equal(0)) - }) - - It("should CLUSTER COUNT-FAILURE-REPORTS", func() { - n, err := client.ClusterCountFailureReports(ctx, cluster.nodeIDs[0]).Result() - Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(int64(0))) - }) - - It("should CLUSTER COUNTKEYSINSLOT", func() { - n, err := client.ClusterCountKeysInSlot(ctx, 10).Result() - Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(int64(0))) - }) - - It("should CLUSTER SAVECONFIG", func() { - res, err := client.ClusterSaveConfig(ctx).Result() - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal("OK")) - }) - - It("should CLUSTER SLAVES", func() { - nodesList, err := client.ClusterSlaves(ctx, cluster.nodeIDs[0]).Result() - Expect(err).NotTo(HaveOccurred()) - Expect(nodesList).Should(ContainElement(ContainSubstring("slave"))) - Expect(nodesList).Should(HaveLen(1)) - }) - - It("should RANDOMKEY", func() { - const nkeys = 100 - - for i := 0; i < nkeys; i++ { - err := client.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err() - Expect(err).NotTo(HaveOccurred()) - } - - var keys []string - addKey := func(key string) { - for _, k := range keys { - if k == key { - return - } - } - keys = append(keys, key) - } - - for i := 0; i < nkeys*10; i++ { - key := client.RandomKey(ctx).Val() - addKey(key) - } - - Expect(len(keys)).To(BeNumerically("~", nkeys, nkeys/10)) - }) assertClusterClient() })