From a242fa70275c80f27bdc79e6f2389d08224332d7 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Sat, 14 Nov 2015 15:54:16 +0200 Subject: [PATCH] Try to make cluster tests more stable. --- cluster_test.go | 52 +++++++++++++++++++++++++++---------------------- main_test.go | 19 +++++++----------- 2 files changed, 36 insertions(+), 35 deletions(-) diff --git a/cluster_test.go b/cluster_test.go index 136340c..bc395b4 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1,8 +1,10 @@ package redis_test import ( + "fmt" "math/rand" "net" + "strings" "testing" "time" @@ -53,7 +55,7 @@ func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.Cluste } func startCluster(scenario *clusterScenario) error { - // Start processes, connect individual clients + // Start processes and collect node ids for pos, port := range scenario.ports { process, err := startRedis(port, "--cluster-enabled", "yes") if err != nil { @@ -81,44 +83,48 @@ func startCluster(scenario *clusterScenario) error { // Bootstrap masters slots := []int{0, 5000, 10000, 16384} - for pos, client := range scenario.masters() { - err := client.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err() + for pos, master := range scenario.masters() { + err := master.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err() if err != nil { return err } } // Bootstrap slaves - for pos, client := range scenario.slaves() { - masterId := scenario.nodeIds[pos] + for idx, slave := range scenario.slaves() { + masterId := scenario.nodeIds[idx] - // Wait for masters - err := waitForSubstring(func() string { - return client.ClusterNodes().Val() - }, masterId, 10*time.Second) + // Wait until master is available + err := eventually(func() error { + s := slave.ClusterNodes().Val() + wanted := masterId + if !strings.Contains(s, wanted) { + return fmt.Errorf("%q does not contain %q", s, wanted) + } + return nil + }, 10*time.Second) if err != nil { return err } - err = client.ClusterReplicate(masterId).Err() - if err != nil { - return err - } - - // Wait for slaves - err = waitForSubstring(func() string { - return scenario.primary().ClusterNodes().Val() - }, "slave "+masterId, 10*time.Second) + err = slave.ClusterReplicate(masterId).Err() if err != nil { return err } } - // Wait for cluster state to turn OK + // Wait until all nodes have consistent info for _, client := range scenario.clients { - err := waitForSubstring(func() string { - return client.ClusterInfo().Val() - }, "cluster_state:ok", 10*time.Second) + err := eventually(func() error { + for _, masterId := range scenario.nodeIds[:3] { + s := client.ClusterNodes().Val() + wanted := "slave " + masterId + if !strings.Contains(s, wanted) { + return fmt.Errorf("%q does not contain %q", s, wanted) + } + } + return nil + }, 10*time.Second) if err != nil { return err } @@ -260,7 +266,6 @@ var _ = Describe("Cluster", func() { It("should perform multi-pipelines", func() { slot := redis.HashSlot("A") - Expect(client.SlotAddrs(slot)).To(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"})) Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) pipe := client.Pipeline() @@ -288,6 +293,7 @@ var _ = Describe("Cluster", func() { }) It("should return error when there are no attempts left", func() { + Expect(client.Close()).NotTo(HaveOccurred()) client = cluster.clusterClient(&redis.ClusterOptions{ MaxRedirects: -1, }) diff --git a/main_test.go b/main_test.go index eafbeee..806d7d3 100644 --- a/main_test.go +++ b/main_test.go @@ -1,12 +1,10 @@ package redis_test import ( - "fmt" "net" "os" "os/exec" "path/filepath" - "strings" "sync/atomic" "syscall" "testing" @@ -100,17 +98,14 @@ func TestGinkgoSuite(t *testing.T) { //------------------------------------------------------------------------------ -// Replaces ginkgo's Eventually. -func waitForSubstring(fn func() string, substr string, timeout time.Duration) error { - var s string - - found := make(chan struct{}) +func eventually(fn func() error, timeout time.Duration) (err error) { + done := make(chan struct{}) var exit int32 go func() { for atomic.LoadInt32(&exit) == 0 { - s = fn() - if strings.Contains(s, substr) { - found <- struct{}{} + err = fn() + if err == nil { + close(done) return } time.Sleep(timeout / 100) @@ -118,12 +113,12 @@ func waitForSubstring(fn func() string, substr string, timeout time.Duration) er }() select { - case <-found: + case <-done: return nil case <-time.After(timeout): atomic.StoreInt32(&exit, 1) + return err } - return fmt.Errorf("%q does not contain %q", s, substr) } func execCmd(name string, args ...string) (*os.Process, error) {