forked from mirror/redis
Merge pull request #1484 from go-redis/fix/cluster-node-latency
Periodically update Cluster node latency
This commit is contained in:
commit
afb0064872
23
cluster.go
23
cluster.go
|
@ -193,16 +193,19 @@ func (n *clusterNode) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *clusterNode) updateLatency() {
|
func (n *clusterNode) updateLatency() {
|
||||||
const probes = 10
|
const numProbe = 10
|
||||||
|
var dur uint64
|
||||||
|
|
||||||
|
for i := 0; i < numProbe; i++ {
|
||||||
|
time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond)
|
||||||
|
|
||||||
var latency uint32
|
|
||||||
for i := 0; i < probes; i++ {
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
n.Client.Ping(context.TODO())
|
n.Client.Ping(context.TODO())
|
||||||
probe := uint32(time.Since(start) / time.Microsecond)
|
dur += uint64(time.Since(start) / time.Microsecond)
|
||||||
latency = (latency + probe) / 2
|
|
||||||
}
|
}
|
||||||
atomic.StoreUint32(&n.latency, latency)
|
|
||||||
|
latency := float64(dur) / float64(numProbe)
|
||||||
|
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *clusterNode) Latency() time.Duration {
|
func (n *clusterNode) Latency() time.Duration {
|
||||||
|
@ -323,6 +326,9 @@ func (c *clusterNodes) GC(generation uint32) {
|
||||||
for addr, node := range c.nodes {
|
for addr, node := range c.nodes {
|
||||||
if node.Generation() >= generation {
|
if node.Generation() >= generation {
|
||||||
c.activeAddrs = append(c.activeAddrs, addr)
|
c.activeAddrs = append(c.activeAddrs, addr)
|
||||||
|
if c.opt.RouteByLatency {
|
||||||
|
go node.updateLatency()
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -553,8 +559,6 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
|
func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
|
||||||
const threshold = time.Millisecond
|
|
||||||
|
|
||||||
nodes := c.slotNodes(slot)
|
nodes := c.slotNodes(slot)
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
return c.nodes.Random()
|
return c.nodes.Random()
|
||||||
|
@ -565,13 +569,14 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
|
||||||
if n.Failing() {
|
if n.Failing() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if node == nil || node.Latency()-n.Latency() > threshold {
|
if node == nil || n.Latency() < node.Latency() {
|
||||||
node = n
|
node = n
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if node != nil {
|
if node != nil {
|
||||||
return node, nil
|
return node, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// If all nodes are failing - return random node
|
// If all nodes are failing - return random node
|
||||||
return c.nodes.Random()
|
return c.nodes.Random()
|
||||||
}
|
}
|
||||||
|
|
338
cluster_test.go
338
cluster_test.go
|
@ -522,6 +522,175 @@ var _ = Describe("ClusterClient", func() {
|
||||||
err := pubsub.Ping(ctx)
|
err := pubsub.Ping(ctx)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
Describe("ClusterClient", func() {
|
||||||
|
BeforeEach(func() {
|
||||||
|
opt = redisClusterOptions()
|
||||||
|
client = cluster.newClusterClient(ctx, opt)
|
||||||
|
|
||||||
|
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
|
||||||
|
return master.FlushDB(ctx).Err()
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
_ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
|
||||||
|
return master.FlushDB(ctx).Err()
|
||||||
|
})
|
||||||
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("returns pool stats", func() {
|
||||||
|
stats := client.PoolStats()
|
||||||
|
Expect(stats).To(BeAssignableToTypeOf(&redis.PoolStats{}))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("returns an error when there are no attempts left", func() {
|
||||||
|
opt := redisClusterOptions()
|
||||||
|
opt.MaxRedirects = -1
|
||||||
|
client := cluster.newClusterClient(ctx, opt)
|
||||||
|
|
||||||
|
Eventually(func() error {
|
||||||
|
return client.SwapNodes(ctx, "A")
|
||||||
|
}, 30*time.Second).ShouldNot(HaveOccurred())
|
||||||
|
|
||||||
|
err := client.Get(ctx, "A").Err()
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("MOVED"))
|
||||||
|
|
||||||
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("calls fn for every master node", func() {
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
Expect(client.Set(ctx, strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
|
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
|
||||||
|
return master.FlushDB(ctx).Err()
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
size, err := client.DBSize(ctx).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(size).To(Equal(int64(0)))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should CLUSTER SLOTS", func() {
|
||||||
|
res, err := client.ClusterSlots(ctx).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(res).To(HaveLen(3))
|
||||||
|
|
||||||
|
wanted := []redis.ClusterSlot{{
|
||||||
|
Start: 0,
|
||||||
|
End: 4999,
|
||||||
|
Nodes: []redis.ClusterNode{{
|
||||||
|
ID: "",
|
||||||
|
Addr: "127.0.0.1:8220",
|
||||||
|
}, {
|
||||||
|
ID: "",
|
||||||
|
Addr: "127.0.0.1:8223",
|
||||||
|
}},
|
||||||
|
}, {
|
||||||
|
Start: 5000,
|
||||||
|
End: 9999,
|
||||||
|
Nodes: []redis.ClusterNode{{
|
||||||
|
ID: "",
|
||||||
|
Addr: "127.0.0.1:8221",
|
||||||
|
}, {
|
||||||
|
ID: "",
|
||||||
|
Addr: "127.0.0.1:8224",
|
||||||
|
}},
|
||||||
|
}, {
|
||||||
|
Start: 10000,
|
||||||
|
End: 16383,
|
||||||
|
Nodes: []redis.ClusterNode{{
|
||||||
|
ID: "",
|
||||||
|
Addr: "127.0.0.1:8222",
|
||||||
|
}, {
|
||||||
|
ID: "",
|
||||||
|
Addr: "127.0.0.1:8225",
|
||||||
|
}},
|
||||||
|
}}
|
||||||
|
Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should CLUSTER NODES", func() {
|
||||||
|
res, err := client.ClusterNodes(ctx).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(len(res)).To(BeNumerically(">", 400))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should CLUSTER INFO", func() {
|
||||||
|
res, err := client.ClusterInfo(ctx).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should CLUSTER KEYSLOT", func() {
|
||||||
|
hashSlot, err := client.ClusterKeySlot(ctx, "somekey").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should CLUSTER GETKEYSINSLOT", func() {
|
||||||
|
keys, err := client.ClusterGetKeysInSlot(ctx, hashtag.Slot("somekey"), 1).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(len(keys)).To(Equal(0))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
|
||||||
|
n, err := client.ClusterCountFailureReports(ctx, cluster.nodeIDs[0]).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(Equal(int64(0)))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should CLUSTER COUNTKEYSINSLOT", func() {
|
||||||
|
n, err := client.ClusterCountKeysInSlot(ctx, 10).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(Equal(int64(0)))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should CLUSTER SAVECONFIG", func() {
|
||||||
|
res, err := client.ClusterSaveConfig(ctx).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(res).To(Equal("OK"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should CLUSTER SLAVES", func() {
|
||||||
|
nodesList, err := client.ClusterSlaves(ctx, cluster.nodeIDs[0]).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
|
||||||
|
Expect(nodesList).Should(HaveLen(1))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should RANDOMKEY", func() {
|
||||||
|
const nkeys = 100
|
||||||
|
|
||||||
|
for i := 0; i < nkeys; i++ {
|
||||||
|
err := client.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
|
var keys []string
|
||||||
|
addKey := func(key string) {
|
||||||
|
for _, k := range keys {
|
||||||
|
if k == key {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
keys = append(keys, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < nkeys*10; i++ {
|
||||||
|
key := client.RandomKey(ctx).Val()
|
||||||
|
addKey(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(len(keys)).To(BeNumerically("~", nkeys, nkeys/10))
|
||||||
|
})
|
||||||
|
|
||||||
It("supports Process hook", func() {
|
It("supports Process hook", func() {
|
||||||
err := client.Ping(ctx).Err()
|
err := client.Ping(ctx).Err()
|
||||||
|
@ -694,175 +863,6 @@ var _ = Describe("ClusterClient", func() {
|
||||||
"cluster.AfterProcessPipeline",
|
"cluster.AfterProcessPipeline",
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
}
|
|
||||||
|
|
||||||
Describe("ClusterClient", func() {
|
|
||||||
BeforeEach(func() {
|
|
||||||
opt = redisClusterOptions()
|
|
||||||
client = cluster.newClusterClient(ctx, opt)
|
|
||||||
|
|
||||||
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
|
|
||||||
return master.FlushDB(ctx).Err()
|
|
||||||
})
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
})
|
|
||||||
|
|
||||||
AfterEach(func() {
|
|
||||||
_ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
|
|
||||||
return master.FlushDB(ctx).Err()
|
|
||||||
})
|
|
||||||
Expect(client.Close()).NotTo(HaveOccurred())
|
|
||||||
})
|
|
||||||
|
|
||||||
It("returns pool stats", func() {
|
|
||||||
stats := client.PoolStats()
|
|
||||||
Expect(stats).To(BeAssignableToTypeOf(&redis.PoolStats{}))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("returns an error when there are no attempts left", func() {
|
|
||||||
opt := redisClusterOptions()
|
|
||||||
opt.MaxRedirects = -1
|
|
||||||
client := cluster.newClusterClient(ctx, opt)
|
|
||||||
|
|
||||||
Eventually(func() error {
|
|
||||||
return client.SwapNodes(ctx, "A")
|
|
||||||
}, 30*time.Second).ShouldNot(HaveOccurred())
|
|
||||||
|
|
||||||
err := client.Get(ctx, "A").Err()
|
|
||||||
Expect(err).To(HaveOccurred())
|
|
||||||
Expect(err.Error()).To(ContainSubstring("MOVED"))
|
|
||||||
|
|
||||||
Expect(client.Close()).NotTo(HaveOccurred())
|
|
||||||
})
|
|
||||||
|
|
||||||
It("calls fn for every master node", func() {
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
Expect(client.Set(ctx, strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())
|
|
||||||
}
|
|
||||||
|
|
||||||
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
|
|
||||||
return master.FlushDB(ctx).Err()
|
|
||||||
})
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
size, err := client.DBSize(ctx).Result()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(size).To(Equal(int64(0)))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should CLUSTER SLOTS", func() {
|
|
||||||
res, err := client.ClusterSlots(ctx).Result()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(res).To(HaveLen(3))
|
|
||||||
|
|
||||||
wanted := []redis.ClusterSlot{{
|
|
||||||
Start: 0,
|
|
||||||
End: 4999,
|
|
||||||
Nodes: []redis.ClusterNode{{
|
|
||||||
ID: "",
|
|
||||||
Addr: "127.0.0.1:8220",
|
|
||||||
}, {
|
|
||||||
ID: "",
|
|
||||||
Addr: "127.0.0.1:8223",
|
|
||||||
}},
|
|
||||||
}, {
|
|
||||||
Start: 5000,
|
|
||||||
End: 9999,
|
|
||||||
Nodes: []redis.ClusterNode{{
|
|
||||||
ID: "",
|
|
||||||
Addr: "127.0.0.1:8221",
|
|
||||||
}, {
|
|
||||||
ID: "",
|
|
||||||
Addr: "127.0.0.1:8224",
|
|
||||||
}},
|
|
||||||
}, {
|
|
||||||
Start: 10000,
|
|
||||||
End: 16383,
|
|
||||||
Nodes: []redis.ClusterNode{{
|
|
||||||
ID: "",
|
|
||||||
Addr: "127.0.0.1:8222",
|
|
||||||
}, {
|
|
||||||
ID: "",
|
|
||||||
Addr: "127.0.0.1:8225",
|
|
||||||
}},
|
|
||||||
}}
|
|
||||||
Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should CLUSTER NODES", func() {
|
|
||||||
res, err := client.ClusterNodes(ctx).Result()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(len(res)).To(BeNumerically(">", 400))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should CLUSTER INFO", func() {
|
|
||||||
res, err := client.ClusterInfo(ctx).Result()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should CLUSTER KEYSLOT", func() {
|
|
||||||
hashSlot, err := client.ClusterKeySlot(ctx, "somekey").Result()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should CLUSTER GETKEYSINSLOT", func() {
|
|
||||||
keys, err := client.ClusterGetKeysInSlot(ctx, hashtag.Slot("somekey"), 1).Result()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(len(keys)).To(Equal(0))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
|
|
||||||
n, err := client.ClusterCountFailureReports(ctx, cluster.nodeIDs[0]).Result()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(n).To(Equal(int64(0)))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should CLUSTER COUNTKEYSINSLOT", func() {
|
|
||||||
n, err := client.ClusterCountKeysInSlot(ctx, 10).Result()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(n).To(Equal(int64(0)))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should CLUSTER SAVECONFIG", func() {
|
|
||||||
res, err := client.ClusterSaveConfig(ctx).Result()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(res).To(Equal("OK"))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should CLUSTER SLAVES", func() {
|
|
||||||
nodesList, err := client.ClusterSlaves(ctx, cluster.nodeIDs[0]).Result()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
|
|
||||||
Expect(nodesList).Should(HaveLen(1))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should RANDOMKEY", func() {
|
|
||||||
const nkeys = 100
|
|
||||||
|
|
||||||
for i := 0; i < nkeys; i++ {
|
|
||||||
err := client.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
}
|
|
||||||
|
|
||||||
var keys []string
|
|
||||||
addKey := func(key string) {
|
|
||||||
for _, k := range keys {
|
|
||||||
if k == key {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
keys = append(keys, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < nkeys*10; i++ {
|
|
||||||
key := client.RandomKey(ctx).Val()
|
|
||||||
addKey(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
Expect(len(keys)).To(BeNumerically("~", nkeys, nkeys/10))
|
|
||||||
})
|
|
||||||
|
|
||||||
assertClusterClient()
|
assertClusterClient()
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue