diff --git a/cluster_test.go b/cluster_test.go index 26d306e3..d875735f 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math/rand" "net" + "reflect" "strings" "testing" @@ -116,23 +117,23 @@ func startCluster(scenario *clusterScenario) error { // Wait until all nodes have consistent info for _, client := range scenario.clients { err := eventually(func() error { - s := client.ClusterNodes().Val() - nodes := strings.Split(s, "\n") - if len(nodes) < 6 { - return fmt.Errorf("got %d nodes, wanted 6", len(nodes)) + res, err := client.ClusterSlots().Result() + if err != nil { + return err } - for _, node := range nodes { - if node == "" { - continue - } - parts := strings.Split(node, " ") - var flags string - if len(parts) >= 3 { - flags = parts[2] - } - if !strings.Contains(flags, "master") && !strings.Contains(flags, "slave") { - return fmt.Errorf("node flags are %q", flags) + wanted := []redis.ClusterSlotInfo{ + {0, 4999, []string{"127.0.0.1:8220", "127.0.0.1:8223"}}, + {5000, 9999, []string{"127.0.0.1:8221", "127.0.0.1:8224"}}, + {10000, 16383, []string{"127.0.0.1:8222", "127.0.0.1:8225"}}, + } + loop: + for _, info := range res { + for _, info2 := range wanted { + if reflect.DeepEqual(info, info2) { + continue loop + } } + return fmt.Errorf("cluster did not reach consistent state (%v)", res) } return nil }, 10*time.Second) diff --git a/commands_test.go b/commands_test.go index b931d05e..4e3488dd 100644 --- a/commands_test.go +++ b/commands_test.go @@ -20,10 +20,8 @@ var _ = Describe("Commands", func() { BeforeEach(func() { client = redis.NewClient(&redis.Options{ - Addr: redisAddr, - ReadTimeout: 500 * time.Millisecond, - WriteTimeout: 500 * time.Millisecond, - PoolTimeout: 30 * time.Second, + Addr: redisAddr, + PoolTimeout: 30 * time.Second, }) }) @@ -81,13 +79,10 @@ var _ = Describe("Commands", func() { err := client.ClientPause(time.Second).Err() Expect(err).NotTo(HaveOccurred()) - Consistently(func() error { - return client.Ping().Err() - }, "400ms").Should(HaveOccurred()) // pause time - read timeout - - Eventually(func() error { - return client.Ping().Err() - }, "1s").ShouldNot(HaveOccurred()) + start := time.Now() + err = client.Ping().Err() + Expect(err).NotTo(HaveOccurred()) + Expect(time.Now()).To(BeTemporally("~", start.Add(time.Second), 800*time.Millisecond)) }) It("should ClientSetName and ClientGetName", func() { diff --git a/example_test.go b/example_test.go index 8de20827..c7969914 100644 --- a/example_test.go +++ b/example_test.go @@ -238,6 +238,7 @@ func ExamplePubSub_Receive() { } for i := 0; i < 2; i++ { + // ReceiveTimeout is a low level API. Use ReceiveMessage instead. msgi, err := pubsub.ReceiveTimeout(100 * time.Millisecond) if err != nil { panic(err) diff --git a/pubsub.go b/pubsub.go index a3792d3f..223f8b96 100644 --- a/pubsub.go +++ b/pubsub.go @@ -80,6 +80,9 @@ func (c *PubSub) PSubscribe(patterns ...string) error { } func remove(ss []string, es ...string) []string { + if len(es) == 0 { + return ss[:0] + } for _, e := range es { for i, s := range ss { if s == e { @@ -231,7 +234,9 @@ func (c *PubSub) Receive() (interface{}, error) { } func (c *PubSub) reconnect() { + // Close current connection. c.connPool.Remove(nil) // nil to force removal + if len(c.channels) > 0 { if err := c.Subscribe(c.channels...); err != nil { log.Printf("redis: Subscribe failed: %s", err) diff --git a/pubsub_test.go b/pubsub_test.go index 411c6431..1506a8eb 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -2,6 +2,7 @@ package redis_test import ( "net" + "sync" "time" . "github.com/onsi/ginkgo" @@ -261,19 +262,23 @@ var _ = Describe("PubSub", func() { writeErr: errTimeout, }) + var wg sync.WaitGroup + wg.Add(1) go func() { defer GinkgoRecover() + defer wg.Done() time.Sleep(100 * time.Millisecond) - n, err := client.Publish("mychannel", "hello").Result() + err := client.Publish("mychannel", "hello").Err() Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(int64(2))) }() msg, err := pubsub.ReceiveMessage() Expect(err).NotTo(HaveOccurred()) Expect(msg.Channel).To(Equal("mychannel")) Expect(msg.Payload).To(Equal("hello")) + + wg.Wait() }) }) diff --git a/ring_test.go b/ring_test.go index 55eb90dd..0de37dc4 100644 --- a/ring_test.go +++ b/ring_test.go @@ -56,7 +56,7 @@ var _ = Describe("Redis ring", func() { // Ring needs 5 * heartbeat time to detect that node is down. // Give it more to be sure. heartbeat := 100 * time.Millisecond - time.Sleep(5*heartbeat + 2*heartbeat) + time.Sleep(2 * 5 * heartbeat) setRingKeys()