sentine: don't pass DB and Password to Sentinel client.

This commit is contained in:
Vladimir Mihailenco 2015-05-14 16:13:45 +03:00
parent b8b073f3bf
commit 1078a303ea
6 changed files with 265 additions and 227 deletions

View File

@ -144,21 +144,6 @@ func stopCluster(scenario *clusterScenario) error {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
var _ = Describe("Cluster", func() { var _ = Describe("Cluster", func() {
scenario := &clusterScenario{
ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
nodeIds: make([]string, 6),
processes: make(map[string]*redisProcess, 6),
clients: make(map[string]*redis.Client, 6),
}
BeforeSuite(func() {
Expect(startCluster(scenario)).NotTo(HaveOccurred())
})
AfterSuite(func() {
Expect(stopCluster(scenario)).NotTo(HaveOccurred())
})
Describe("HashSlot", func() { Describe("HashSlot", func() {
It("should calculate hash slots", func() { It("should calculate hash slots", func() {
@ -202,7 +187,7 @@ var _ = Describe("Cluster", func() {
Describe("Commands", func() { Describe("Commands", func() {
It("should CLUSTER SLOTS", func() { It("should CLUSTER SLOTS", func() {
res, err := scenario.primary().ClusterSlots().Result() res, err := cluster.primary().ClusterSlots().Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(res).To(HaveLen(3)) Expect(res).To(HaveLen(3))
Expect(res).To(ConsistOf([]redis.ClusterSlotInfo{ Expect(res).To(ConsistOf([]redis.ClusterSlotInfo{
@ -213,13 +198,13 @@ var _ = Describe("Cluster", func() {
}) })
It("should CLUSTER NODES", func() { It("should CLUSTER NODES", func() {
res, err := scenario.primary().ClusterNodes().Result() res, err := cluster.primary().ClusterNodes().Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(res)).To(BeNumerically(">", 400)) Expect(len(res)).To(BeNumerically(">", 400))
}) })
It("should CLUSTER INFO", func() { It("should CLUSTER INFO", func() {
res, err := scenario.primary().ClusterInfo().Result() res, err := cluster.primary().ClusterInfo().Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(res).To(ContainSubstring("cluster_known_nodes:6")) Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
}) })
@ -230,11 +215,11 @@ var _ = Describe("Cluster", func() {
var client *redis.ClusterClient var client *redis.ClusterClient
BeforeEach(func() { BeforeEach(func() {
client = scenario.clusterClient(nil) client = cluster.clusterClient(nil)
}) })
AfterEach(func() { AfterEach(func() {
for _, client := range scenario.masters() { for _, client := range cluster.masters() {
Expect(client.FlushDb().Err()).NotTo(HaveOccurred()) Expect(client.FlushDb().Err()).NotTo(HaveOccurred())
} }
Expect(client.Close()).NotTo(HaveOccurred()) Expect(client.Close()).NotTo(HaveOccurred())
@ -304,7 +289,7 @@ var _ = Describe("Cluster", func() {
}) })
It("should return error when there are no attempts left", func() { It("should return error when there are no attempts left", func() {
client = scenario.clusterClient(&redis.ClusterOptions{ client = cluster.clusterClient(&redis.ClusterOptions{
MaxRedirects: -1, MaxRedirects: -1,
}) })
@ -321,17 +306,17 @@ var _ = Describe("Cluster", func() {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func BenchmarkRedisClusterPing(b *testing.B) { func BenchmarkRedisClusterPing(b *testing.B) {
scenario := &clusterScenario{ cluster := &clusterScenario{
ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"}, ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
nodeIds: make([]string, 6), nodeIds: make([]string, 6),
processes: make(map[string]*redisProcess, 6), processes: make(map[string]*redisProcess, 6),
clients: make(map[string]*redis.Client, 6), clients: make(map[string]*redis.Client, 6),
} }
if err := startCluster(scenario); err != nil { if err := startCluster(cluster); err != nil {
b.Fatal(err) b.Fatal(err)
} }
defer stopCluster(scenario) defer stopCluster(cluster)
client := scenario.clusterClient(nil) client := cluster.clusterClient(nil)
defer client.Close() defer client.Close()
b.ResetTimer() b.ResetTimer()

View File

@ -307,9 +307,8 @@ var _ = Describe("Commands", func() {
Expect(refCount.Err()).NotTo(HaveOccurred()) Expect(refCount.Err()).NotTo(HaveOccurred())
Expect(refCount.Val()).To(Equal(int64(1))) Expect(refCount.Val()).To(Equal(int64(1)))
enc := client.ObjectEncoding("key") err := client.ObjectEncoding("key").Err()
Expect(enc.Err()).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(enc.Val()).To(Equal("raw"))
idleTime := client.ObjectIdleTime("key") idleTime := client.ObjectIdleTime("key")
Expect(idleTime.Err()).NotTo(HaveOccurred()) Expect(idleTime.Err()).NotTo(HaveOccurred())

223
main_test.go Normal file
View File

@ -0,0 +1,223 @@
package redis_test
import (
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
"strings"
"sync/atomic"
"testing"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"gopkg.in/redis.v2"
)
const redisAddr = ":6379"
const (
sentinelName = "mymaster"
sentinelMasterPort = "8123"
sentinelSlave1Port = "8124"
sentinelSlave2Port = "8125"
sentinelPort = "8126"
)
var sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess
var cluster = &clusterScenario{
ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
nodeIds: make([]string, 6),
processes: make(map[string]*redisProcess, 6),
clients: make(map[string]*redis.Client, 6),
}
var _ = BeforeSuite(func() {
var err error
sentinelMaster, err = startRedis(sentinelMasterPort)
Expect(err).NotTo(HaveOccurred())
sentinel, err = startSentinel(sentinelPort, sentinelName, sentinelMasterPort)
Expect(err).NotTo(HaveOccurred())
sentinelSlave1, err = startRedis(
sentinelSlave1Port, "--slaveof", "127.0.0.1", sentinelMasterPort)
Expect(err).NotTo(HaveOccurred())
sentinelSlave2, err = startRedis(
sentinelSlave2Port, "--slaveof", "127.0.0.1", sentinelMasterPort)
Expect(err).NotTo(HaveOccurred())
Expect(startCluster(cluster)).NotTo(HaveOccurred())
})
var _ = AfterSuite(func() {
Expect(sentinel.Close()).NotTo(HaveOccurred())
Expect(sentinelSlave1.Close()).NotTo(HaveOccurred())
Expect(sentinelSlave2.Close()).NotTo(HaveOccurred())
Expect(sentinelMaster.Close()).NotTo(HaveOccurred())
Expect(stopCluster(cluster)).NotTo(HaveOccurred())
})
func TestGinkgoSuite(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "gopkg.in/redis.v2")
}
//------------------------------------------------------------------------------
// Replaces ginkgo's Eventually.
func waitForSubstring(fn func() string, substr string, timeout time.Duration) error {
var s string
found := make(chan struct{})
var exit int32
go func() {
for atomic.LoadInt32(&exit) == 0 {
s = fn()
if strings.Contains(s, substr) {
found <- struct{}{}
return
}
time.Sleep(timeout / 100)
}
}()
select {
case <-found:
return nil
case <-time.After(timeout):
atomic.StoreInt32(&exit, 1)
}
return fmt.Errorf("%q does not contain %q", s, substr)
}
func execCmd(name string, args ...string) (*os.Process, error) {
cmd := exec.Command(name, args...)
if testing.Verbose() {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
}
return cmd.Process, cmd.Start()
}
func connectTo(port string) (client *redis.Client, err error) {
client = redis.NewClient(&redis.Options{
Addr: ":" + port,
})
deadline := time.Now().Add(3 * time.Second)
for time.Now().Before(deadline) {
if err = client.Ping().Err(); err == nil {
return client, nil
}
time.Sleep(250 * time.Millisecond)
}
return nil, err
}
type redisProcess struct {
*os.Process
*redis.Client
}
func (p *redisProcess) Close() error {
p.Client.Close()
return p.Kill()
}
var (
redisServerBin, _ = filepath.Abs(filepath.Join(".test", "redis", "src", "redis-server"))
redisServerConf, _ = filepath.Abs(filepath.Join(".test", "redis.conf"))
)
func redisDir(port string) (string, error) {
dir, err := filepath.Abs(filepath.Join(".test", "instances", port))
if err != nil {
return "", err
} else if err = os.RemoveAll(dir); err != nil {
return "", err
} else if err = os.MkdirAll(dir, 0775); err != nil {
return "", err
}
return dir, nil
}
func startRedis(port string, args ...string) (*redisProcess, error) {
dir, err := redisDir(port)
if err != nil {
return nil, err
}
if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil {
return nil, err
}
baseArgs := []string{filepath.Join(dir, "redis.conf"), "--port", port, "--dir", dir}
process, err := execCmd(redisServerBin, append(baseArgs, args...)...)
if err != nil {
return nil, err
}
client, err := connectTo(port)
if err != nil {
process.Kill()
return nil, err
}
return &redisProcess{process, client}, err
}
func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
dir, err := redisDir(port)
if err != nil {
return nil, err
}
process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir)
if err != nil {
return nil, err
}
client, err := connectTo(port)
if err != nil {
process.Kill()
return nil, err
}
for _, cmd := range []*redis.StatusCmd{
redis.NewStatusCmd("SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "1"),
redis.NewStatusCmd("SENTINEL", "SET", masterName, "down-after-milliseconds", "500"),
redis.NewStatusCmd("SENTINEL", "SET", masterName, "failover-timeout", "1000"),
redis.NewStatusCmd("SENTINEL", "SET", masterName, "parallel-syncs", "1"),
} {
client.Process(cmd)
if err := cmd.Err(); err != nil {
process.Kill()
return nil, err
}
}
return &redisProcess{process, client}, nil
}
//------------------------------------------------------------------------------
type badNetConn struct {
net.TCPConn
}
var _ net.Conn = &badNetConn{}
func newBadNetConn() net.Conn {
return &badNetConn{}
}
func (badNetConn) Read([]byte) (int, error) {
return 0, net.UnknownNetworkError("badNetConn")
}
func (badNetConn) Write([]byte) (int, error) {
return 0, net.UnknownNetworkError("badNetConn")
}

View File

@ -1,29 +1,16 @@
package redis_test package redis_test
import ( import (
"fmt"
"net" "net"
"os"
"os/exec"
"path/filepath"
"strings"
"sync/atomic"
"testing" "testing"
"time" "time"
"gopkg.in/redis.v2"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"gopkg.in/redis.v2"
) )
const redisAddr = ":6379"
func TestGinkgoSuite(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "gopkg.in/redis.v2")
}
var _ = Describe("Client", func() { var _ = Describe("Client", func() {
var client *redis.Client var client *redis.Client
@ -280,153 +267,3 @@ func BenchmarkPipeline(b *testing.B) {
} }
}) })
} }
//------------------------------------------------------------------------------
type badNetConn struct {
net.TCPConn
}
var _ net.Conn = &badNetConn{}
func newBadNetConn() net.Conn {
return &badNetConn{}
}
func (badNetConn) Read([]byte) (int, error) {
return 0, net.UnknownNetworkError("badNetConn")
}
func (badNetConn) Write([]byte) (int, error) {
return 0, net.UnknownNetworkError("badNetConn")
}
// Replaces ginkgo's Eventually.
func waitForSubstring(fn func() string, substr string, timeout time.Duration) error {
var s string
found := make(chan struct{})
var exit int32
go func() {
for atomic.LoadInt32(&exit) == 0 {
s = fn()
if strings.Contains(s, substr) {
found <- struct{}{}
return
}
time.Sleep(timeout / 100)
}
}()
select {
case <-found:
return nil
case <-time.After(timeout):
atomic.StoreInt32(&exit, 1)
}
return fmt.Errorf("%q does not contain %q", s, substr)
}
func execCmd(name string, args ...string) (*os.Process, error) {
cmd := exec.Command(name, args...)
if testing.Verbose() {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
}
return cmd.Process, cmd.Start()
}
func connectTo(port string) (client *redis.Client, err error) {
client = redis.NewClient(&redis.Options{
Addr: ":" + port,
})
deadline := time.Now().Add(3 * time.Second)
for time.Now().Before(deadline) {
if err = client.Ping().Err(); err == nil {
return client, nil
}
time.Sleep(250 * time.Millisecond)
}
return nil, err
}
type redisProcess struct {
*os.Process
*redis.Client
}
func (p *redisProcess) Close() error {
p.Client.Close()
return p.Kill()
}
var (
redisServerBin, _ = filepath.Abs(filepath.Join(".test", "redis", "src", "redis-server"))
redisServerConf, _ = filepath.Abs(filepath.Join(".test", "redis.conf"))
)
func redisDir(port string) (string, error) {
dir, err := filepath.Abs(filepath.Join(".test", "instances", port))
if err != nil {
return "", err
} else if err = os.RemoveAll(dir); err != nil {
return "", err
} else if err = os.MkdirAll(dir, 0775); err != nil {
return "", err
}
return dir, nil
}
func startRedis(port string, args ...string) (*redisProcess, error) {
dir, err := redisDir(port)
if err != nil {
return nil, err
}
if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil {
return nil, err
}
baseArgs := []string{filepath.Join(dir, "redis.conf"), "--port", port, "--dir", dir}
process, err := execCmd(redisServerBin, append(baseArgs, args...)...)
if err != nil {
return nil, err
}
client, err := connectTo(port)
if err != nil {
process.Kill()
return nil, err
}
return &redisProcess{process, client}, err
}
func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
dir, err := redisDir(port)
if err != nil {
return nil, err
}
process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir)
if err != nil {
return nil, err
}
client, err := connectTo(port)
if err != nil {
process.Kill()
return nil, err
}
for _, cmd := range []*redis.StatusCmd{
redis.NewStatusCmd("SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "1"),
redis.NewStatusCmd("SENTINEL", "SET", masterName, "down-after-milliseconds", "500"),
redis.NewStatusCmd("SENTINEL", "SET", masterName, "failover-timeout", "1000"),
redis.NewStatusCmd("SENTINEL", "SET", masterName, "parallel-syncs", "1"),
} {
client.Process(cmd)
if err := cmd.Err(); err != nil {
process.Kill()
return nil, err
}
}
return &redisProcess{process, client}, nil
}

View File

@ -185,9 +185,6 @@ func (d *sentinelFailover) MasterAddr() (string, error) {
sentinel := newSentinel(&Options{ sentinel := newSentinel(&Options{
Addr: sentinelAddr, Addr: sentinelAddr,
DB: d.opt.DB,
Password: d.opt.Password,
DialTimeout: d.opt.DialTimeout, DialTimeout: d.opt.DialTimeout,
ReadTimeout: d.opt.ReadTimeout, ReadTimeout: d.opt.ReadTimeout,
WriteTimeout: d.opt.WriteTimeout, WriteTimeout: d.opt.WriteTimeout,

View File

@ -7,48 +7,34 @@ import (
) )
var _ = Describe("Sentinel", func() { var _ = Describe("Sentinel", func() {
var client *redis.Client
const masterName = "mymaster" BeforeEach(func() {
const masterPort = "8123" client = redis.NewFailoverClient(&redis.FailoverOptions{
const sentinelPort = "8124" MasterName: sentinelName,
It("should facilitate failover", func() {
master, err := startRedis(masterPort)
Expect(err).NotTo(HaveOccurred())
defer master.Close()
sentinel, err := startSentinel(sentinelPort, masterName, masterPort)
Expect(err).NotTo(HaveOccurred())
defer sentinel.Close()
slave1, err := startRedis("8125", "--slaveof", "127.0.0.1", masterPort)
Expect(err).NotTo(HaveOccurred())
defer slave1.Close()
slave2, err := startRedis("8126", "--slaveof", "127.0.0.1", masterPort)
Expect(err).NotTo(HaveOccurred())
defer slave2.Close()
client := redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: masterName,
SentinelAddrs: []string{":" + sentinelPort}, SentinelAddrs: []string{":" + sentinelPort},
}) })
defer client.Close() })
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should facilitate failover", func() {
// Set value on master, verify // Set value on master, verify
err = client.Set("foo", "master", 0).Err() err := client.Set("foo", "master", 0).Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
val, err := master.Get("foo").Result() val, err := sentinelMaster.Get("foo").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("master")) Expect(val).To(Equal("master"))
// Wait until replicated // Wait until replicated
Eventually(func() string { Eventually(func() string {
return slave1.Get("foo").Val() return sentinelSlave1.Get("foo").Val()
}, "1s", "100ms").Should(Equal("master")) }, "1s", "100ms").Should(Equal("master"))
Eventually(func() string { Eventually(func() string {
return slave2.Get("foo").Val() return sentinelSlave2.Get("foo").Val()
}, "1s", "100ms").Should(Equal("master")) }, "1s", "100ms").Should(Equal("master"))
// Wait until slaves are picked up by sentinel. // Wait until slaves are picked up by sentinel.
@ -57,14 +43,14 @@ var _ = Describe("Sentinel", func() {
}, "10s", "100ms").Should(ContainSubstring("slaves=2")) }, "10s", "100ms").Should(ContainSubstring("slaves=2"))
// Kill master. // Kill master.
master.Shutdown() sentinelMaster.Shutdown()
Eventually(func() error { Eventually(func() error {
return master.Ping().Err() return sentinelMaster.Ping().Err()
}, "5s", "100ms").Should(HaveOccurred()) }, "5s", "100ms").Should(HaveOccurred())
// Wait for Redis sentinel to elect new master. // Wait for Redis sentinel to elect new master.
Eventually(func() string { Eventually(func() string {
return slave1.Info().Val() + slave2.Info().Val() return sentinelSlave1.Info().Val() + sentinelSlave2.Info().Val()
}, "30s", "1s").Should(ContainSubstring("role:master")) }, "30s", "1s").Should(ContainSubstring("role:master"))
// Check that client picked up new master. // Check that client picked up new master.
@ -73,4 +59,15 @@ var _ = Describe("Sentinel", func() {
}, "5s", "100ms").ShouldNot(HaveOccurred()) }, "5s", "100ms").ShouldNot(HaveOccurred())
}) })
It("supports DB selection", func() {
Expect(client.Close()).NotTo(HaveOccurred())
client = redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: sentinelName,
SentinelAddrs: []string{":" + sentinelPort},
DB: 1,
})
err := client.Ping().Err()
Expect(err).NotTo(HaveOccurred())
})
}) })