Merge pull request #1483 from go-redis/fix/default-retries

Tweak number of retries
This commit is contained in:
Vladimir Mihailenco 2020-09-11 12:33:49 +03:00 committed by GitHub
commit 68baad7f84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 160 additions and 149 deletions

View File

@ -26,6 +26,9 @@ ring := redis.NewRing(&redis.RingOptions{
}) })
``` ```
- `ClusterOptions.MaxRedirects` default value is changed from 8 to 3.
- `Options.MaxRetries` default value is changed from 0 to 3.
- `Cluster.ForEachNode` is renamed to `ForEachShard` for consistency with `Ring`. - `Cluster.ForEachNode` is renamed to `ForEachShard` for consistency with `Ring`.
## v7.3 ## v7.3

View File

@ -32,7 +32,7 @@ type ClusterOptions struct {
// The maximum number of retries before giving up. Command is retried // The maximum number of retries before giving up. Command is retried
// on network errors and MOVED/ASK redirects. // on network errors and MOVED/ASK redirects.
// Default is 8 retries. // Default is 3 retries.
MaxRedirects int MaxRedirects int
// Enables read-only commands on slave nodes. // Enables read-only commands on slave nodes.
@ -83,7 +83,7 @@ func (opt *ClusterOptions) init() {
if opt.MaxRedirects == -1 { if opt.MaxRedirects == -1 {
opt.MaxRedirects = 0 opt.MaxRedirects = 0
} else if opt.MaxRedirects == 0 { } else if opt.MaxRedirects == 0 {
opt.MaxRedirects = 8 opt.MaxRedirects = 3
} }
if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil { if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
@ -107,6 +107,9 @@ func (opt *ClusterOptions) init() {
opt.WriteTimeout = opt.ReadTimeout opt.WriteTimeout = opt.ReadTimeout
} }
if opt.MaxRetries == 0 {
opt.MaxRetries = -1
}
switch opt.MinRetryBackoff { switch opt.MinRetryBackoff {
case -1: case -1:
opt.MinRetryBackoff = 0 opt.MinRetryBackoff = 0
@ -132,12 +135,12 @@ func (opt *ClusterOptions) clientOptions() *Options {
Dialer: opt.Dialer, Dialer: opt.Dialer,
OnConnect: opt.OnConnect, OnConnect: opt.OnConnect,
Username: opt.Username,
Password: opt.Password,
MaxRetries: opt.MaxRetries, MaxRetries: opt.MaxRetries,
MinRetryBackoff: opt.MinRetryBackoff, MinRetryBackoff: opt.MinRetryBackoff,
MaxRetryBackoff: opt.MaxRetryBackoff, MaxRetryBackoff: opt.MaxRetryBackoff,
Username: opt.Username,
Password: opt.Password,
readOnly: opt.ReadOnly,
DialTimeout: opt.DialTimeout, DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout, ReadTimeout: opt.ReadTimeout,
@ -150,6 +153,8 @@ func (opt *ClusterOptions) clientOptions() *Options {
IdleTimeout: opt.IdleTimeout, IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: disableIdleCheck, IdleCheckFrequency: disableIdleCheck,
readOnly: opt.ReadOnly,
TLSConfig: opt.TLSConfig, TLSConfig: opt.TLSConfig,
} }
} }

View File

@ -123,9 +123,13 @@ func redisOptions() *redis.Options {
return &redis.Options{ return &redis.Options{
Addr: redisAddr, Addr: redisAddr,
DB: 15, DB: 15,
DialTimeout: 10 * time.Second, DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second, ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second,
MaxRetries: -1,
PoolSize: 10, PoolSize: 10,
PoolTimeout: 30 * time.Second, PoolTimeout: 30 * time.Second,
IdleTimeout: time.Minute, IdleTimeout: time.Minute,
@ -138,6 +142,9 @@ func redisClusterOptions() *redis.ClusterOptions {
DialTimeout: 10 * time.Second, DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second, ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second,
MaxRedirects: 8,
PoolSize: 10, PoolSize: 10,
PoolTimeout: 30 * time.Second, PoolTimeout: 30 * time.Second,
IdleTimeout: time.Minute, IdleTimeout: time.Minute,
@ -151,9 +158,13 @@ func redisRingOptions() *redis.RingOptions {
"ringShardOne": ":" + ringShard1Port, "ringShardOne": ":" + ringShard1Port,
"ringShardTwo": ":" + ringShard2Port, "ringShardTwo": ":" + ringShard2Port,
}, },
DialTimeout: 10 * time.Second, DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second, ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second,
MaxRetries: -1,
PoolSize: 10, PoolSize: 10,
PoolTimeout: 30 * time.Second, PoolTimeout: 30 * time.Second,
IdleTimeout: time.Minute, IdleTimeout: time.Minute,
@ -234,6 +245,7 @@ func execCmd(name string, args ...string) (*os.Process, error) {
func connectTo(port string) (*redis.Client, error) { func connectTo(port string) (*redis.Client, error) {
client := redis.NewClient(&redis.Options{ client := redis.NewClient(&redis.Options{
Addr: ":" + port, Addr: ":" + port,
MaxRetries: -1,
}) })
err := eventually(func() error { err := eventually(func() error {

View File

@ -57,7 +57,7 @@ type Options struct {
DB int DB int
// Maximum number of retries before giving up. // Maximum number of retries before giving up.
// Default is to not retry failed commands. // Default is 3 retries.
MaxRetries int MaxRetries int
// Minimum backoff between each retry. // Minimum backoff between each retry.
// Default is 8 milliseconds; -1 disables backoff. // Default is 8 milliseconds; -1 disables backoff.
@ -164,6 +164,8 @@ func (opt *Options) init() {
if opt.MaxRetries == -1 { if opt.MaxRetries == -1 {
opt.MaxRetries = 0 opt.MaxRetries = 0
} else if opt.MaxRetries == 0 {
opt.MaxRetries = 3
} }
switch opt.MinRetryBackoff { switch opt.MinRetryBackoff {
case -1: case -1:

View File

@ -215,7 +215,7 @@ var _ = Describe("Client", func() {
It("should retry with backoff", func() { It("should retry with backoff", func() {
clientNoRetry := redis.NewClient(&redis.Options{ clientNoRetry := redis.NewClient(&redis.Options{
Addr: ":1234", Addr: ":1234",
MaxRetries: 0, MaxRetries: -1,
}) })
defer clientNoRetry.Close() defer clientNoRetry.Close()

14
ring.go
View File

@ -2,6 +2,7 @@ package redis
import ( import (
"context" "context"
"crypto/tls"
"errors" "errors"
"fmt" "fmt"
"net" "net"
@ -84,6 +85,9 @@ type RingOptions struct {
PoolTimeout time.Duration PoolTimeout time.Duration
IdleTimeout time.Duration IdleTimeout time.Duration
IdleCheckFrequency time.Duration IdleCheckFrequency time.Duration
TLSConfig *tls.Config
Limiter Limiter
} }
func (opt *RingOptions) init() { func (opt *RingOptions) init() {
@ -101,6 +105,11 @@ func (opt *RingOptions) init() {
opt.NewConsistentHash = newRendezvous opt.NewConsistentHash = newRendezvous
} }
if opt.MaxRetries == -1 {
opt.MaxRetries = 0
} else if opt.MaxRetries == 0 {
opt.MaxRetries = 3
}
switch opt.MinRetryBackoff { switch opt.MinRetryBackoff {
case -1: case -1:
opt.MinRetryBackoff = 0 opt.MinRetryBackoff = 0
@ -124,6 +133,8 @@ func (opt *RingOptions) clientOptions() *Options {
Password: opt.Password, Password: opt.Password,
DB: opt.DB, DB: opt.DB,
MaxRetries: -1,
DialTimeout: opt.DialTimeout, DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout, ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout, WriteTimeout: opt.WriteTimeout,
@ -134,6 +145,9 @@ func (opt *RingOptions) clientOptions() *Options {
PoolTimeout: opt.PoolTimeout, PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout, IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency, IdleCheckFrequency: opt.IdleCheckFrequency,
TLSConfig: opt.TLSConfig,
Limiter: opt.Limiter,
} }
} }

View File

@ -56,8 +56,8 @@ type FailoverOptions struct {
TLSConfig *tls.Config TLSConfig *tls.Config
} }
func (opt *FailoverOptions) options() *Options { func (opt *FailoverOptions) clientOptions() *Options {
redisOpt := &Options{ return &Options{
Addr: "FailoverClient", Addr: "FailoverClient",
Dialer: opt.Dialer, Dialer: opt.Dialer,
@ -84,17 +84,17 @@ func (opt *FailoverOptions) options() *Options {
TLSConfig: opt.TLSConfig, TLSConfig: opt.TLSConfig,
} }
redisOpt.init()
return redisOpt
} }
func (opt *FailoverOptions) clusterOptions() *ClusterOptions { func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
clusterOpt := &ClusterOptions{ return &Options{
Addr: addr,
Dialer: opt.Dialer, Dialer: opt.Dialer,
OnConnect: opt.OnConnect, OnConnect: opt.OnConnect,
Username: opt.Username, DB: 0,
Password: opt.Password, Password: opt.SentinelPassword,
MaxRetries: opt.MaxRetries, MaxRetries: opt.MaxRetries,
MinRetryBackoff: opt.MinRetryBackoff, MinRetryBackoff: opt.MinRetryBackoff,
@ -113,24 +113,50 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
TLSConfig: opt.TLSConfig, TLSConfig: opt.TLSConfig,
} }
clusterOpt.init() }
return clusterOpt
func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
return &ClusterOptions{
Dialer: opt.Dialer,
OnConnect: opt.OnConnect,
Username: opt.Username,
Password: opt.Password,
MaxRedirects: opt.MaxRetries,
MinRetryBackoff: opt.MinRetryBackoff,
MaxRetryBackoff: opt.MaxRetryBackoff,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
TLSConfig: opt.TLSConfig,
}
} }
// NewFailoverClient returns a Redis client that uses Redis Sentinel // NewFailoverClient returns a Redis client that uses Redis Sentinel
// for automatic failover. It's safe for concurrent use by multiple // for automatic failover. It's safe for concurrent use by multiple
// goroutines. // goroutines.
func NewFailoverClient(failoverOpt *FailoverOptions) *Client { func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
failover := &sentinelFailover{ sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
masterName: failoverOpt.MasterName, copy(sentinelAddrs, failoverOpt.SentinelAddrs)
sentinelAddrs: failoverOpt.SentinelAddrs,
sentinelPassword: failoverOpt.SentinelPassword,
opt: failoverOpt.options(), failover := &sentinelFailover{
opt: failoverOpt,
sentinelAddrs: sentinelAddrs,
} }
opt := failoverOpt.options() opt := failoverOpt.clientOptions()
opt.Dialer = masterSlaveDialer(failover, failoverOpt.SlaveOnly) opt.Dialer = masterSlaveDialer(failover)
opt.init()
connPool := newConnPool(opt) connPool := newConnPool(opt)
failover.onFailover = func(ctx context.Context, addr string) { failover.onFailover = func(ctx context.Context, addr string) {
@ -150,13 +176,13 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
} }
func masterSlaveDialer( func masterSlaveDialer(
failover *sentinelFailover, slaveOnly bool, failover *sentinelFailover,
) func(ctx context.Context, network, addr string) (net.Conn, error) { ) func(ctx context.Context, network, addr string) (net.Conn, error) {
return func(ctx context.Context, network, _ string) (net.Conn, error) { return func(ctx context.Context, network, _ string) (net.Conn, error) {
var addr string var addr string
var err error var err error
if slaveOnly { if failover.opt.SlaveOnly {
addr, err = failover.RandomSlaveAddr(ctx) addr, err = failover.RandomSlaveAddr(ctx)
} else { } else {
addr, err = failover.MasterAddr(ctx) addr, err = failover.MasterAddr(ctx)
@ -349,14 +375,12 @@ func (c *SentinelClient) Remove(ctx context.Context, name string) *StringCmd {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type sentinelFailover struct { type sentinelFailover struct {
sentinelAddrs []string opt *FailoverOptions
sentinelPassword string
opt *Options sentinelAddrs []string
onFailover func(ctx context.Context, addr string) onFailover func(ctx context.Context, addr string)
mu sync.RWMutex mu sync.RWMutex
masterName string
_masterAddr string _masterAddr string
sentinel *SentinelClient sentinel *SentinelClient
pubsub *PubSub pubsub *PubSub
@ -419,31 +443,12 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
} }
for i, sentinelAddr := range c.sentinelAddrs { for i, sentinelAddr := range c.sentinelAddrs {
sentinel := NewSentinelClient(&Options{ sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
Addr: sentinelAddr,
Dialer: c.opt.Dialer,
Username: c.opt.Username, masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
Password: c.opt.Password,
MaxRetries: c.opt.MaxRetries,
DialTimeout: c.opt.DialTimeout,
ReadTimeout: c.opt.ReadTimeout,
WriteTimeout: c.opt.WriteTimeout,
PoolSize: c.opt.PoolSize,
PoolTimeout: c.opt.PoolTimeout,
IdleTimeout: c.opt.IdleTimeout,
IdleCheckFrequency: c.opt.IdleCheckFrequency,
TLSConfig: c.opt.TLSConfig,
})
masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result()
if err != nil { if err != nil {
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s", internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
c.masterName, err) c.opt.MasterName, err)
_ = sentinel.Close() _ = sentinel.Close()
continue continue
} }
@ -483,31 +488,12 @@ func (c *sentinelFailover) slaveAddresses(ctx context.Context) ([]string, error)
} }
for i, sentinelAddr := range c.sentinelAddrs { for i, sentinelAddr := range c.sentinelAddrs {
sentinel := NewSentinelClient(&Options{ sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
Addr: sentinelAddr,
Dialer: c.opt.Dialer,
Username: c.opt.Username, slaves, err := sentinel.Slaves(ctx, c.opt.MasterName).Result()
Password: c.opt.Password,
MaxRetries: c.opt.MaxRetries,
DialTimeout: c.opt.DialTimeout,
ReadTimeout: c.opt.ReadTimeout,
WriteTimeout: c.opt.WriteTimeout,
PoolSize: c.opt.PoolSize,
PoolTimeout: c.opt.PoolTimeout,
IdleTimeout: c.opt.IdleTimeout,
IdleCheckFrequency: c.opt.IdleCheckFrequency,
TLSConfig: c.opt.TLSConfig,
})
slaves, err := sentinel.Slaves(ctx, c.masterName).Result()
if err != nil { if err != nil {
internal.Logger.Printf(ctx, "sentinel: Slaves master=%q failed: %s", internal.Logger.Printf(ctx, "sentinel: Slaves master=%q failed: %s",
c.masterName, err) c.opt.MasterName, err)
_ = sentinel.Close() _ = sentinel.Close()
continue continue
} }
@ -524,20 +510,20 @@ func (c *sentinelFailover) slaveAddresses(ctx context.Context) ([]string, error)
} }
func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string { func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string {
addr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result() addr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
if err != nil { if err != nil {
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s", internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s",
c.masterName, err) c.opt.MasterName, err)
return "" return ""
} }
return net.JoinHostPort(addr[0], addr[1]) return net.JoinHostPort(addr[0], addr[1])
} }
func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *SentinelClient) []string { func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *SentinelClient) []string {
addrs, err := sentinel.Slaves(ctx, c.masterName).Result() addrs, err := sentinel.Slaves(ctx, c.opt.MasterName).Result()
if err != nil { if err != nil {
internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s", internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s",
c.masterName, err) c.opt.MasterName, err)
return []string{} return []string{}
} }
@ -597,7 +583,7 @@ func (c *sentinelFailover) trySwitchMaster(ctx context.Context, addr string) {
c._masterAddr = addr c._masterAddr = addr
internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q", internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q",
c.masterName, addr) c.opt.MasterName, addr)
go c.onFailover(ctx, addr) go c.onFailover(ctx, addr)
} }
@ -613,9 +599,9 @@ func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelCl
} }
func (c *sentinelFailover) discoverSentinels(ctx context.Context) { func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
sentinels, err := c.sentinel.Sentinels(ctx, c.masterName).Result() sentinels, err := c.sentinel.Sentinels(ctx, c.opt.MasterName).Result()
if err != nil { if err != nil {
internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.masterName, err) internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.opt.MasterName, err)
return return
} }
for _, sentinel := range sentinels { for _, sentinel := range sentinels {
@ -626,7 +612,7 @@ func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
sentinelAddr := vals[i+1].(string) sentinelAddr := vals[i+1].(string)
if !contains(c.sentinelAddrs, sentinelAddr) { if !contains(c.sentinelAddrs, sentinelAddr) {
internal.Logger.Printf(ctx, "sentinel: discovered new sentinel=%q for master=%q", internal.Logger.Printf(ctx, "sentinel: discovered new sentinel=%q for master=%q",
sentinelAddr, c.masterName) sentinelAddr, c.opt.MasterName)
c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr) c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
} }
} }
@ -644,7 +630,7 @@ func (c *sentinelFailover) listen(pubsub *PubSub) {
if msg.Channel == "+switch-master" { if msg.Channel == "+switch-master" {
parts := strings.Split(msg.Payload, " ") parts := strings.Split(msg.Payload, " ")
if parts[0] != c.masterName { if parts[0] != c.opt.MasterName {
internal.Logger.Printf(pubsub.getContext(), "sentinel: ignore addr for master=%q", parts[0]) internal.Logger.Printf(pubsub.getContext(), "sentinel: ignore addr for master=%q", parts[0])
continue continue
} }
@ -666,15 +652,15 @@ func contains(slice []string, str string) bool {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient { func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
failover := &sentinelFailover{ sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
masterName: failoverOpt.MasterName, copy(sentinelAddrs, failoverOpt.SentinelAddrs)
sentinelAddrs: failoverOpt.SentinelAddrs,
opt: failoverOpt.options(), failover := &sentinelFailover{
opt: failoverOpt,
sentinelAddrs: sentinelAddrs,
} }
opt := failoverOpt.clusterOptions() opt := failoverOpt.clusterOptions()
opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) { opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) {
masterAddr, err := failover.MasterAddr(ctx) masterAddr, err := failover.MasterAddr(ctx)
if err != nil { if err != nil {

View File

@ -18,11 +18,13 @@ var _ = Describe("Sentinel", func() {
client = redis.NewFailoverClient(&redis.FailoverOptions{ client = redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: sentinelName, MasterName: sentinelName,
SentinelAddrs: sentinelAddrs, SentinelAddrs: sentinelAddrs,
MaxRetries: -1,
}) })
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
sentinel := redis.NewSentinelClient(&redis.Options{ sentinel := redis.NewSentinelClient(&redis.Options{
Addr: ":" + sentinelPort1, Addr: ":" + sentinelPort1,
MaxRetries: -1,
}) })
addr, err := sentinel.GetMasterAddrByName(ctx, sentinelName).Result() addr, err := sentinel.GetMasterAddrByName(ctx, sentinelName).Result()
@ -30,8 +32,20 @@ var _ = Describe("Sentinel", func() {
master = redis.NewClient(&redis.Options{ master = redis.NewClient(&redis.Options{
Addr: net.JoinHostPort(addr[0], addr[1]), Addr: net.JoinHostPort(addr[0], addr[1]),
MaxRetries: -1,
}) })
masterPort = addr[1] masterPort = addr[1]
// Wait until slaves are picked up by sentinel.
Eventually(func() string {
return sentinel1.Info(ctx).Val()
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
Eventually(func() string {
return sentinel2.Info(ctx).Val()
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
Eventually(func() string {
return sentinel3.Info(ctx).Val()
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
}) })
AfterEach(func() { AfterEach(func() {
@ -52,25 +66,6 @@ var _ = Describe("Sentinel", func() {
// Create subscription. // Create subscription.
ch := client.Subscribe(ctx, "foo").Channel() ch := client.Subscribe(ctx, "foo").Channel()
// Wait until replicated.
Eventually(func() string {
return sentinelSlave1.Get(ctx, "foo").Val()
}, "15s", "100ms").Should(Equal("master"))
Eventually(func() string {
return sentinelSlave2.Get(ctx, "foo").Val()
}, "15s", "100ms").Should(Equal("master"))
// Wait until slaves are picked up by sentinel.
Eventually(func() string {
return sentinel1.Info(ctx).Val()
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
Eventually(func() string {
return sentinel2.Info(ctx).Val()
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
Eventually(func() string {
return sentinel3.Info(ctx).Val()
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
// Kill master. // Kill master.
err = master.Shutdown(ctx).Err() err = master.Shutdown(ctx).Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -79,9 +74,9 @@ var _ = Describe("Sentinel", func() {
}, "15s", "100ms").Should(HaveOccurred()) }, "15s", "100ms").Should(HaveOccurred())
// Check that client picked up new master. // Check that client picked up new master.
Eventually(func() error { Eventually(func() string {
return client.Get(ctx, "foo").Err() return client.Get(ctx, "foo").Val()
}, "15s", "100ms").ShouldNot(HaveOccurred()) }, "15s", "100ms").Should(Equal("master"))
// Check if subscription is renewed. // Check if subscription is renewed.
var msg *redis.Message var msg *redis.Message
@ -123,6 +118,7 @@ var _ = Describe("NewFailoverClusterClient", func() {
sentinel := redis.NewSentinelClient(&redis.Options{ sentinel := redis.NewSentinelClient(&redis.Options{
Addr: ":" + sentinelPort1, Addr: ":" + sentinelPort1,
MaxRetries: -1,
}) })
addr, err := sentinel.GetMasterAddrByName(ctx, sentinelName).Result() addr, err := sentinel.GetMasterAddrByName(ctx, sentinelName).Result()
@ -130,8 +126,20 @@ var _ = Describe("NewFailoverClusterClient", func() {
master = redis.NewClient(&redis.Options{ master = redis.NewClient(&redis.Options{
Addr: net.JoinHostPort(addr[0], addr[1]), Addr: net.JoinHostPort(addr[0], addr[1]),
MaxRetries: -1,
}) })
masterPort = addr[1] masterPort = addr[1]
// Wait until slaves are picked up by sentinel.
Eventually(func() string {
return sentinel1.Info(ctx).Val()
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
Eventually(func() string {
return sentinel2.Info(ctx).Val()
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
Eventually(func() string {
return sentinel3.Info(ctx).Val()
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
}) })
AfterEach(func() { AfterEach(func() {
@ -152,25 +160,6 @@ var _ = Describe("NewFailoverClusterClient", func() {
// Create subscription. // Create subscription.
ch := client.Subscribe(ctx, "foo").Channel() ch := client.Subscribe(ctx, "foo").Channel()
// Wait until replicated.
Eventually(func() string {
return sentinelSlave1.Get(ctx, "foo").Val()
}, "15s", "100ms").Should(Equal("master"))
Eventually(func() string {
return sentinelSlave2.Get(ctx, "foo").Val()
}, "15s", "100ms").Should(Equal("master"))
// Wait until slaves are picked up by sentinel.
Eventually(func() string {
return sentinel1.Info(ctx).Val()
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
Eventually(func() string {
return sentinel2.Info(ctx).Val()
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
Eventually(func() string {
return sentinel3.Info(ctx).Val()
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
// Kill master. // Kill master.
err = master.Shutdown(ctx).Err() err = master.Shutdown(ctx).Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -179,9 +168,9 @@ var _ = Describe("NewFailoverClusterClient", func() {
}, "15s", "100ms").Should(HaveOccurred()) }, "15s", "100ms").Should(HaveOccurred())
// Check that client picked up new master. // Check that client picked up new master.
Eventually(func() error { Eventually(func() string {
return client.Get(ctx, "foo").Err() return client.Get(ctx, "foo").Val()
}, "15s", "100ms").ShouldNot(HaveOccurred()) }, "15s", "100ms").Should(Equal("master"))
// Check if subscription is renewed. // Check if subscription is renewed.
var msg *redis.Message var msg *redis.Message