diff --git a/cluster.go b/cluster.go
index 6e3c302..76899ea 100644
--- a/cluster.go
+++ b/cluster.go
@@ -95,18 +95,22 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
 	}
 
 	if clOpt.RouteByLatency {
-		const probes = 10
-		for i := 0; i < probes; i++ {
-			t1 := time.Now()
-			node.Client.Ping()
-			node.Latency += time.Since(t1)
-		}
-		node.Latency = node.Latency / probes
+		node.updateLatency()
 	}
 
 	return &node
 }
 
+func (n *clusterNode) updateLatency() {
+	const probes = 10
+	for i := 0; i < probes; i++ {
+		start := time.Now()
+		n.Client.Ping()
+		n.Latency += time.Since(start)
+	}
+	n.Latency = n.Latency / probes
+}
+
 func (n *clusterNode) Loading() bool {
 	return !n.loading.IsZero() && time.Since(n.loading) < time.Minute
 }
@@ -290,6 +294,8 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
 }
 
 func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
+	const threshold = time.Millisecond
+
 	nodes := c.slotNodes(slot)
 	if len(nodes) == 0 {
 		return c.nodes.Random()
@@ -297,7 +303,10 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
 
 	var node *clusterNode
 	for _, n := range nodes {
-		if node == nil || n.Latency < node.Latency {
+		if n.Loading() {
+			continue
+		}
+		if node == nil || node.Latency-n.Latency > threshold {
 			node = n
 		}
 	}
diff --git a/cluster_test.go b/cluster_test.go
index d6707ef..589ef98 100644
--- a/cluster_test.go
+++ b/cluster_test.go
@@ -6,15 +6,14 @@ import (
 	"strconv"
 	"strings"
 	"sync"
-	"testing"
 	"time"
 
-	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
-
 	"gopkg.in/redis.v5"
 	"gopkg.in/redis.v5/internal/hashtag"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
 )
 
 type clusterScenario struct {
@@ -24,10 +23,6 @@ type clusterScenario struct {
 	clients   map[string]*redis.Client
 }
 
-func (s *clusterScenario) primary() *redis.Client {
-	return s.clients[s.ports[0]]
-}
-
 func (s *clusterScenario) masters() []*redis.Client {
 	result := make([]*redis.Client, 3)
 	for pos, port := range s.ports[:3] {
@@ -157,6 +152,9 @@ func slotEqual(s1, s2 redis.ClusterSlot) bool {
 	if s1.End != s2.End {
 		return false
 	}
+	if len(s1.Nodes) != len(s2.Nodes) {
+		return false
+	}
 	for i, n1 := range s1.Nodes {
 		if n1.Addr != s2.Nodes[i].Addr {
 			return false
@@ -182,9 +180,10 @@ func stopCluster(scenario *clusterScenario) error {
 //------------------------------------------------------------------------------
 
 var _ = Describe("ClusterClient", func() {
+	var opt *redis.ClusterOptions
 	var client *redis.ClusterClient
 
-	describeClusterClient := func() {
+	assertClusterClient := func() {
 		It("should CLUSTER SLOTS", func() {
 			res, err := client.ClusterSlots().Result()
 			Expect(err).NotTo(HaveOccurred())
@@ -377,11 +376,13 @@ var _ = Describe("ClusterClient", func() {
 		var pipe *redis.Pipeline
 
 		assertPipeline := func() {
-			It("follows redirects", func() {
-				slot := hashtag.Slot("A")
-				Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
+			keys := []string{"A", "B", "C", "D", "E", "F", "G"}
 
-				keys := []string{"A", "B", "C", "D", "E", "F", "G"}
+			It("follows redirects", func() {
+				for _, key := range keys {
+					slot := hashtag.Slot(key)
+					client.SwapSlotNodes(slot)
+				}
 
 				for i, key := range keys {
 					pipe.Set(key, key+"_value", 0)
@@ -391,6 +392,15 @@ var _ = Describe("ClusterClient", func() {
 				Expect(err).NotTo(HaveOccurred())
 				Expect(cmds).To(HaveLen(14))
 
+				if opt.RouteByLatency {
+					return
+				}
+
+				for _, key := range keys {
+					slot := hashtag.Slot(key)
+					client.SwapSlotNodes(slot)
+				}
+
 				for _, key := range keys {
 					pipe.Get(key)
 					pipe.TTL(key)
@@ -398,25 +408,26 @@ var _ = Describe("ClusterClient", func() {
 				cmds, err = pipe.Exec()
 				Expect(err).NotTo(HaveOccurred())
 				Expect(cmds).To(HaveLen(14))
-				Expect(cmds[0].(*redis.StringCmd).Val()).To(Equal("A_value"))
-				Expect(cmds[1].(*redis.DurationCmd).Val()).To(BeNumerically("~", 1*time.Hour, time.Second))
-				Expect(cmds[6].(*redis.StringCmd).Val()).To(Equal("D_value"))
-				Expect(cmds[7].(*redis.DurationCmd).Val()).To(BeNumerically("~", 4*time.Hour, time.Second))
-				Expect(cmds[12].(*redis.StringCmd).Val()).To(Equal("G_value"))
-				Expect(cmds[13].(*redis.DurationCmd).Val()).To(BeNumerically("~", 7*time.Hour, time.Second))
+
+				for i, key := range keys {
+					get := cmds[i*2].(*redis.StringCmd)
+					Expect(get.Val()).To(Equal(key + "_value"))
+
+					ttl := cmds[(i*2)+1].(*redis.DurationCmd)
+					Expect(ttl.Val()).To(BeNumerically("~", time.Duration(i+1)*time.Hour, time.Second))
+				}
 			})
 
 			It("works with missing keys", func() {
-				Expect(client.Set("A", "A_value", 0).Err()).NotTo(HaveOccurred())
-				Expect(client.Set("C", "C_value", 0).Err()).NotTo(HaveOccurred())
+				pipe.Set("A", "A_value", 0)
+				pipe.Set("C", "C_value", 0)
+				_, err := pipe.Exec()
+				Expect(err).NotTo(HaveOccurred())
 
-				var a, b, c *redis.StringCmd
-				cmds, err := client.Pipelined(func(pipe *redis.Pipeline) error {
-					a = pipe.Get("A")
-					b = pipe.Get("B")
-					c = pipe.Get("C")
-					return nil
-				})
+				a := pipe.Get("A")
+				b := pipe.Get("B")
+				c := pipe.Get("C")
+				cmds, err := pipe.Exec()
 				Expect(err).To(Equal(redis.Nil))
 				Expect(cmds).To(HaveLen(3))
@@ -476,7 +487,8 @@ var _ = Describe("ClusterClient", func() {
 
 	Describe("default ClusterClient", func() {
 		BeforeEach(func() {
-			client = cluster.clusterClient(redisClusterOptions())
+			opt = redisClusterOptions()
+			client = cluster.clusterClient(opt)
 
 			_ = client.ForEachMaster(func(master *redis.Client) error {
 				return master.FlushDb().Err()
@@ -487,12 +499,12 @@ var _ = Describe("ClusterClient", func() {
 			Expect(client.Close()).NotTo(HaveOccurred())
 		})
 
-		describeClusterClient()
+		assertClusterClient()
 	})
 
 	Describe("ClusterClient with RouteByLatency", func() {
 		BeforeEach(func() {
-			opt := redisClusterOptions()
+			opt = redisClusterOptions()
 			opt.RouteByLatency = true
 			client = cluster.clusterClient(opt)
 
@@ -506,7 +518,7 @@ var _ = Describe("ClusterClient", func() {
 			Expect(client.Close()).NotTo(HaveOccurred())
 		})
 
-		describeClusterClient()
+		assertClusterClient()
 	})
 })
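
Note: the following standalone sketch is not part of the patch; it illustrates the selection rule the cluster.go hunks introduce. Nodes still loading their dataset are skipped, and the route only switches to a different node when that node wins by more than the 1ms threshold, so nodes with near-equal latencies don't cause the chosen route to flap between refreshes. The `node` type, its fields, and the addresses below are hypothetical stand-ins for `clusterNode`.

package main

import (
	"fmt"
	"time"
)

// node is a hypothetical stand-in for clusterNode.
type node struct {
	addr    string
	latency time.Duration
	loading bool
}

// closest mirrors the updated slotClosestNode loop: ignore loading nodes,
// and prefer a node only when it is faster by more than the threshold.
func closest(nodes []*node) *node {
	const threshold = time.Millisecond

	var best *node
	for _, n := range nodes {
		if n.loading {
			continue // a node still loading its dataset cannot serve reads yet
		}
		// Switch only on a clear win; a sub-millisecond difference
		// keeps the previously selected node.
		if best == nil || best.latency-n.latency > threshold {
			best = n
		}
	}
	return best
}

func main() {
	nodes := []*node{
		{addr: "127.0.0.1:8221", latency: 3 * time.Millisecond},
		{addr: "127.0.0.1:8224", latency: 2900 * time.Microsecond}, // only 0.1ms faster: not preferred
		{addr: "127.0.0.1:8227", latency: 1 * time.Millisecond},    // clear win: preferred
	}
	fmt.Println(closest(nodes).addr) // prints 127.0.0.1:8227
}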