diff --git a/cluster_test.go b/cluster_test.go index 70bdd35b..32022ed0 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -2,6 +2,9 @@ package redis_test import ( "math/rand" + "net" + + "testing" "time" . "github.com/onsi/ginkgo" @@ -10,73 +13,148 @@ import ( "gopkg.in/redis.v2" ) +type clusterScenario struct { + ports []string + nodeIds []string + processes map[string]*redisProcess + clients map[string]*redis.Client +} + +func (s *clusterScenario) primary() *redis.Client { + return s.clients[s.ports[0]] +} + +func (s *clusterScenario) masters() []*redis.Client { + result := make([]*redis.Client, 3) + for pos, port := range s.ports[:3] { + result[pos] = s.clients[port] + } + return result +} + +func (s *clusterScenario) slaves() []*redis.Client { + result := make([]*redis.Client, 3) + for pos, port := range s.ports[3:] { + result[pos] = s.clients[port] + } + return result +} + +func (s *clusterScenario) clusterClient() *redis.ClusterClient { + addrs := make([]string, len(s.ports)) + for i, port := range s.ports { + addrs[i] = net.JoinHostPort("127.0.0.1", port) + } + return redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addrs, + }) +} + +func startCluster(scenario *clusterScenario) error { + // Start processes, connect individual clients + for pos, port := range scenario.ports { + process, err := startRedis(port, "--cluster-enabled", "yes") + if err != nil { + return err + } + + client := redis.NewClient(&redis.Options{Addr: "127.0.0.1:" + port}) + info, err := client.ClusterNodes().Result() + if err != nil { + return err + } + + scenario.processes[port] = process + scenario.clients[port] = client + scenario.nodeIds[pos] = info[:40] + } + + // Meet cluster nodes + for _, client := range scenario.clients { + err := client.ClusterMeet("127.0.0.1", scenario.ports[0]).Err() + if err != nil { + return err + } + } + + // Bootstrap masters + slots := []int{0, 5000, 10000, 16384} + for pos, client := range scenario.masters() { + err := client.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err() + if err != nil { + return err + } + } + + // Bootstrap slaves + for pos, client := range scenario.slaves() { + masterId := scenario.nodeIds[pos] + + // Wait for masters + err := waitForSubstring(func() string { + return client.ClusterNodes().Val() + }, masterId, 10*time.Second) + if err != nil { + return err + } + + err = client.ClusterReplicate(masterId).Err() + if err != nil { + return err + } + + // Wait for slaves + err = waitForSubstring(func() string { + return scenario.primary().ClusterNodes().Val() + }, "slave "+masterId, 10*time.Second) + if err != nil { + return err + } + } + + // Wait for cluster state to turn OK + for _, client := range scenario.clients { + err := waitForSubstring(func() string { + return client.ClusterInfo().Val() + }, "cluster_state:ok", 10*time.Second) + if err != nil { + return err + } + } + + return nil +} + +func stopCluster(scenario *clusterScenario) error { + for _, client := range scenario.clients { + if err := client.Close(); err != nil { + return err + } + } + for _, process := range scenario.processes { + if err := process.Close(); err != nil { + return err + } + } + return nil +} + +//------------------------------------------------------------------------------ + var _ = Describe("Cluster", func() { - var scenario = &clusterScenario{ + scenario := &clusterScenario{ ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"}, - nodeIDs: make([]string, 6), + nodeIds: make([]string, 6), processes: make(map[string]*redisProcess, 6), clients: make(map[string]*redis.Client, 6), } BeforeSuite(func() { - // Start processes, connect individual clients - for pos, port := range scenario.ports { - process, err := startRedis(port, "--cluster-enabled", "yes") - Expect(err).NotTo(HaveOccurred()) - - client := redis.NewClient(&redis.Options{Addr: "127.0.0.1:" + port}) - info, err := client.ClusterNodes().Result() - Expect(err).NotTo(HaveOccurred()) - - scenario.processes[port] = process - scenario.clients[port] = client - scenario.nodeIDs[pos] = info[:40] - } - - // Meet cluster nodes - for _, client := range scenario.clients { - err := client.ClusterMeet("127.0.0.1", scenario.ports[0]).Err() - Expect(err).NotTo(HaveOccurred()) - } - - // Bootstrap masters - slots := []int{0, 5000, 10000, 16384} - for pos, client := range scenario.masters() { - err := client.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err() - Expect(err).NotTo(HaveOccurred()) - } - - // Bootstrap slaves - for pos, client := range scenario.slaves() { - masterID := scenario.nodeIDs[pos] - - Eventually(func() string { // Wait for masters - return client.ClusterNodes().Val() - }, "10s").Should(ContainSubstring(masterID)) - - err := client.ClusterReplicate(masterID).Err() - Expect(err).NotTo(HaveOccurred()) - - Eventually(func() string { // Wait for slaves - return scenario.primary().ClusterNodes().Val() - }, "10s").Should(ContainSubstring("slave " + masterID)) - } - - // Wait for cluster state to turn OK - for _, client := range scenario.clients { - Eventually(func() string { - return client.ClusterInfo().Val() - }, "10s").Should(ContainSubstring("cluster_state:ok")) - } + Expect(startCluster(scenario)).NotTo(HaveOccurred()) }) AfterSuite(func() { - for _, client := range scenario.clients { - client.Close() - } - for _, process := range scenario.processes { - process.Close() - } + Expect(stopCluster(scenario)).NotTo(HaveOccurred()) }) Describe("HashSlot", func() { @@ -150,14 +228,12 @@ var _ = Describe("Cluster", func() { var client *redis.ClusterClient BeforeEach(func() { - client = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: []string{"127.0.0.1:8220", "127.0.0.1:8221", "127.0.0.1:8222", "127.0.0.1:8223", "127.0.0.1:8224", "127.0.0.1:8225"}, - }) + client = scenario.clusterClient() }) AfterEach(func() { - for _, client := range scenario.clients { - client.FlushDb() + for _, client := range scenario.masters() { + Expect(client.FlushDb().Err()).NotTo(HaveOccurred()) } Expect(client.Close()).NotTo(HaveOccurred()) }) @@ -230,31 +306,29 @@ var _ = Describe("Cluster", func() { }) }) -// -------------------------------------------------------------------- +//------------------------------------------------------------------------------ -type clusterScenario struct { - ports []string - nodeIDs []string - processes map[string]*redisProcess - clients map[string]*redis.Client -} - -func (s *clusterScenario) primary() *redis.Client { - return s.clients[s.ports[0]] -} - -func (s *clusterScenario) masters() []*redis.Client { - result := make([]*redis.Client, 3) - for pos, port := range s.ports[:3] { - result[pos] = s.clients[port] +func BenchmarkRedisClusterPing(b *testing.B) { + scenario := &clusterScenario{ + ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"}, + nodeIds: make([]string, 6), + processes: make(map[string]*redisProcess, 6), + clients: make(map[string]*redis.Client, 6), } - return result -} - -func (s *clusterScenario) slaves() []*redis.Client { - result := make([]*redis.Client, 3) - for pos, port := range s.ports[3:] { - result[pos] = s.clients[port] + if err := startCluster(scenario); err != nil { + b.Fatal(err) } - return result + defer stopCluster(scenario) + client := scenario.clusterClient() + defer client.Close() + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if err := client.Ping().Err(); err != nil { + b.Fatal(err) + } + } + }) } diff --git a/pool_test.go b/pool_test.go index 98b4eccb..890ca2cf 100644 --- a/pool_test.go +++ b/pool_test.go @@ -186,8 +186,10 @@ func BenchmarkPool(b *testing.B) { IdleTimeout: 100 * time.Millisecond, }) defer client.Close() - pool := client.Pool() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { for pb.Next() { conn, err := pool.Get() diff --git a/redis_test.go b/redis_test.go index 09b7e79e..62068d06 100644 --- a/redis_test.go +++ b/redis_test.go @@ -1,10 +1,13 @@ package redis_test import ( + "fmt" "net" "os" "os/exec" "path/filepath" + "strings" + "sync/atomic" "testing" "time" @@ -16,6 +19,11 @@ import ( const redisAddr = ":6379" +func TestGinkgoSuite(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "gopkg.in/redis.v2") +} + var _ = Describe("Client", func() { var client *redis.Client @@ -120,9 +128,168 @@ var _ = Describe("Client", func() { //------------------------------------------------------------------------------ -func TestGinkgoSuite(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "gopkg.in/redis.v2") +func BenchmarkRedisPing(b *testing.B) { + client := redis.NewClient(&redis.Options{ + Addr: redisAddr, + }) + defer client.Close() + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if err := client.Ping().Err(); err != nil { + b.Fatal(err) + } + } + }) +} + +func BenchmarkRedisSet(b *testing.B) { + client := redis.NewClient(&redis.Options{ + Addr: redisAddr, + }) + defer client.Close() + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if err := client.Set("key", "hello", 0).Err(); err != nil { + b.Fatal(err) + } + } + }) +} + +func BenchmarkRedisGetNil(b *testing.B) { + client := redis.NewClient(&redis.Options{ + Addr: redisAddr, + }) + defer client.Close() + if err := client.FlushDb().Err(); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if err := client.Get("key").Err(); err != redis.Nil { + b.Fatal(err) + } + } + }) +} + +func BenchmarkRedisGet(b *testing.B) { + client := redis.NewClient(&redis.Options{ + Addr: redisAddr, + }) + defer client.Close() + if err := client.Set("key", "hello", 0).Err(); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if err := client.Get("key").Err(); err != nil { + b.Fatal(err) + } + } + }) +} + +func BenchmarkRedisMGet(b *testing.B) { + client := redis.NewClient(&redis.Options{ + Addr: redisAddr, + }) + defer client.Close() + if err := client.MSet("key1", "hello1", "key2", "hello2").Err(); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if err := client.MGet("key1", "key2").Err(); err != nil { + b.Fatal(err) + } + } + }) +} + +func BenchmarkSetExpire(b *testing.B) { + client := redis.NewClient(&redis.Options{ + Addr: redisAddr, + }) + defer client.Close() + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if err := client.Set("key", "hello", 0).Err(); err != nil { + b.Fatal(err) + } + if err := client.Expire("key", time.Second).Err(); err != nil { + b.Fatal(err) + } + } + }) +} + +func BenchmarkPipeline(b *testing.B) { + client := redis.NewClient(&redis.Options{ + Addr: redisAddr, + }) + defer client.Close() + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := client.Pipelined(func(pipe *redis.Pipeline) error { + pipe.Set("key", "hello", 0) + pipe.Expire("key", time.Second) + return nil + }) + if err != nil { + b.Fatal(err) + } + } + }) +} + +//------------------------------------------------------------------------------ + +// Replaces ginkgo's Eventually. +func waitForSubstring(fn func() string, substr string, timeout time.Duration) error { + var s string + + found := make(chan struct{}) + var exit int32 + go func() { + for atomic.LoadInt32(&exit) == 0 { + s = fn() + if strings.Contains(s, substr) { + found <- struct{}{} + return + } + time.Sleep(timeout / 100) + } + }() + + select { + case <-found: + return nil + case <-time.After(timeout): + atomic.StoreInt32(&exit, 1) + } + return fmt.Errorf("%q does not contain %q", s, substr) } func execCmd(name string, args ...string) (*os.Process, error) { @@ -228,120 +395,3 @@ func startSentinel(port, masterName, masterPort string) (*redisProcess, error) { } return &redisProcess{process, client}, nil } - -//------------------------------------------------------------------------------ - -func BenchmarkRedisPing(b *testing.B) { - b.StopTimer() - client := redis.NewTCPClient(&redis.Options{ - Addr: redisAddr, - }) - b.StartTimer() - - for i := 0; i < b.N; i++ { - if err := client.Ping().Err(); err != nil { - panic(err) - } - } -} - -func BenchmarkRedisSet(b *testing.B) { - b.StopTimer() - client := redis.NewTCPClient(&redis.Options{ - Addr: redisAddr, - }) - b.StartTimer() - - for i := 0; i < b.N; i++ { - if err := client.Set("key", "hello", 0).Err(); err != nil { - panic(err) - } - } -} - -func BenchmarkRedisGetNil(b *testing.B) { - b.StopTimer() - client := redis.NewTCPClient(&redis.Options{ - Addr: redisAddr, - }) - if err := client.FlushDb().Err(); err != nil { - b.Fatal(err) - } - b.StartTimer() - - for i := 0; i < b.N; i++ { - if err := client.Get("key").Err(); err != redis.Nil { - b.Fatal(err) - } - } -} - -func BenchmarkRedisGet(b *testing.B) { - b.StopTimer() - client := redis.NewTCPClient(&redis.Options{ - Addr: redisAddr, - }) - if err := client.Set("key", "hello", 0).Err(); err != nil { - b.Fatal(err) - } - b.StartTimer() - - for i := 0; i < b.N; i++ { - if err := client.Get("key").Err(); err != nil { - b.Fatal(err) - } - } -} - -func BenchmarkRedisMGet(b *testing.B) { - b.StopTimer() - client := redis.NewTCPClient(&redis.Options{ - Addr: redisAddr, - }) - if err := client.MSet("key1", "hello1", "key2", "hello2").Err(); err != nil { - b.Fatal(err) - } - b.StartTimer() - - for i := 0; i < b.N; i++ { - if err := client.MGet("key1", "key2").Err(); err != nil { - b.Fatal(err) - } - } -} - -func BenchmarkSetExpire(b *testing.B) { - b.StopTimer() - client := redis.NewTCPClient(&redis.Options{ - Addr: redisAddr, - }) - b.StartTimer() - - for i := 0; i < b.N; i++ { - if err := client.Set("key", "hello", 0).Err(); err != nil { - b.Fatal(err) - } - if err := client.Expire("key", time.Second).Err(); err != nil { - b.Fatal(err) - } - } -} - -func BenchmarkPipeline(b *testing.B) { - b.StopTimer() - client := redis.NewTCPClient(&redis.Options{ - Addr: redisAddr, - }) - b.StartTimer() - - for i := 0; i < b.N; i++ { - _, err := client.Pipelined(func(pipe *redis.Pipeline) error { - pipe.Set("key", "hello", 0) - pipe.Expire("key", time.Second) - return nil - }) - if err != nil { - b.Fatal(err) - } - } -}