From e16db84e4a8ceb914b3ce2d22db11727f46ab085 Mon Sep 17 00:00:00 2001 From: Dimitrij Denissenko Date: Thu, 15 Jan 2015 17:23:22 +0000 Subject: [PATCH] Better redis process abstraction --- redis_test.go | 74 ++++++++++++++++++++++++++++++++++++++++++ sentinel_test.go | 83 ++++++++++-------------------------------------- 2 files changed, 90 insertions(+), 67 deletions(-) diff --git a/redis_test.go b/redis_test.go index 8649241b..a1a0de15 100644 --- a/redis_test.go +++ b/redis_test.go @@ -2,6 +2,8 @@ package redis_test import ( "net" + "os" + "os/exec" "sort" "testing" "time" @@ -128,6 +130,78 @@ func sortStrings(slice []string) []string { return slice } +func execCmd(name string, args ...string) (*os.Process, error) { + cmd := exec.Command(name, args...) + if false { + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + } + return cmd.Process, cmd.Start() +} + +func connectTo(port string) (client *redis.Client, err error) { + client = redis.NewTCPClient(&redis.Options{ + Addr: ":" + port, + }) + + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + if err = client.Ping().Err(); err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + return +} + +type redisProcess struct { + *os.Process + *redis.Client +} + +func (p *redisProcess) Close() error { + p.Client.Close() + return p.Kill() +} + +func startRedis(port string, args ...string) (*redisProcess, error) { + process, err := execCmd("redis-server", append([]string{"--port", port}, args...)...) + if err != nil { + return nil, err + } + client, err := connectTo(port) + if err != nil { + process.Kill() + return nil, err + } + return &redisProcess{process, client}, err +} + +func startSentinel(port, masterName, masterPort string) (*redisProcess, error) { + process, err := execCmd("redis-server", os.DevNull, "--sentinel", "--port", port) + if err != nil { + return nil, err + } + client, err := connectTo(port) + if err != nil { + process.Kill() + return nil, err + } + for _, cmd := range []*redis.StatusCmd{ + redis.NewStatusCmd("SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "1"), + redis.NewStatusCmd("SENTINEL", "SET", masterName, "down-after-milliseconds", "500"), + redis.NewStatusCmd("SENTINEL", "SET", masterName, "failover-timeout", "1000"), + redis.NewStatusCmd("SENTINEL", "SET", masterName, "parallel-syncs", "1"), + } { + client.Process(cmd) + if err := cmd.Err(); err != nil { + process.Kill() + return nil, err + } + } + return &redisProcess{process, client}, err +} + //------------------------------------------------------------------------------ func BenchmarkRedisPing(b *testing.B) { diff --git a/sentinel_test.go b/sentinel_test.go index abbba925..5404eec0 100644 --- a/sentinel_test.go +++ b/sentinel_test.go @@ -1,11 +1,6 @@ package redis_test import ( - "io/ioutil" - "os" - "os/exec" - "path/filepath" - . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "gopkg.in/redis.v2" @@ -16,69 +11,23 @@ var _ = Describe("Sentinel", func() { const masterName = "mymaster" const masterPort = "8123" const sentinelPort = "8124" - const sentinelConf = ` -port ` + sentinelPort + ` - -sentinel monitor ` + masterName + ` 127.0.0.1 ` + masterPort + ` 1 -sentinel down-after-milliseconds ` + masterName + ` 400 -sentinel failover-timeout ` + masterName + ` 800 -sentinel parallel-syncs ` + masterName + ` 1 -` - - var runCmd = func(name string, args ...string) *os.Process { - cmd := exec.Command(name, args...) - if false { - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - } - err := cmd.Start() - Expect(err).NotTo(HaveOccurred()) - return cmd.Process - } - - var connect = func(port string) *redis.Client { - client := redis.NewTCPClient(&redis.Options{ - Addr: ":" + port, - }) - Eventually(func() error { - return client.Ping().Err() - }, "1s", "100ms").ShouldNot(HaveOccurred()) - return client - } - - var startMaster = func() (*redis.Client, *os.Process) { - proc := runCmd("redis-server", "--port", masterPort) - return connect(masterPort), proc - } - - var startSlave = func(port string) (*redis.Client, *os.Process) { - proc := runCmd("redis-server", "--port", port, "--slaveof", "127.0.0.1", masterPort) - return connect(port), proc - } - - var startSentinel = func() *os.Process { - dir, err := ioutil.TempDir("", "sentinel") - Expect(err).NotTo(HaveOccurred()) - - fname := filepath.Join(dir, "sentinel.conf") - err = ioutil.WriteFile(fname, []byte(sentinelConf), 0664) - Expect(err).NotTo(HaveOccurred()) - - proc := runCmd("redis-server", fname, "--sentinel") - client := connect(sentinelPort) - client.Close() - return proc - } It("should facilitate failover", func() { - master, mproc := startMaster() - defer mproc.Kill() - slave1, sproc1 := startSlave("8125") - defer sproc1.Kill() - slave2, sproc2 := startSlave("8126") - defer sproc2.Kill() - sntproc := startSentinel() - defer sntproc.Kill() + master, err := startRedis(masterPort) + Expect(err).NotTo(HaveOccurred()) + defer master.Close() + + sentinel, err := startSentinel(sentinelPort, masterName, masterPort) + Expect(err).NotTo(HaveOccurred()) + defer sentinel.Close() + + slave1, err := startRedis("8125", "--slaveof", "127.0.0.1", masterPort) + Expect(err).NotTo(HaveOccurred()) + defer slave1.Close() + + slave2, err := startRedis("8126", "--slaveof", "127.0.0.1", masterPort) + Expect(err).NotTo(HaveOccurred()) + defer slave2.Close() client := redis.NewFailoverClient(&redis.FailoverOptions{ MasterName: masterName, @@ -87,7 +36,7 @@ sentinel parallel-syncs ` + masterName + ` 1 defer client.Close() // Set value on master, verify - err := client.Set("foo", "master").Err() + err = client.Set("foo", "master").Err() Expect(err).NotTo(HaveOccurred()) val, err := master.Get("foo").Result()