diff --git a/osscluster.go b/osscluster.go index ce258ff3..72e922a8 100644 --- a/osscluster.go +++ b/osscluster.go @@ -21,6 +21,10 @@ import ( "github.com/redis/go-redis/v9/internal/rand" ) +const ( + minLatencyMeasurementInterval = 10 * time.Second +) + var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") // ClusterOptions are used to configure a cluster client and should be @@ -316,6 +320,10 @@ type clusterNode struct { latency uint32 // atomic generation uint32 // atomic failing uint32 // atomic + + // last time the latency measurement was performed for the node, stored in nanoseconds + // from epoch + lastLatencyMeasurement int64 // atomic } func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { @@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() { latency = float64(dur) / float64(successes) } atomic.StoreUint32(&n.latency, uint32(latency+0.5)) + n.SetLastLatencyMeasurement(time.Now()) } func (n *clusterNode) Latency() time.Duration { @@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 { return atomic.LoadUint32(&n.generation) } +func (n *clusterNode) LastLatencyMeasurement() int64 { + return atomic.LoadInt64(&n.lastLatencyMeasurement) +} + func (n *clusterNode) SetGeneration(gen uint32) { for { v := atomic.LoadUint32(&n.generation) @@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) { } } +func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) { + for { + v := atomic.LoadInt64(&n.lastLatencyMeasurement) + if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) { + break + } + } +} + //------------------------------------------------------------------------------ type clusterNodes struct { @@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) { c.mu.Lock() c.activeAddrs = c.activeAddrs[:0] + now := time.Now() for addr, node := range c.nodes { if node.Generation() >= generation { c.activeAddrs = append(c.activeAddrs, addr) - if c.opt.RouteByLatency { + if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() { go node.updateLatency() } continue