Compare commits

...

4 Commits

Author SHA1 Message Date
Cattī Crūdēlēs 0277f84bd7
Merge 960ff8bd39 into f1ffb55c9a 2024-11-21 13:35:46 +08:00
Justin f1ffb55c9a
Only check latencies once every 10 seconds with `routeByLatency` (#2795)
* Only check latencies once every 10 seconds with `routeByLatency`

`routeByLatency` currently checks latencies any time a server returns
a MOVED or READONLY reply. When a shard is down, the ClusterClient
chooses to issue the request to a random server, which returns a MOVED
reply. This causes a state refresh and a latency update on all servers.
This can lead to significant ping load to clusters with a large number
of clients.

This introduces logic to ping only once every 10 seconds, only
performing a latency update on a node during the `GC` function if the
latency was set later than 10 seconds ago.

Fixes https://github.com/redis/go-redis/issues/2782

* use UnixNano instead of Unix for better precision

---------

Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-20 14:36:39 +02:00
Cattī Crūdēlēs 960ff8bd39
Merge branch 'master' into otel 2024-09-26 22:27:59 +08:00
Cattī Crūdēlēs fc5be74fe9
fix(redisotel): fix buggy append in reportPoolStats
The current append twice to `conf.attrs` approach in `reportPoolStats` may result in unexpected idleAttrs,
due to `append` [can mutate](https://github.com/golang/go/issues/29115#issuecomment-444669036) the underlying array of the original slice,
as demonstrated at <https://go.dev/play/p/jwRMofH91eQ?v=goprev>.

Also, I replaced `metric.WithAttributes` in `reportPoolStats` with `metric.WithAttributeSet`,
since `WithAttributes` is just `WithAttributeSet` with some extra works that are not needed here,
see <https://pkg.go.dev/go.opentelemetry.io/otel/metric@v1.22.0#WithAttributes>.
2024-09-15 18:26:16 +08:00
2 changed files with 33 additions and 10 deletions

View File

@ -83,9 +83,9 @@ func InstrumentMetrics(rdb redis.UniversalClient, opts ...MetricsOption) error {
} }
func reportPoolStats(rdb *redis.Client, conf *config) error { func reportPoolStats(rdb *redis.Client, conf *config) error {
labels := conf.attrs poolAttrs := attribute.NewSet(conf.attrs...)
idleAttrs := append(labels, attribute.String("state", "idle")) idleAttrs := attribute.NewSet(append(poolAttrs.ToSlice(), attribute.String("state", "idle"))...)
usedAttrs := append(labels, attribute.String("state", "used")) usedAttrs := attribute.NewSet(append(poolAttrs.ToSlice(), attribute.String("state", "used"))...)
idleMax, err := conf.meter.Int64ObservableUpDownCounter( idleMax, err := conf.meter.Int64ObservableUpDownCounter(
"db.client.connections.idle.max", "db.client.connections.idle.max",
@ -132,14 +132,14 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
func(ctx context.Context, o metric.Observer) error { func(ctx context.Context, o metric.Observer) error {
stats := rdb.PoolStats() stats := rdb.PoolStats()
o.ObserveInt64(idleMax, int64(redisConf.MaxIdleConns), metric.WithAttributes(labels...)) o.ObserveInt64(idleMax, int64(redisConf.MaxIdleConns), metric.WithAttributeSet(poolAttrs))
o.ObserveInt64(idleMin, int64(redisConf.MinIdleConns), metric.WithAttributes(labels...)) o.ObserveInt64(idleMin, int64(redisConf.MinIdleConns), metric.WithAttributeSet(poolAttrs))
o.ObserveInt64(connsMax, int64(redisConf.PoolSize), metric.WithAttributes(labels...)) o.ObserveInt64(connsMax, int64(redisConf.PoolSize), metric.WithAttributeSet(poolAttrs))
o.ObserveInt64(usage, int64(stats.IdleConns), metric.WithAttributes(idleAttrs...)) o.ObserveInt64(usage, int64(stats.IdleConns), metric.WithAttributeSet(idleAttrs))
o.ObserveInt64(usage, int64(stats.TotalConns-stats.IdleConns), metric.WithAttributes(usedAttrs...)) o.ObserveInt64(usage, int64(stats.TotalConns-stats.IdleConns), metric.WithAttributeSet(usedAttrs))
o.ObserveInt64(timeouts, int64(stats.Timeouts), metric.WithAttributes(labels...)) o.ObserveInt64(timeouts, int64(stats.Timeouts), metric.WithAttributeSet(poolAttrs))
return nil return nil
}, },
idleMax, idleMax,

View File

@ -21,6 +21,10 @@ import (
"github.com/redis/go-redis/v9/internal/rand" "github.com/redis/go-redis/v9/internal/rand"
) )
const (
minLatencyMeasurementInterval = 10 * time.Second
)
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
// ClusterOptions are used to configure a cluster client and should be // ClusterOptions are used to configure a cluster client and should be
@ -316,6 +320,10 @@ type clusterNode struct {
latency uint32 // atomic latency uint32 // atomic
generation uint32 // atomic generation uint32 // atomic
failing uint32 // atomic failing uint32 // atomic
// last time the latency measurement was performed for the node, stored in nanoseconds
// from epoch
lastLatencyMeasurement int64 // atomic
} }
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
latency = float64(dur) / float64(successes) latency = float64(dur) / float64(successes)
} }
atomic.StoreUint32(&n.latency, uint32(latency+0.5)) atomic.StoreUint32(&n.latency, uint32(latency+0.5))
n.SetLastLatencyMeasurement(time.Now())
} }
func (n *clusterNode) Latency() time.Duration { func (n *clusterNode) Latency() time.Duration {
@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
return atomic.LoadUint32(&n.generation) return atomic.LoadUint32(&n.generation)
} }
func (n *clusterNode) LastLatencyMeasurement() int64 {
return atomic.LoadInt64(&n.lastLatencyMeasurement)
}
func (n *clusterNode) SetGeneration(gen uint32) { func (n *clusterNode) SetGeneration(gen uint32) {
for { for {
v := atomic.LoadUint32(&n.generation) v := atomic.LoadUint32(&n.generation)
@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
} }
} }
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
for {
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
break
}
}
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type clusterNodes struct { type clusterNodes struct {
@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
c.mu.Lock() c.mu.Lock()
c.activeAddrs = c.activeAddrs[:0] c.activeAddrs = c.activeAddrs[:0]
now := time.Now()
for addr, node := range c.nodes { for addr, node := range c.nodes {
if node.Generation() >= generation { if node.Generation() >= generation {
c.activeAddrs = append(c.activeAddrs, addr) c.activeAddrs = append(c.activeAddrs, addr)
if c.opt.RouteByLatency { if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
go node.updateLatency() go node.updateLatency()
} }
continue continue