mirror of https://github.com/go-redis/redis.git
Compare commits
3 Commits
d4bc9ea832
...
60a5ae1090
Author | SHA1 | Date |
---|---|---|
KyleKincer | 60a5ae1090 | |
Justin | f1ffb55c9a | |
Kyle Kincer | 7b8b3a603d |
|
@ -216,6 +216,7 @@ func (mh *metricsHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook {
|
||||||
attrs = append(attrs, mh.attrs...)
|
attrs = append(attrs, mh.attrs...)
|
||||||
attrs = append(attrs, attribute.String("type", "command"))
|
attrs = append(attrs, attribute.String("type", "command"))
|
||||||
attrs = append(attrs, statusAttr(err))
|
attrs = append(attrs, statusAttr(err))
|
||||||
|
attrs = append(attrs, attribute.String("cmd", cmd.FullName()))
|
||||||
|
|
||||||
mh.useTime.Record(ctx, milliseconds(dur), metric.WithAttributes(attrs...))
|
mh.useTime.Record(ctx, milliseconds(dur), metric.WithAttributes(attrs...))
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,10 @@ import (
|
||||||
"github.com/redis/go-redis/v9/internal/rand"
|
"github.com/redis/go-redis/v9/internal/rand"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
minLatencyMeasurementInterval = 10 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
|
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
|
||||||
|
|
||||||
// ClusterOptions are used to configure a cluster client and should be
|
// ClusterOptions are used to configure a cluster client and should be
|
||||||
|
@ -316,6 +320,10 @@ type clusterNode struct {
|
||||||
latency uint32 // atomic
|
latency uint32 // atomic
|
||||||
generation uint32 // atomic
|
generation uint32 // atomic
|
||||||
failing uint32 // atomic
|
failing uint32 // atomic
|
||||||
|
|
||||||
|
// last time the latency measurement was performed for the node, stored in nanoseconds
|
||||||
|
// from epoch
|
||||||
|
lastLatencyMeasurement int64 // atomic
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
|
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
|
||||||
|
@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
|
||||||
latency = float64(dur) / float64(successes)
|
latency = float64(dur) / float64(successes)
|
||||||
}
|
}
|
||||||
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
|
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
|
||||||
|
n.SetLastLatencyMeasurement(time.Now())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *clusterNode) Latency() time.Duration {
|
func (n *clusterNode) Latency() time.Duration {
|
||||||
|
@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
|
||||||
return atomic.LoadUint32(&n.generation)
|
return atomic.LoadUint32(&n.generation)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *clusterNode) LastLatencyMeasurement() int64 {
|
||||||
|
return atomic.LoadInt64(&n.lastLatencyMeasurement)
|
||||||
|
}
|
||||||
|
|
||||||
func (n *clusterNode) SetGeneration(gen uint32) {
|
func (n *clusterNode) SetGeneration(gen uint32) {
|
||||||
for {
|
for {
|
||||||
v := atomic.LoadUint32(&n.generation)
|
v := atomic.LoadUint32(&n.generation)
|
||||||
|
@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
|
||||||
|
for {
|
||||||
|
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
|
||||||
|
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type clusterNodes struct {
|
type clusterNodes struct {
|
||||||
|
@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
|
||||||
c.activeAddrs = c.activeAddrs[:0]
|
c.activeAddrs = c.activeAddrs[:0]
|
||||||
|
now := time.Now()
|
||||||
for addr, node := range c.nodes {
|
for addr, node := range c.nodes {
|
||||||
if node.Generation() >= generation {
|
if node.Generation() >= generation {
|
||||||
c.activeAddrs = append(c.activeAddrs, addr)
|
c.activeAddrs = append(c.activeAddrs, addr)
|
||||||
if c.opt.RouteByLatency {
|
if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
|
||||||
go node.updateLatency()
|
go node.updateLatency()
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
|
Loading…
Reference in New Issue