forked from mirror/redis
Fix cluster pipeline tests.

Extract the inline latency probing in newClusterNode into a new
clusterNode.updateLatency method, teach slotClosestNode to skip nodes
that are still loading and to prefer a node only when it is more than
a millisecond faster, and rework the pipeline specs: slot nodes are
swapped for every key, results are checked in a loop, and the shared
spec closure (renamed assertClusterClient) can branch on the
ClusterOptions in use.

parent a3eed908aa
commit cd7431c40a

cluster.go | 25
cluster.go

@@ -95,18 +95,22 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
 	}
 
 	if clOpt.RouteByLatency {
-		const probes = 10
-		for i := 0; i < probes; i++ {
-			t1 := time.Now()
-			node.Client.Ping()
-			node.Latency += time.Since(t1)
-		}
-		node.Latency = node.Latency / probes
+		node.updateLatency()
 	}
 
 	return &node
 }
 
+func (n *clusterNode) updateLatency() {
+	const probes = 10
+	for i := 0; i < probes; i++ {
+		start := time.Now()
+		n.Client.Ping()
+		n.Latency += time.Since(start)
+	}
+	n.Latency = n.Latency / probes
+}
+
 func (n *clusterNode) Loading() bool {
 	return !n.loading.IsZero() && time.Since(n.loading) < time.Minute
 }
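For reference, the probing moved into updateLatency is a plain probe-and-average pattern. Below is a minimal standalone sketch of that pattern; fakePing is a hypothetical stand-in for node.Client.Ping(), and the sleep duration is arbitrary, so the sketch runs without a Redis server:

package main

import (
	"fmt"
	"time"
)

// fakePing stands in for node.Client.Ping(); it only simulates a round trip.
func fakePing() {
	time.Sleep(200 * time.Microsecond)
}

// updateLatency mirrors the structure of the new clusterNode.updateLatency:
// issue a fixed number of probes and keep the mean round-trip time.
func updateLatency() time.Duration {
	const probes = 10
	var latency time.Duration
	for i := 0; i < probes; i++ {
		start := time.Now()
		fakePing()
		latency += time.Since(start)
	}
	return latency / probes
}

func main() {
	fmt.Println("mean latency:", updateLatency())
}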
@@ -290,6 +294,8 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
 }
 
 func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
+	const threshold = time.Millisecond
+
 	nodes := c.slotNodes(slot)
 	if len(nodes) == 0 {
 		return c.nodes.Random()
@@ -297,7 +303,10 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
 
 	var node *clusterNode
 	for _, n := range nodes {
-		if node == nil || n.Latency < node.Latency {
+		if n.Loading() {
+			continue
+		}
+		if node == nil || node.Latency-n.Latency > threshold {
 			node = n
 		}
 	}
cluster_test.go

@@ -6,15 +6,14 @@ import (
 	"strconv"
 	"strings"
 	"sync"
-
 	"testing"
 	"time"
 
-	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
-
 	"gopkg.in/redis.v5"
 	"gopkg.in/redis.v5/internal/hashtag"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
 )
 
 type clusterScenario struct {
@@ -24,10 +23,6 @@ type clusterScenario struct {
 	clients map[string]*redis.Client
 }
 
-func (s *clusterScenario) primary() *redis.Client {
-	return s.clients[s.ports[0]]
-}
-
 func (s *clusterScenario) masters() []*redis.Client {
 	result := make([]*redis.Client, 3)
 	for pos, port := range s.ports[:3] {
@@ -157,6 +152,9 @@ func slotEqual(s1, s2 redis.ClusterSlot) bool {
 	if s1.End != s2.End {
 		return false
 	}
+	if len(s1.Nodes) != len(s2.Nodes) {
+		return false
+	}
 	for i, n1 := range s1.Nodes {
 		if n1.Addr != s2.Nodes[i].Addr {
 			return false
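The added length check guards the indexing in the loop that follows: without it, s2.Nodes[i] panics when s2 has fewer nodes than s1. The same guard pattern, reduced to plain slices:

package main

import "fmt"

// equal compares two slices element-wise; the length check up front is what
// prevents an index-out-of-range panic on the shorter slice.
func equal(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(equal([]string{"x", "y"}, []string{"x"})) // false, no panic
}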
@@ -182,9 +180,10 @@ func stopCluster(scenario *clusterScenario) error {
 //------------------------------------------------------------------------------
 
 var _ = Describe("ClusterClient", func() {
+	var opt *redis.ClusterOptions
 	var client *redis.ClusterClient
 
-	describeClusterClient := func() {
+	assertClusterClient := func() {
 		It("should CLUSTER SLOTS", func() {
 			res, err := client.ClusterSlots().Result()
 			Expect(err).NotTo(HaveOccurred())
@@ -377,12 +376,14 @@ var _ = Describe("ClusterClient", func() {
 	var pipe *redis.Pipeline
 
 	assertPipeline := func() {
-		It("follows redirects", func() {
-			slot := hashtag.Slot("A")
-			Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
-
-			keys := []string{"A", "B", "C", "D", "E", "F", "G"}
+		keys := []string{"A", "B", "C", "D", "E", "F", "G"}
+
+		It("follows redirects", func() {
+			for _, key := range keys {
+				slot := hashtag.Slot(key)
+				client.SwapSlotNodes(slot)
+			}
 
 			for i, key := range keys {
 				pipe.Set(key, key+"_value", 0)
 				pipe.Expire(key, time.Duration(i+1)*time.Hour)
@@ -391,6 +392,15 @@ var _ = Describe("ClusterClient", func() {
 			Expect(err).NotTo(HaveOccurred())
 			Expect(cmds).To(HaveLen(14))
 
+			if opt.RouteByLatency {
+				return
+			}
+
+			for _, key := range keys {
+				slot := hashtag.Slot(key)
+				client.SwapSlotNodes(slot)
+			}
+
 			for _, key := range keys {
 				pipe.Get(key)
 				pipe.TTL(key)
@@ -398,25 +408,26 @@ var _ = Describe("ClusterClient", func() {
 			cmds, err = pipe.Exec()
 			Expect(err).NotTo(HaveOccurred())
 			Expect(cmds).To(HaveLen(14))
-			Expect(cmds[0].(*redis.StringCmd).Val()).To(Equal("A_value"))
-			Expect(cmds[1].(*redis.DurationCmd).Val()).To(BeNumerically("~", 1*time.Hour, time.Second))
-			Expect(cmds[6].(*redis.StringCmd).Val()).To(Equal("D_value"))
-			Expect(cmds[7].(*redis.DurationCmd).Val()).To(BeNumerically("~", 4*time.Hour, time.Second))
-			Expect(cmds[12].(*redis.StringCmd).Val()).To(Equal("G_value"))
-			Expect(cmds[13].(*redis.DurationCmd).Val()).To(BeNumerically("~", 7*time.Hour, time.Second))
+
+			for i, key := range keys {
+				get := cmds[i*2].(*redis.StringCmd)
+				Expect(get.Val()).To(Equal(key + "_value"))
+
+				ttl := cmds[(i*2)+1].(*redis.DurationCmd)
+				Expect(ttl.Val()).To(BeNumerically("~", time.Duration(i+1)*time.Hour, time.Second))
+			}
 		})
 
 		It("works with missing keys", func() {
-			Expect(client.Set("A", "A_value", 0).Err()).NotTo(HaveOccurred())
-			Expect(client.Set("C", "C_value", 0).Err()).NotTo(HaveOccurred())
+			pipe.Set("A", "A_value", 0)
+			pipe.Set("C", "C_value", 0)
+			_, err := pipe.Exec()
+			Expect(err).NotTo(HaveOccurred())
 
-			var a, b, c *redis.StringCmd
-			cmds, err := client.Pipelined(func(pipe *redis.Pipeline) error {
-				a = pipe.Get("A")
-				b = pipe.Get("B")
-				c = pipe.Get("C")
-				return nil
-			})
+			a := pipe.Get("A")
+			b := pipe.Get("B")
+			c := pipe.Get("C")
+			cmds, err := pipe.Exec()
 			Expect(err).To(Equal(redis.Nil))
 			Expect(cmds).To(HaveLen(3))
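The rewritten assertions lean on the pipeline's result layout: two commands are queued per key, so the flat cmds slice interleaves them, with index i*2 holding the GET and (i*2)+1 the TTL for the i-th key, and 7 keys x 2 commands = 14 results, matching HaveLen(14). A tiny sketch of that index math, using the test's key fixture:

package main

import "fmt"

func main() {
	keys := []string{"A", "B", "C", "D", "E", "F", "G"}
	for i, key := range keys {
		fmt.Printf("key %s -> cmds[%d] (GET), cmds[%d] (TTL)\n", key, i*2, (i*2)+1)
	}
	fmt.Println("total commands:", len(keys)*2) // 14
}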
@@ -476,7 +487,8 @@ var _ = Describe("ClusterClient", func() {
 
 	Describe("default ClusterClient", func() {
 		BeforeEach(func() {
-			client = cluster.clusterClient(redisClusterOptions())
+			opt = redisClusterOptions()
+			client = cluster.clusterClient(opt)
 
 			_ = client.ForEachMaster(func(master *redis.Client) error {
 				return master.FlushDb().Err()
@@ -487,12 +499,12 @@ var _ = Describe("ClusterClient", func() {
 			Expect(client.Close()).NotTo(HaveOccurred())
 		})
 
-		describeClusterClient()
+		assertClusterClient()
 	})
 
 	Describe("ClusterClient with RouteByLatency", func() {
 		BeforeEach(func() {
-			opt := redisClusterOptions()
+			opt = redisClusterOptions()
 			opt.RouteByLatency = true
 			client = cluster.clusterClient(opt)
 
@@ -506,7 +518,7 @@ var _ = Describe("ClusterClient", func() {
 			Expect(client.Close()).NotTo(HaveOccurred())
 		})
 
-		describeClusterClient()
+		assertClusterClient()
 	})
 })
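Stepping back, the test restructuring hinges on opt being declared once at the Describe level and assigned (not redeclared) in each BeforeEach: with `opt :=`, a new local would shadow the shared variable and the closure's `if opt.RouteByLatency` check would still see nil. A condensed, runnable illustration of that closure pattern; the config type and printed messages are stand-ins, not test code:

package main

import "fmt"

// config is a hypothetical stand-in for redis.ClusterOptions: just the
// toggle the shared assertions branch on.
type config struct{ routeByLatency bool }

func main() {
	var opt *config

	// assert closes over opt the way assertClusterClient closes over the
	// Describe-level opt and client.
	assert := func() {
		if opt.routeByLatency {
			fmt.Println("latency routing: skip slot-swap assertions")
			return
		}
		fmt.Println("run full redirect assertions")
	}

	for _, c := range []config{{routeByLatency: false}, {routeByLatency: true}} {
		cc := c
		opt = &cc // assignment, not :=, so the closure sees each update
		assert()
	}
}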