mirror of https://github.com/go-redis/redis.git
commit 19c1c2272e
@@ -583,7 +583,7 @@ func (c *ClusterClient) Process(cmd Cmder) error {
 		}
 
 		// On network errors try random node.
-		if internal.IsRetryableError(err) {
+		if internal.IsRetryableError(err) || internal.IsClusterDownError(err) {
			node, err = c.nodes.Random()
			if err != nil {
				cmd.setErr(err)
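The hunk above widens the retry path in ClusterClient.Process: a CLUSTERDOWN reply is now treated like a retryable network error and the command is re-sent to a random node. A minimal, self-contained sketch of that control flow; process, exec and randomNode here are illustrative stand-ins, not the library's internals:

package main

import (
	"errors"
	"strings"
)

// isClusterDown mirrors the new prefix check in internal: CLUSTERDOWN
// replies from Redis Cluster start with the "CLUSTERDOWN " code.
func isClusterDown(err error) bool {
	return err != nil && strings.HasPrefix(err.Error(), "CLUSTERDOWN ")
}

// process sketches the widened retry decision: on a CLUSTERDOWN error the
// command is re-issued once against a randomly chosen node.
func process(exec func(node string) error, randomNode func() (string, error)) error {
	node := "node-0"
	err := exec(node)
	if isClusterDown(err) {
		if node, err = randomNode(); err != nil {
			return err
		}
		return exec(node)
	}
	return err
}

func main() {
	calls := 0
	err := process(
		func(node string) error {
			calls++
			if calls == 1 {
				return errors.New("CLUSTERDOWN The cluster is down")
			}
			return nil // the retry against the random node succeeds
		},
		func() (string, error) { return "node-3", nil },
	)
	if err != nil {
		panic(err)
	}
}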
@@ -342,7 +342,8 @@ var _ = Describe("ClusterClient", func() {
 			Expect(get.Val()).To(Equal(key + "_value"))
 
 			ttl := cmds[(i*2)+1].(*redis.DurationCmd)
-			Expect(ttl.Val()).To(BeNumerically("~", time.Duration(i+1)*time.Hour, time.Second))
+			dur := time.Duration(i+1) * time.Hour
+			Expect(ttl.Val()).To(BeNumerically("~", dur, 5*time.Second))
 		}
 	})
 
@@ -476,9 +477,9 @@ var _ = Describe("ClusterClient", func() {
 		Expect(err).NotTo(HaveOccurred())
 
 		for _, client := range cluster.masters() {
-			keys, err := client.Keys("*").Result()
+			size, err := client.DBSize().Result()
 			Expect(err).NotTo(HaveOccurred())
-			Expect(keys).To(HaveLen(0))
+			Expect(size).To(Equal(int64(0)))
 		}
 	})
 
@@ -551,6 +552,9 @@ var _ = Describe("ClusterClient", func() {
 		})
 
 		_ = client.ForEachSlave(func(slave *redis.Client) error {
+			Eventually(func() int64 {
+				return client.DBSize().Val()
+			}, 30*time.Second).Should(Equal(int64(0)))
 			return slave.ClusterFailover().Err()
 		})
 	})
@@ -700,14 +704,14 @@ var _ = Describe("ClusterClient timeout", func() {
 		testTimeout()
 	})
 
-	Context("network timeout", func() {
+	Context("ClientPause timeout", func() {
 		const pause = time.Second
 
 		BeforeEach(func() {
 			opt := redisClusterOptions()
-			opt.ReadTimeout = 100 * time.Millisecond
-			opt.WriteTimeout = 100 * time.Millisecond
-			opt.MaxRedirects = 1
+			opt.ReadTimeout = pause / 10
+			opt.WriteTimeout = pause / 10
+			opt.MaxRedirects = -1
 			client = cluster.clusterClient(opt)
 
 			err := client.ForEachNode(func(client *redis.Client) error {
@@ -717,11 +721,12 @@ var _ = Describe("ClusterClient timeout", func() {
 		})
 
 		AfterEach(func() {
-			Eventually(func() error {
-				return client.ForEachNode(func(client *redis.Client) error {
+			client.ForEachNode(func(client *redis.Client) error {
+				Eventually(func() error {
 					return client.Ping().Err()
-				})
-			}, 2*pause).ShouldNot(HaveOccurred())
+				}, 2*pause).ShouldNot(HaveOccurred())
+				return nil
+			})
 		})
 
 		testTimeout()
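For reference, the renamed "ClientPause timeout" block pauses every node and then waits for them to come back; the BeforeEach body is truncated in this view. A hedged, standalone example of pausing one node and polling until it answers again, assuming go-redis's ClientPause(time.Duration) and Ping commands and a local server at 127.0.0.1:6379:

package main

import (
	"time"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
	defer client.Close()

	pause := time.Second
	if err := client.ClientPause(pause).Err(); err != nil {
		panic(err)
	}

	// Commands issued while paused block on the server and, with a short
	// ReadTimeout, surface as timeouts -- which is what the test relies on.
	deadline := time.Now().Add(2 * pause)
	for time.Now().Before(deadline) {
		if client.Ping().Err() == nil {
			break // the node answers again
		}
		time.Sleep(100 * time.Millisecond)
	}
}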
commands.go
@@ -159,6 +159,7 @@ type Cmdable interface {
 	ZIncrXX(key string, member Z) *FloatCmd
 	ZCard(key string) *IntCmd
 	ZCount(key, min, max string) *IntCmd
+	ZLexCount(key, min, max string) *IntCmd
 	ZIncrBy(key string, increment float64, member string) *FloatCmd
 	ZInterStore(destination string, store ZStore, keys ...string) *IntCmd
 	ZRange(key string, start, stop int64) *StringSliceCmd
@@ -190,7 +191,7 @@ type Cmdable interface {
 	ConfigGet(parameter string) *SliceCmd
 	ConfigResetStat() *StatusCmd
 	ConfigSet(parameter, value string) *StatusCmd
-	DbSize() *IntCmd
+	DBSize() *IntCmd
 	FlushAll() *StatusCmd
 	FlushAllAsync() *StatusCmd
 	FlushDB() *StatusCmd
@@ -1352,6 +1353,12 @@ func (c *cmdable) ZCount(key, min, max string) *IntCmd {
 	return cmd
 }
 
+func (c *cmdable) ZLexCount(key, min, max string) *IntCmd {
+	cmd := NewIntCmd("zlexcount", key, min, max)
+	c.process(cmd)
+	return cmd
+}
+
 func (c *cmdable) ZIncrBy(key string, increment float64, member string) *FloatCmd {
 	cmd := NewFloatCmd("zincrby", key, increment, member)
 	c.process(cmd)
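The new ZLexCount wraps the ZLEXCOUNT command and returns an *IntCmd, just like ZCount. A short usage sketch against a local server (address and key name are arbitrary), using the same "-"/"+" open bounds as the test added further down:

package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
	defer client.Close()

	// Populate a sorted set; with equal scores, ZLEXCOUNT counts members
	// falling inside a lexicographic range.
	client.ZAdd("zset", redis.Z{Score: 0, Member: "a"}, redis.Z{Score: 0, Member: "b"})

	// "-" and "+" are the open lexicographic bounds.
	n, err := client.ZLexCount("zset", "-", "+").Result()
	if err != nil {
		panic(err)
	}
	fmt.Println(n) // 2
}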
@@ -1677,7 +1684,12 @@ func (c *cmdable) ConfigSet(parameter, value string) *StatusCmd {
 	return cmd
 }
 
+// Deprecated. Use DBSize instead.
 func (c *cmdable) DbSize() *IntCmd {
+	return c.DBSize()
+}
+
+func (c *cmdable) DBSize() *IntCmd {
 	cmd := NewIntCmd("dbsize")
 	c.process(cmd)
 	return cmd
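DbSize is kept only as a deprecated alias that forwards to the new method, so existing callers keep compiling; new code should call DBSize. A brief usage sketch (server address is arbitrary):

package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
	defer client.Close()

	// Preferred spelling after this change; DbSize() still works and
	// simply forwards here.
	size, err := client.DBSize().Result()
	if err != nil {
		panic(err)
	}
	fmt.Printf("keys in current DB: %d\n", size)
}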
@@ -1697,9 +1709,7 @@ func (c *cmdable) FlushAllAsync() *StatusCmd {
 
 // Deprecated. Use FlushDB instead.
 func (c *cmdable) FlushDb() *StatusCmd {
-	cmd := NewStatusCmd("flushdb")
-	c.process(cmd)
-	return cmd
+	return c.FlushDB()
 }
 
 func (c *cmdable) FlushDB() *StatusCmd {
@@ -139,10 +139,10 @@ var _ = Describe("Commands", func() {
 		Expect(configSet.Val()).To(Equal("OK"))
 	})
 
-	It("should DbSize", func() {
-		dbSize := client.DbSize()
-		Expect(dbSize.Err()).NotTo(HaveOccurred())
-		Expect(dbSize.Val()).To(Equal(int64(0)))
+	It("should DBSize", func() {
+		size, err := client.DBSize().Result()
+		Expect(err).NotTo(HaveOccurred())
+		Expect(size).To(Equal(int64(0)))
 	})
 
 	It("should Info", func() {
@@ -2176,20 +2176,24 @@ var _ = Describe("Commands", func() {
 		})
 
 		It("should ZCount", func() {
-			zAdd := client.ZAdd("zset", redis.Z{1, "one"})
-			Expect(zAdd.Err()).NotTo(HaveOccurred())
-			zAdd = client.ZAdd("zset", redis.Z{2, "two"})
-			Expect(zAdd.Err()).NotTo(HaveOccurred())
-			zAdd = client.ZAdd("zset", redis.Z{3, "three"})
-			Expect(zAdd.Err()).NotTo(HaveOccurred())
+			err := client.ZAdd("zset", redis.Z{1, "one"}).Err()
+			Expect(err).NotTo(HaveOccurred())
+			err = client.ZAdd("zset", redis.Z{2, "two"}).Err()
+			Expect(err).NotTo(HaveOccurred())
+			err = client.ZAdd("zset", redis.Z{3, "three"}).Err()
+			Expect(err).NotTo(HaveOccurred())
 
-			zCount := client.ZCount("zset", "-inf", "+inf")
-			Expect(zCount.Err()).NotTo(HaveOccurred())
-			Expect(zCount.Val()).To(Equal(int64(3)))
+			count, err := client.ZCount("zset", "-inf", "+inf").Result()
+			Expect(err).NotTo(HaveOccurred())
+			Expect(count).To(Equal(int64(3)))
 
-			zCount = client.ZCount("zset", "(1", "3")
-			Expect(zCount.Err()).NotTo(HaveOccurred())
-			Expect(zCount.Val()).To(Equal(int64(2)))
+			count, err = client.ZCount("zset", "(1", "3").Result()
+			Expect(err).NotTo(HaveOccurred())
+			Expect(count).To(Equal(int64(2)))
+
+			count, err = client.ZLexCount("zset", "-", "+").Result()
+			Expect(err).NotTo(HaveOccurred())
+			Expect(count).To(Equal(int64(3)))
 		})
 
 		It("should ZIncrBy", func() {
@@ -67,9 +67,9 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) {
 }
 
 func IsLoadingError(err error) bool {
-	return strings.HasPrefix(err.Error(), "LOADING")
+	return strings.HasPrefix(err.Error(), "LOADING ")
 }
 
-func IsExecAbortError(err error) bool {
-	return strings.HasPrefix(err.Error(), "EXECABORT")
+func IsClusterDownError(err error) bool {
+	return strings.HasPrefix(err.Error(), "CLUSTERDOWN ")
 }
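Both prefixes now include the trailing space, so only the full error code matches rather than any reply that merely starts with the same letters. A tiny illustration (the error strings are made-up examples, and isLoadingError is a local copy of the check above):

package main

import (
	"errors"
	"fmt"
	"strings"
)

// Same shape as the checks above: match the whole error code plus the
// separating space, not just its leading characters.
func isLoadingError(err error) bool {
	return strings.HasPrefix(err.Error(), "LOADING ")
}

func main() {
	// Matches: a real LOADING reply from Redis.
	fmt.Println(isLoadingError(errors.New("LOADING Redis is loading the dataset in memory")))
	// Does not match: a hypothetical reply that only shares the prefix letters.
	fmt.Println(isLoadingError(errors.New("LOADINGFOO something else")))
}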
@@ -400,8 +400,11 @@ var _ = Describe("PubSub", func() {
 		pubsub := client.Subscribe()
 		defer pubsub.Close()
 
+		var wg sync.WaitGroup
+		wg.Add(1)
 		go func() {
 			defer GinkgoRecover()
+			defer wg.Done()
 
 			time.Sleep(2 * timeout)
 
@@ -418,5 +421,7 @@ var _ = Describe("PubSub", func() {
 		Expect(err).NotTo(HaveOccurred())
 		Expect(msg.Channel).To(Equal("mychannel"))
 		Expect(msg.Payload).To(Equal("hello"))
+
+		wg.Wait()
 	})
 })
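The added WaitGroup keeps the spec from returning before the publishing goroutine has finished, so any failure inside it is still reported. A stripped-down sketch of the same pattern outside Ginkgo:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Stand-in for the delayed Publish in the test above.
		time.Sleep(100 * time.Millisecond)
		fmt.Println("published")
	}()

	// Stand-in for receiving the message and asserting on it.
	fmt.Println("waiting for message")

	// Without this, the function could return while the goroutine is still
	// running, and its work (or failures) would be lost.
	wg.Wait()
}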