diff --git a/cluster.go b/cluster.go index 51ceb09..c9b178a 100644 --- a/cluster.go +++ b/cluster.go @@ -188,16 +188,19 @@ func (n *clusterNode) Close() error { } func (n *clusterNode) updateLatency() { - const probes = 10 + const numProbe = 10 + var dur uint64 - var latency uint32 - for i := 0; i < probes; i++ { + for i := 0; i < numProbe; i++ { start := time.Now() n.Client.Ping(context.TODO()) - probe := uint32(time.Since(start) / time.Microsecond) - latency = (latency + probe) / 2 + dur += uint64(time.Since(start) / time.Microsecond) + + time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond) } - atomic.StoreUint32(&n.latency, latency) + + latency := float64(dur) / float64(numProbe) + atomic.StoreUint32(&n.latency, uint32(latency+0.5)) } func (n *clusterNode) Latency() time.Duration { @@ -318,6 +321,9 @@ func (c *clusterNodes) GC(generation uint32) { for addr, node := range c.nodes { if node.Generation() >= generation { c.activeAddrs = append(c.activeAddrs, addr) + if c.opt.RouteByLatency { + go node.updateLatency() + } continue } @@ -548,8 +554,6 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) { } func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) { - const threshold = time.Millisecond - nodes := c.slotNodes(slot) if len(nodes) == 0 { return c.nodes.Random() @@ -560,13 +564,14 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) { if n.Failing() { continue } - if node == nil || node.Latency()-n.Latency() > threshold { + if node == nil || n.Latency() < node.Latency() { node = n } } if node != nil { return node, nil } + // If all nodes are failing - return random node return c.nodes.Random() }