forked from mirror/redis
cluster: don't reset command when there no attempts left.
This commit is contained in:
parent
2507be6cd2
commit
7f1eb05ba8
|
@ -130,6 +130,10 @@ func (c *ClusterClient) process(cmd Cmder) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ {
|
for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ {
|
||||||
|
if attempt > 0 {
|
||||||
|
cmd.reset()
|
||||||
|
}
|
||||||
|
|
||||||
if ask {
|
if ask {
|
||||||
pipe := client.Pipeline()
|
pipe := client.Pipeline()
|
||||||
pipe.Process(NewCmd("ASKING"))
|
pipe.Process(NewCmd("ASKING"))
|
||||||
|
@ -152,7 +156,6 @@ func (c *ClusterClient) process(cmd Cmder) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cmd.reset()
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,7 +170,6 @@ func (c *ClusterClient) process(cmd Cmder) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cmd.reset()
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -282,6 +284,9 @@ type ClusterOptions struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opt *ClusterOptions) getMaxRedirects() int {
|
func (opt *ClusterOptions) getMaxRedirects() int {
|
||||||
|
if opt.MaxRedirects == -1 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
if opt.MaxRedirects == 0 {
|
if opt.MaxRedirects == 0 {
|
||||||
return 16
|
return 16
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,14 +40,16 @@ func (s *clusterScenario) slaves() []*redis.Client {
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *clusterScenario) clusterClient() *redis.ClusterClient {
|
func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.ClusterClient {
|
||||||
addrs := make([]string, len(s.ports))
|
addrs := make([]string, len(s.ports))
|
||||||
for i, port := range s.ports {
|
for i, port := range s.ports {
|
||||||
addrs[i] = net.JoinHostPort("127.0.0.1", port)
|
addrs[i] = net.JoinHostPort("127.0.0.1", port)
|
||||||
}
|
}
|
||||||
return redis.NewClusterClient(&redis.ClusterOptions{
|
if opt == nil {
|
||||||
Addrs: addrs,
|
opt = &redis.ClusterOptions{}
|
||||||
})
|
}
|
||||||
|
opt.Addrs = addrs
|
||||||
|
return redis.NewClusterClient(opt)
|
||||||
}
|
}
|
||||||
|
|
||||||
func startCluster(scenario *clusterScenario) error {
|
func startCluster(scenario *clusterScenario) error {
|
||||||
|
@ -228,7 +230,7 @@ var _ = Describe("Cluster", func() {
|
||||||
var client *redis.ClusterClient
|
var client *redis.ClusterClient
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
client = scenario.clusterClient()
|
client = scenario.clusterClient(nil)
|
||||||
})
|
})
|
||||||
|
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
|
@ -301,6 +303,18 @@ var _ = Describe("Cluster", func() {
|
||||||
Expect(cmds[27].(*redis.DurationCmd).Val()).To(BeNumerically("~", 7*time.Hour, time.Second))
|
Expect(cmds[27].(*redis.DurationCmd).Val()).To(BeNumerically("~", 7*time.Hour, time.Second))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should return error when there are no attempts left", func() {
|
||||||
|
client = scenario.clusterClient(&redis.ClusterOptions{
|
||||||
|
MaxRedirects: -1,
|
||||||
|
})
|
||||||
|
|
||||||
|
slot := redis.HashSlot("A")
|
||||||
|
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
|
||||||
|
|
||||||
|
err := client.Get("A").Err()
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("MOVED"))
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -317,7 +331,7 @@ func BenchmarkRedisClusterPing(b *testing.B) {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
defer stopCluster(scenario)
|
defer stopCluster(scenario)
|
||||||
client := scenario.clusterClient()
|
client := scenario.clusterClient(nil)
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
Loading…
Reference in New Issue