diff --git a/.travis.yml b/.travis.yml index 1c57ddea..9eca1470 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,7 +6,6 @@ services: go: - 1.3 - 1.4 - - tip install: - go get gopkg.in/bufio.v1 diff --git a/cluster_test.go b/cluster_test.go index 6ec8552f..d892c106 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -144,21 +144,6 @@ func stopCluster(scenario *clusterScenario) error { //------------------------------------------------------------------------------ var _ = Describe("Cluster", func() { - scenario := &clusterScenario{ - ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"}, - nodeIds: make([]string, 6), - processes: make(map[string]*redisProcess, 6), - clients: make(map[string]*redis.Client, 6), - } - - BeforeSuite(func() { - Expect(startCluster(scenario)).NotTo(HaveOccurred()) - }) - - AfterSuite(func() { - Expect(stopCluster(scenario)).NotTo(HaveOccurred()) - }) - Describe("HashSlot", func() { It("should calculate hash slots", func() { @@ -202,7 +187,7 @@ var _ = Describe("Cluster", func() { Describe("Commands", func() { It("should CLUSTER SLOTS", func() { - res, err := scenario.primary().ClusterSlots().Result() + res, err := cluster.primary().ClusterSlots().Result() Expect(err).NotTo(HaveOccurred()) Expect(res).To(HaveLen(3)) Expect(res).To(ConsistOf([]redis.ClusterSlotInfo{ @@ -213,13 +198,13 @@ var _ = Describe("Cluster", func() { }) It("should CLUSTER NODES", func() { - res, err := scenario.primary().ClusterNodes().Result() + res, err := cluster.primary().ClusterNodes().Result() Expect(err).NotTo(HaveOccurred()) Expect(len(res)).To(BeNumerically(">", 400)) }) It("should CLUSTER INFO", func() { - res, err := scenario.primary().ClusterInfo().Result() + res, err := cluster.primary().ClusterInfo().Result() Expect(err).NotTo(HaveOccurred()) Expect(res).To(ContainSubstring("cluster_known_nodes:6")) }) @@ -230,11 +215,11 @@ var _ = Describe("Cluster", func() { var client *redis.ClusterClient BeforeEach(func() { - client = scenario.clusterClient(nil) + client = cluster.clusterClient(nil) }) AfterEach(func() { - for _, client := range scenario.masters() { + for _, client := range cluster.masters() { Expect(client.FlushDb().Err()).NotTo(HaveOccurred()) } Expect(client.Close()).NotTo(HaveOccurred()) @@ -304,7 +289,7 @@ var _ = Describe("Cluster", func() { }) It("should return error when there are no attempts left", func() { - client = scenario.clusterClient(&redis.ClusterOptions{ + client = cluster.clusterClient(&redis.ClusterOptions{ MaxRedirects: -1, }) @@ -321,17 +306,17 @@ var _ = Describe("Cluster", func() { //------------------------------------------------------------------------------ func BenchmarkRedisClusterPing(b *testing.B) { - scenario := &clusterScenario{ + cluster := &clusterScenario{ ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"}, nodeIds: make([]string, 6), processes: make(map[string]*redisProcess, 6), clients: make(map[string]*redis.Client, 6), } - if err := startCluster(scenario); err != nil { + if err := startCluster(cluster); err != nil { b.Fatal(err) } - defer stopCluster(scenario) - client := scenario.clusterClient(nil) + defer stopCluster(cluster) + client := cluster.clusterClient(nil) defer client.Close() b.ResetTimer() diff --git a/commands_test.go b/commands_test.go index b451c521..7a2add44 100644 --- a/commands_test.go +++ b/commands_test.go @@ -307,9 +307,8 @@ var _ = Describe("Commands", func() { Expect(refCount.Err()).NotTo(HaveOccurred()) Expect(refCount.Val()).To(Equal(int64(1))) - enc := client.ObjectEncoding("key") - Expect(enc.Err()).NotTo(HaveOccurred()) - Expect(enc.Val()).To(Equal("raw")) + err := client.ObjectEncoding("key").Err() + Expect(err).NotTo(HaveOccurred()) idleTime := client.ObjectIdleTime("key") Expect(idleTime.Err()).NotTo(HaveOccurred()) diff --git a/main_test.go b/main_test.go new file mode 100644 index 00000000..2a0f8368 --- /dev/null +++ b/main_test.go @@ -0,0 +1,223 @@ +package redis_test + +import ( + "fmt" + "net" + "os" + "os/exec" + "path/filepath" + "strings" + "sync/atomic" + "testing" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "gopkg.in/redis.v2" +) + +const redisAddr = ":6379" + +const ( + sentinelName = "mymaster" + sentinelMasterPort = "8123" + sentinelSlave1Port = "8124" + sentinelSlave2Port = "8125" + sentinelPort = "8126" +) + +var sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess + +var cluster = &clusterScenario{ + ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"}, + nodeIds: make([]string, 6), + processes: make(map[string]*redisProcess, 6), + clients: make(map[string]*redis.Client, 6), +} + +var _ = BeforeSuite(func() { + var err error + + sentinelMaster, err = startRedis(sentinelMasterPort) + Expect(err).NotTo(HaveOccurred()) + + sentinel, err = startSentinel(sentinelPort, sentinelName, sentinelMasterPort) + Expect(err).NotTo(HaveOccurred()) + + sentinelSlave1, err = startRedis( + sentinelSlave1Port, "--slaveof", "127.0.0.1", sentinelMasterPort) + Expect(err).NotTo(HaveOccurred()) + + sentinelSlave2, err = startRedis( + sentinelSlave2Port, "--slaveof", "127.0.0.1", sentinelMasterPort) + Expect(err).NotTo(HaveOccurred()) + + Expect(startCluster(cluster)).NotTo(HaveOccurred()) +}) + +var _ = AfterSuite(func() { + Expect(sentinel.Close()).NotTo(HaveOccurred()) + Expect(sentinelSlave1.Close()).NotTo(HaveOccurred()) + Expect(sentinelSlave2.Close()).NotTo(HaveOccurred()) + Expect(sentinelMaster.Close()).NotTo(HaveOccurred()) + + Expect(stopCluster(cluster)).NotTo(HaveOccurred()) +}) + +func TestGinkgoSuite(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "gopkg.in/redis.v2") +} + +//------------------------------------------------------------------------------ + +// Replaces ginkgo's Eventually. +func waitForSubstring(fn func() string, substr string, timeout time.Duration) error { + var s string + + found := make(chan struct{}) + var exit int32 + go func() { + for atomic.LoadInt32(&exit) == 0 { + s = fn() + if strings.Contains(s, substr) { + found <- struct{}{} + return + } + time.Sleep(timeout / 100) + } + }() + + select { + case <-found: + return nil + case <-time.After(timeout): + atomic.StoreInt32(&exit, 1) + } + return fmt.Errorf("%q does not contain %q", s, substr) +} + +func execCmd(name string, args ...string) (*os.Process, error) { + cmd := exec.Command(name, args...) + if testing.Verbose() { + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + } + return cmd.Process, cmd.Start() +} + +func connectTo(port string) (client *redis.Client, err error) { + client = redis.NewClient(&redis.Options{ + Addr: ":" + port, + }) + + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + if err = client.Ping().Err(); err == nil { + return client, nil + } + time.Sleep(250 * time.Millisecond) + } + + return nil, err +} + +type redisProcess struct { + *os.Process + *redis.Client +} + +func (p *redisProcess) Close() error { + p.Client.Close() + return p.Kill() +} + +var ( + redisServerBin, _ = filepath.Abs(filepath.Join(".test", "redis", "src", "redis-server")) + redisServerConf, _ = filepath.Abs(filepath.Join(".test", "redis.conf")) +) + +func redisDir(port string) (string, error) { + dir, err := filepath.Abs(filepath.Join(".test", "instances", port)) + if err != nil { + return "", err + } else if err = os.RemoveAll(dir); err != nil { + return "", err + } else if err = os.MkdirAll(dir, 0775); err != nil { + return "", err + } + return dir, nil +} + +func startRedis(port string, args ...string) (*redisProcess, error) { + dir, err := redisDir(port) + if err != nil { + return nil, err + } + if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil { + return nil, err + } + + baseArgs := []string{filepath.Join(dir, "redis.conf"), "--port", port, "--dir", dir} + process, err := execCmd(redisServerBin, append(baseArgs, args...)...) + if err != nil { + return nil, err + } + + client, err := connectTo(port) + if err != nil { + process.Kill() + return nil, err + } + return &redisProcess{process, client}, err +} + +func startSentinel(port, masterName, masterPort string) (*redisProcess, error) { + dir, err := redisDir(port) + if err != nil { + return nil, err + } + process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir) + if err != nil { + return nil, err + } + client, err := connectTo(port) + if err != nil { + process.Kill() + return nil, err + } + for _, cmd := range []*redis.StatusCmd{ + redis.NewStatusCmd("SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "1"), + redis.NewStatusCmd("SENTINEL", "SET", masterName, "down-after-milliseconds", "500"), + redis.NewStatusCmd("SENTINEL", "SET", masterName, "failover-timeout", "1000"), + redis.NewStatusCmd("SENTINEL", "SET", masterName, "parallel-syncs", "1"), + } { + client.Process(cmd) + if err := cmd.Err(); err != nil { + process.Kill() + return nil, err + } + } + return &redisProcess{process, client}, nil +} + +//------------------------------------------------------------------------------ + +type badNetConn struct { + net.TCPConn +} + +var _ net.Conn = &badNetConn{} + +func newBadNetConn() net.Conn { + return &badNetConn{} +} + +func (badNetConn) Read([]byte) (int, error) { + return 0, net.UnknownNetworkError("badNetConn") +} + +func (badNetConn) Write([]byte) (int, error) { + return 0, net.UnknownNetworkError("badNetConn") +} diff --git a/redis_test.go b/redis_test.go index 0198aba4..1f089d74 100644 --- a/redis_test.go +++ b/redis_test.go @@ -1,29 +1,16 @@ package redis_test import ( - "fmt" "net" - "os" - "os/exec" - "path/filepath" - "strings" - "sync/atomic" "testing" "time" - "gopkg.in/redis.v2" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + + "gopkg.in/redis.v2" ) -const redisAddr = ":6379" - -func TestGinkgoSuite(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "gopkg.in/redis.v2") -} - var _ = Describe("Client", func() { var client *redis.Client @@ -280,153 +267,3 @@ func BenchmarkPipeline(b *testing.B) { } }) } - -//------------------------------------------------------------------------------ - -type badNetConn struct { - net.TCPConn -} - -var _ net.Conn = &badNetConn{} - -func newBadNetConn() net.Conn { - return &badNetConn{} -} - -func (badNetConn) Read([]byte) (int, error) { - return 0, net.UnknownNetworkError("badNetConn") -} - -func (badNetConn) Write([]byte) (int, error) { - return 0, net.UnknownNetworkError("badNetConn") -} - -// Replaces ginkgo's Eventually. -func waitForSubstring(fn func() string, substr string, timeout time.Duration) error { - var s string - - found := make(chan struct{}) - var exit int32 - go func() { - for atomic.LoadInt32(&exit) == 0 { - s = fn() - if strings.Contains(s, substr) { - found <- struct{}{} - return - } - time.Sleep(timeout / 100) - } - }() - - select { - case <-found: - return nil - case <-time.After(timeout): - atomic.StoreInt32(&exit, 1) - } - return fmt.Errorf("%q does not contain %q", s, substr) -} - -func execCmd(name string, args ...string) (*os.Process, error) { - cmd := exec.Command(name, args...) - if testing.Verbose() { - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - } - return cmd.Process, cmd.Start() -} - -func connectTo(port string) (client *redis.Client, err error) { - client = redis.NewClient(&redis.Options{ - Addr: ":" + port, - }) - - deadline := time.Now().Add(3 * time.Second) - for time.Now().Before(deadline) { - if err = client.Ping().Err(); err == nil { - return client, nil - } - time.Sleep(250 * time.Millisecond) - } - - return nil, err -} - -type redisProcess struct { - *os.Process - *redis.Client -} - -func (p *redisProcess) Close() error { - p.Client.Close() - return p.Kill() -} - -var ( - redisServerBin, _ = filepath.Abs(filepath.Join(".test", "redis", "src", "redis-server")) - redisServerConf, _ = filepath.Abs(filepath.Join(".test", "redis.conf")) -) - -func redisDir(port string) (string, error) { - dir, err := filepath.Abs(filepath.Join(".test", "instances", port)) - if err != nil { - return "", err - } else if err = os.RemoveAll(dir); err != nil { - return "", err - } else if err = os.MkdirAll(dir, 0775); err != nil { - return "", err - } - return dir, nil -} - -func startRedis(port string, args ...string) (*redisProcess, error) { - dir, err := redisDir(port) - if err != nil { - return nil, err - } - if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil { - return nil, err - } - - baseArgs := []string{filepath.Join(dir, "redis.conf"), "--port", port, "--dir", dir} - process, err := execCmd(redisServerBin, append(baseArgs, args...)...) - if err != nil { - return nil, err - } - - client, err := connectTo(port) - if err != nil { - process.Kill() - return nil, err - } - return &redisProcess{process, client}, err -} - -func startSentinel(port, masterName, masterPort string) (*redisProcess, error) { - dir, err := redisDir(port) - if err != nil { - return nil, err - } - process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir) - if err != nil { - return nil, err - } - client, err := connectTo(port) - if err != nil { - process.Kill() - return nil, err - } - for _, cmd := range []*redis.StatusCmd{ - redis.NewStatusCmd("SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "1"), - redis.NewStatusCmd("SENTINEL", "SET", masterName, "down-after-milliseconds", "500"), - redis.NewStatusCmd("SENTINEL", "SET", masterName, "failover-timeout", "1000"), - redis.NewStatusCmd("SENTINEL", "SET", masterName, "parallel-syncs", "1"), - } { - client.Process(cmd) - if err := cmd.Err(); err != nil { - process.Kill() - return nil, err - } - } - return &redisProcess{process, client}, nil -} diff --git a/sentinel.go b/sentinel.go index 717f4122..26ab4855 100644 --- a/sentinel.go +++ b/sentinel.go @@ -185,9 +185,6 @@ func (d *sentinelFailover) MasterAddr() (string, error) { sentinel := newSentinel(&Options{ Addr: sentinelAddr, - DB: d.opt.DB, - Password: d.opt.Password, - DialTimeout: d.opt.DialTimeout, ReadTimeout: d.opt.ReadTimeout, WriteTimeout: d.opt.WriteTimeout, diff --git a/sentinel_test.go b/sentinel_test.go index 6fc2f4f4..42d509e6 100644 --- a/sentinel_test.go +++ b/sentinel_test.go @@ -7,48 +7,34 @@ import ( ) var _ = Describe("Sentinel", func() { + var client *redis.Client - const masterName = "mymaster" - const masterPort = "8123" - const sentinelPort = "8124" - - It("should facilitate failover", func() { - master, err := startRedis(masterPort) - Expect(err).NotTo(HaveOccurred()) - defer master.Close() - - sentinel, err := startSentinel(sentinelPort, masterName, masterPort) - Expect(err).NotTo(HaveOccurred()) - defer sentinel.Close() - - slave1, err := startRedis("8125", "--slaveof", "127.0.0.1", masterPort) - Expect(err).NotTo(HaveOccurred()) - defer slave1.Close() - - slave2, err := startRedis("8126", "--slaveof", "127.0.0.1", masterPort) - Expect(err).NotTo(HaveOccurred()) - defer slave2.Close() - - client := redis.NewFailoverClient(&redis.FailoverOptions{ - MasterName: masterName, + BeforeEach(func() { + client = redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: sentinelName, SentinelAddrs: []string{":" + sentinelPort}, }) - defer client.Close() + }) + AfterEach(func() { + Expect(client.Close()).NotTo(HaveOccurred()) + }) + + It("should facilitate failover", func() { // Set value on master, verify - err = client.Set("foo", "master", 0).Err() + err := client.Set("foo", "master", 0).Err() Expect(err).NotTo(HaveOccurred()) - val, err := master.Get("foo").Result() + val, err := sentinelMaster.Get("foo").Result() Expect(err).NotTo(HaveOccurred()) Expect(val).To(Equal("master")) // Wait until replicated Eventually(func() string { - return slave1.Get("foo").Val() + return sentinelSlave1.Get("foo").Val() }, "1s", "100ms").Should(Equal("master")) Eventually(func() string { - return slave2.Get("foo").Val() + return sentinelSlave2.Get("foo").Val() }, "1s", "100ms").Should(Equal("master")) // Wait until slaves are picked up by sentinel. @@ -57,14 +43,14 @@ var _ = Describe("Sentinel", func() { }, "10s", "100ms").Should(ContainSubstring("slaves=2")) // Kill master. - master.Shutdown() + sentinelMaster.Shutdown() Eventually(func() error { - return master.Ping().Err() + return sentinelMaster.Ping().Err() }, "5s", "100ms").Should(HaveOccurred()) // Wait for Redis sentinel to elect new master. Eventually(func() string { - return slave1.Info().Val() + slave2.Info().Val() + return sentinelSlave1.Info().Val() + sentinelSlave2.Info().Val() }, "30s", "1s").Should(ContainSubstring("role:master")) // Check that client picked up new master. @@ -73,4 +59,15 @@ var _ = Describe("Sentinel", func() { }, "5s", "100ms").ShouldNot(HaveOccurred()) }) + It("supports DB selection", func() { + Expect(client.Close()).NotTo(HaveOccurred()) + + client = redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: sentinelName, + SentinelAddrs: []string{":" + sentinelPort}, + DB: 1, + }) + err := client.Ping().Err() + Expect(err).NotTo(HaveOccurred()) + }) })