mirror of https://github.com/go-redis/redis.git
Compare commits
4 Commits
4a9458ea65
...
0277f84bd7
Author | SHA1 | Date |
---|---|---|
Cattī Crūdēlēs | 0277f84bd7 | |
Justin | f1ffb55c9a | |
Cattī Crūdēlēs | 960ff8bd39 | |
Cattī Crūdēlēs | fc5be74fe9 |
|
@ -83,9 +83,9 @@ func InstrumentMetrics(rdb redis.UniversalClient, opts ...MetricsOption) error {
|
|||
}
|
||||
|
||||
func reportPoolStats(rdb *redis.Client, conf *config) error {
|
||||
labels := conf.attrs
|
||||
idleAttrs := append(labels, attribute.String("state", "idle"))
|
||||
usedAttrs := append(labels, attribute.String("state", "used"))
|
||||
poolAttrs := attribute.NewSet(conf.attrs...)
|
||||
idleAttrs := attribute.NewSet(append(poolAttrs.ToSlice(), attribute.String("state", "idle"))...)
|
||||
usedAttrs := attribute.NewSet(append(poolAttrs.ToSlice(), attribute.String("state", "used"))...)
|
||||
|
||||
idleMax, err := conf.meter.Int64ObservableUpDownCounter(
|
||||
"db.client.connections.idle.max",
|
||||
|
@ -132,14 +132,14 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
|
|||
func(ctx context.Context, o metric.Observer) error {
|
||||
stats := rdb.PoolStats()
|
||||
|
||||
o.ObserveInt64(idleMax, int64(redisConf.MaxIdleConns), metric.WithAttributes(labels...))
|
||||
o.ObserveInt64(idleMin, int64(redisConf.MinIdleConns), metric.WithAttributes(labels...))
|
||||
o.ObserveInt64(connsMax, int64(redisConf.PoolSize), metric.WithAttributes(labels...))
|
||||
o.ObserveInt64(idleMax, int64(redisConf.MaxIdleConns), metric.WithAttributeSet(poolAttrs))
|
||||
o.ObserveInt64(idleMin, int64(redisConf.MinIdleConns), metric.WithAttributeSet(poolAttrs))
|
||||
o.ObserveInt64(connsMax, int64(redisConf.PoolSize), metric.WithAttributeSet(poolAttrs))
|
||||
|
||||
o.ObserveInt64(usage, int64(stats.IdleConns), metric.WithAttributes(idleAttrs...))
|
||||
o.ObserveInt64(usage, int64(stats.TotalConns-stats.IdleConns), metric.WithAttributes(usedAttrs...))
|
||||
o.ObserveInt64(usage, int64(stats.IdleConns), metric.WithAttributeSet(idleAttrs))
|
||||
o.ObserveInt64(usage, int64(stats.TotalConns-stats.IdleConns), metric.WithAttributeSet(usedAttrs))
|
||||
|
||||
o.ObserveInt64(timeouts, int64(stats.Timeouts), metric.WithAttributes(labels...))
|
||||
o.ObserveInt64(timeouts, int64(stats.Timeouts), metric.WithAttributeSet(poolAttrs))
|
||||
return nil
|
||||
},
|
||||
idleMax,
|
||||
|
|
|
@ -21,6 +21,10 @@ import (
|
|||
"github.com/redis/go-redis/v9/internal/rand"
|
||||
)
|
||||
|
||||
const (
|
||||
minLatencyMeasurementInterval = 10 * time.Second
|
||||
)
|
||||
|
||||
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
|
||||
|
||||
// ClusterOptions are used to configure a cluster client and should be
|
||||
|
@ -316,6 +320,10 @@ type clusterNode struct {
|
|||
latency uint32 // atomic
|
||||
generation uint32 // atomic
|
||||
failing uint32 // atomic
|
||||
|
||||
// last time the latency measurement was performed for the node, stored in nanoseconds
|
||||
// from epoch
|
||||
lastLatencyMeasurement int64 // atomic
|
||||
}
|
||||
|
||||
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
|
||||
|
@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
|
|||
latency = float64(dur) / float64(successes)
|
||||
}
|
||||
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
|
||||
n.SetLastLatencyMeasurement(time.Now())
|
||||
}
|
||||
|
||||
func (n *clusterNode) Latency() time.Duration {
|
||||
|
@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
|
|||
return atomic.LoadUint32(&n.generation)
|
||||
}
|
||||
|
||||
func (n *clusterNode) LastLatencyMeasurement() int64 {
|
||||
return atomic.LoadInt64(&n.lastLatencyMeasurement)
|
||||
}
|
||||
|
||||
func (n *clusterNode) SetGeneration(gen uint32) {
|
||||
for {
|
||||
v := atomic.LoadUint32(&n.generation)
|
||||
|
@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
|
|||
}
|
||||
}
|
||||
|
||||
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
|
||||
for {
|
||||
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
|
||||
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
type clusterNodes struct {
|
||||
|
@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
|
|||
c.mu.Lock()
|
||||
|
||||
c.activeAddrs = c.activeAddrs[:0]
|
||||
now := time.Now()
|
||||
for addr, node := range c.nodes {
|
||||
if node.Generation() >= generation {
|
||||
c.activeAddrs = append(c.activeAddrs, addr)
|
||||
if c.opt.RouteByLatency {
|
||||
if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
|
||||
go node.updateLatency()
|
||||
}
|
||||
continue
|
||||
|
|
Loading…
Reference in New Issue