From a9364f117c80e51e9f7aea42903c903cda4d8842 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Tue, 15 Aug 2017 09:49:23 +0300 Subject: [PATCH 1/3] Add ZLexCount --- commands.go | 7 +++++++ commands_test.go | 28 ++++++++++++++++------------ 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/commands.go b/commands.go index ac7c6cc1..15f67a13 100644 --- a/commands.go +++ b/commands.go @@ -159,6 +159,7 @@ type Cmdable interface { ZIncrXX(key string, member Z) *FloatCmd ZCard(key string) *IntCmd ZCount(key, min, max string) *IntCmd + ZLexCount(key, min, max string) *IntCmd ZIncrBy(key string, increment float64, member string) *FloatCmd ZInterStore(destination string, store ZStore, keys ...string) *IntCmd ZRange(key string, start, stop int64) *StringSliceCmd @@ -1352,6 +1353,12 @@ func (c *cmdable) ZCount(key, min, max string) *IntCmd { return cmd } +func (c *cmdable) ZLexCount(key, min, max string) *IntCmd { + cmd := NewIntCmd("zlexcount", key, min, max) + c.process(cmd) + return cmd +} + func (c *cmdable) ZIncrBy(key string, increment float64, member string) *FloatCmd { cmd := NewFloatCmd("zincrby", key, increment, member) c.process(cmd) diff --git a/commands_test.go b/commands_test.go index e8cdb205..81e0e4a8 100644 --- a/commands_test.go +++ b/commands_test.go @@ -2176,20 +2176,24 @@ var _ = Describe("Commands", func() { }) It("should ZCount", func() { - zAdd := client.ZAdd("zset", redis.Z{1, "one"}) - Expect(zAdd.Err()).NotTo(HaveOccurred()) - zAdd = client.ZAdd("zset", redis.Z{2, "two"}) - Expect(zAdd.Err()).NotTo(HaveOccurred()) - zAdd = client.ZAdd("zset", redis.Z{3, "three"}) - Expect(zAdd.Err()).NotTo(HaveOccurred()) + err := client.ZAdd("zset", redis.Z{1, "one"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd("zset", redis.Z{2, "two"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd("zset", redis.Z{3, "three"}).Err() + Expect(err).NotTo(HaveOccurred()) - zCount := client.ZCount("zset", "-inf", "+inf") - Expect(zCount.Err()).NotTo(HaveOccurred()) - Expect(zCount.Val()).To(Equal(int64(3))) + count, err := client.ZCount("zset", "-inf", "+inf").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(count).To(Equal(int64(3))) - zCount = client.ZCount("zset", "(1", "3") - Expect(zCount.Err()).NotTo(HaveOccurred()) - Expect(zCount.Val()).To(Equal(int64(2))) + count, err = client.ZCount("zset", "(1", "3").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(count).To(Equal(int64(2))) + + count, err = client.ZLexCount("zset", "-", "+").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(count).To(Equal(int64(3))) }) It("should ZIncrBy", func() { From 63e3bc58c7d21783115fa0efc58ee6e09012b337 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Tue, 15 Aug 2017 10:12:43 +0300 Subject: [PATCH 2/3] Retry cluster down errors --- cluster.go | 2 +- cluster_test.go | 8 ++++---- internal/{errors.go => error.go} | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) rename internal/{errors.go => error.go} (88%) diff --git a/cluster.go b/cluster.go index a8695c32..647a25be 100644 --- a/cluster.go +++ b/cluster.go @@ -583,7 +583,7 @@ func (c *ClusterClient) Process(cmd Cmder) error { } // On network errors try random node. - if internal.IsRetryableError(err) { + if internal.IsRetryableError(err) || internal.IsClusterDownError(err) { node, err = c.nodes.Random() if err != nil { cmd.setErr(err) diff --git a/cluster_test.go b/cluster_test.go index 0176c685..91b9e80e 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -700,14 +700,14 @@ var _ = Describe("ClusterClient timeout", func() { testTimeout() }) - Context("network timeout", func() { + Context("ClientPause timeout", func() { const pause = time.Second BeforeEach(func() { opt := redisClusterOptions() - opt.ReadTimeout = 100 * time.Millisecond - opt.WriteTimeout = 100 * time.Millisecond - opt.MaxRedirects = 1 + opt.ReadTimeout = pause / 10 + opt.WriteTimeout = pause / 10 + opt.MaxRedirects = -1 client = cluster.clusterClient(opt) err := client.ForEachNode(func(client *redis.Client) error { diff --git a/internal/errors.go b/internal/error.go similarity index 88% rename from internal/errors.go rename to internal/error.go index c93e0081..90f6503a 100644 --- a/internal/errors.go +++ b/internal/error.go @@ -67,9 +67,9 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) { } func IsLoadingError(err error) bool { - return strings.HasPrefix(err.Error(), "LOADING") + return strings.HasPrefix(err.Error(), "LOADING ") } -func IsExecAbortError(err error) bool { - return strings.HasPrefix(err.Error(), "EXECABORT") +func IsClusterDownError(err error) bool { + return strings.HasPrefix(err.Error(), "CLUSTERDOWN ") } From 8ff417ca1810febd2f38abe9b3134996c6bd2f08 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Tue, 15 Aug 2017 10:34:05 +0300 Subject: [PATCH 3/3] Fix flaky tests --- cluster_test.go | 19 ++++++++++++------- commands.go | 11 +++++++---- commands_test.go | 8 ++++---- pubsub_test.go | 5 +++++ 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/cluster_test.go b/cluster_test.go index 91b9e80e..324bd1ce 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -342,7 +342,8 @@ var _ = Describe("ClusterClient", func() { Expect(get.Val()).To(Equal(key + "_value")) ttl := cmds[(i*2)+1].(*redis.DurationCmd) - Expect(ttl.Val()).To(BeNumerically("~", time.Duration(i+1)*time.Hour, time.Second)) + dur := time.Duration(i+1) * time.Hour + Expect(ttl.Val()).To(BeNumerically("~", dur, 5*time.Second)) } }) @@ -476,9 +477,9 @@ var _ = Describe("ClusterClient", func() { Expect(err).NotTo(HaveOccurred()) for _, client := range cluster.masters() { - keys, err := client.Keys("*").Result() + size, err := client.DBSize().Result() Expect(err).NotTo(HaveOccurred()) - Expect(keys).To(HaveLen(0)) + Expect(size).To(Equal(int64(0))) } }) @@ -551,6 +552,9 @@ var _ = Describe("ClusterClient", func() { }) _ = client.ForEachSlave(func(slave *redis.Client) error { + Eventually(func() int64 { + return client.DBSize().Val() + }, 30*time.Second).Should(Equal(int64(0))) return slave.ClusterFailover().Err() }) }) @@ -717,11 +721,12 @@ var _ = Describe("ClusterClient timeout", func() { }) AfterEach(func() { - Eventually(func() error { - return client.ForEachNode(func(client *redis.Client) error { + client.ForEachNode(func(client *redis.Client) error { + Eventually(func() error { return client.Ping().Err() - }) - }, 2*pause).ShouldNot(HaveOccurred()) + }, 2*pause).ShouldNot(HaveOccurred()) + return nil + }) }) testTimeout() diff --git a/commands.go b/commands.go index 15f67a13..83b3824f 100644 --- a/commands.go +++ b/commands.go @@ -191,7 +191,7 @@ type Cmdable interface { ConfigGet(parameter string) *SliceCmd ConfigResetStat() *StatusCmd ConfigSet(parameter, value string) *StatusCmd - DbSize() *IntCmd + DBSize() *IntCmd FlushAll() *StatusCmd FlushAllAsync() *StatusCmd FlushDB() *StatusCmd @@ -1684,7 +1684,12 @@ func (c *cmdable) ConfigSet(parameter, value string) *StatusCmd { return cmd } +// Deperecated. Use DBSize instead. func (c *cmdable) DbSize() *IntCmd { + return c.DBSize() +} + +func (c *cmdable) DBSize() *IntCmd { cmd := NewIntCmd("dbsize") c.process(cmd) return cmd @@ -1704,9 +1709,7 @@ func (c *cmdable) FlushAllAsync() *StatusCmd { // Deprecated. Use FlushDB instead. func (c *cmdable) FlushDb() *StatusCmd { - cmd := NewStatusCmd("flushdb") - c.process(cmd) - return cmd + return c.FlushDB() } func (c *cmdable) FlushDB() *StatusCmd { diff --git a/commands_test.go b/commands_test.go index 81e0e4a8..4298cba6 100644 --- a/commands_test.go +++ b/commands_test.go @@ -139,10 +139,10 @@ var _ = Describe("Commands", func() { Expect(configSet.Val()).To(Equal("OK")) }) - It("should DbSize", func() { - dbSize := client.DbSize() - Expect(dbSize.Err()).NotTo(HaveOccurred()) - Expect(dbSize.Val()).To(Equal(int64(0))) + It("should DBSize", func() { + size, err := client.DBSize().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(size).To(Equal(int64(0))) }) It("should Info", func() { diff --git a/pubsub_test.go b/pubsub_test.go index d44b1dd8..1d9dfcb9 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -400,8 +400,11 @@ var _ = Describe("PubSub", func() { pubsub := client.Subscribe() defer pubsub.Close() + var wg sync.WaitGroup + wg.Add(1) go func() { defer GinkgoRecover() + defer wg.Done() time.Sleep(2 * timeout) @@ -418,5 +421,7 @@ var _ = Describe("PubSub", func() { Expect(err).NotTo(HaveOccurred()) Expect(msg.Channel).To(Equal("mychannel")) Expect(msg.Payload).To(Equal("hello")) + + wg.Wait() }) })