forked from mirror/redis
Periodically update Cluster node latency
This commit is contained in:
parent
513fcfb224
commit
83d698027f
23
cluster.go
23
cluster.go
|
@ -188,16 +188,19 @@ func (n *clusterNode) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *clusterNode) updateLatency() {
|
func (n *clusterNode) updateLatency() {
|
||||||
const probes = 10
|
const numProbe = 10
|
||||||
|
var dur uint64
|
||||||
|
|
||||||
var latency uint32
|
for i := 0; i < numProbe; i++ {
|
||||||
for i := 0; i < probes; i++ {
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
n.Client.Ping(context.TODO())
|
n.Client.Ping(context.TODO())
|
||||||
probe := uint32(time.Since(start) / time.Microsecond)
|
dur += uint64(time.Since(start) / time.Microsecond)
|
||||||
latency = (latency + probe) / 2
|
|
||||||
|
time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond)
|
||||||
}
|
}
|
||||||
atomic.StoreUint32(&n.latency, latency)
|
|
||||||
|
latency := float64(dur) / float64(numProbe)
|
||||||
|
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *clusterNode) Latency() time.Duration {
|
func (n *clusterNode) Latency() time.Duration {
|
||||||
|
@ -318,6 +321,9 @@ func (c *clusterNodes) GC(generation uint32) {
|
||||||
for addr, node := range c.nodes {
|
for addr, node := range c.nodes {
|
||||||
if node.Generation() >= generation {
|
if node.Generation() >= generation {
|
||||||
c.activeAddrs = append(c.activeAddrs, addr)
|
c.activeAddrs = append(c.activeAddrs, addr)
|
||||||
|
if c.opt.RouteByLatency {
|
||||||
|
go node.updateLatency()
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -548,8 +554,6 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
|
func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
|
||||||
const threshold = time.Millisecond
|
|
||||||
|
|
||||||
nodes := c.slotNodes(slot)
|
nodes := c.slotNodes(slot)
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
return c.nodes.Random()
|
return c.nodes.Random()
|
||||||
|
@ -560,13 +564,14 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
|
||||||
if n.Failing() {
|
if n.Failing() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if node == nil || node.Latency()-n.Latency() > threshold {
|
if node == nil || n.Latency() < node.Latency() {
|
||||||
node = n
|
node = n
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if node != nil {
|
if node != nil {
|
||||||
return node, nil
|
return node, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// If all nodes are failing - return random node
|
// If all nodes are failing - return random node
|
||||||
return c.nodes.Random()
|
return c.nodes.Random()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue