diff --git a/CHANGELOG.md b/CHANGELOG.md index c56f581..55b3c3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,9 @@ ring := redis.NewRing(&redis.RingOptions{ }) ``` +- `ClusterOptions.MaxRedirects` default value is changed from 8 to 3. +- `Options.MaxRetries` default value is changed from 0 to 3. + - `Cluster.ForEachNode` is renamed to `ForEachShard` for consistency with `Ring`. ## v7.3 diff --git a/cluster.go b/cluster.go index 51ceb09..78ad22c 100644 --- a/cluster.go +++ b/cluster.go @@ -32,7 +32,7 @@ type ClusterOptions struct { // The maximum number of retries before giving up. Command is retried // on network errors and MOVED/ASK redirects. - // Default is 8 retries. + // Default is 3 retries. MaxRedirects int // Enables read-only commands on slave nodes. @@ -83,7 +83,7 @@ func (opt *ClusterOptions) init() { if opt.MaxRedirects == -1 { opt.MaxRedirects = 0 } else if opt.MaxRedirects == 0 { - opt.MaxRedirects = 8 + opt.MaxRedirects = 3 } if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil { @@ -107,6 +107,9 @@ func (opt *ClusterOptions) init() { opt.WriteTimeout = opt.ReadTimeout } + if opt.MaxRetries == 0 { + opt.MaxRetries = -1 + } switch opt.MinRetryBackoff { case -1: opt.MinRetryBackoff = 0 @@ -132,12 +135,12 @@ func (opt *ClusterOptions) clientOptions() *Options { Dialer: opt.Dialer, OnConnect: opt.OnConnect, + Username: opt.Username, + Password: opt.Password, + MaxRetries: opt.MaxRetries, MinRetryBackoff: opt.MinRetryBackoff, MaxRetryBackoff: opt.MaxRetryBackoff, - Username: opt.Username, - Password: opt.Password, - readOnly: opt.ReadOnly, DialTimeout: opt.DialTimeout, ReadTimeout: opt.ReadTimeout, @@ -150,6 +153,8 @@ func (opt *ClusterOptions) clientOptions() *Options { IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: disableIdleCheck, + readOnly: opt.ReadOnly, + TLSConfig: opt.TLSConfig, } } diff --git a/main_test.go b/main_test.go index 5cc263e..1adbcbe 100644 --- a/main_test.go +++ b/main_test.go @@ -121,11 +121,15 @@ func TestGinkgoSuite(t *testing.T) { func redisOptions() *redis.Options { return &redis.Options{ - Addr: redisAddr, - DB: 15, - DialTimeout: 10 * time.Second, - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, + Addr: redisAddr, + DB: 15, + + DialTimeout: 10 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + + MaxRetries: -1, + PoolSize: 10, PoolTimeout: 30 * time.Second, IdleTimeout: time.Minute, @@ -135,9 +139,12 @@ func redisOptions() *redis.Options { func redisClusterOptions() *redis.ClusterOptions { return &redis.ClusterOptions{ - DialTimeout: 10 * time.Second, - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, + DialTimeout: 10 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + + MaxRedirects: 8, + PoolSize: 10, PoolTimeout: 30 * time.Second, IdleTimeout: time.Minute, @@ -151,9 +158,13 @@ func redisRingOptions() *redis.RingOptions { "ringShardOne": ":" + ringShard1Port, "ringShardTwo": ":" + ringShard2Port, }, - DialTimeout: 10 * time.Second, - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, + + DialTimeout: 10 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + + MaxRetries: -1, + PoolSize: 10, PoolTimeout: 30 * time.Second, IdleTimeout: time.Minute, @@ -233,7 +244,8 @@ func execCmd(name string, args ...string) (*os.Process, error) { func connectTo(port string) (*redis.Client, error) { client := redis.NewClient(&redis.Options{ - Addr: ":" + port, + Addr: ":" + port, + MaxRetries: -1, }) err := eventually(func() error { diff --git a/options.go b/options.go index 1be7c8b..373037c 100644 --- a/options.go +++ b/options.go @@ -57,7 +57,7 @@ type Options struct { DB int // Maximum number of retries before giving up. - // Default is to not retry failed commands. + // Default is 3 retries. MaxRetries int // Minimum backoff between each retry. // Default is 8 milliseconds; -1 disables backoff. @@ -164,6 +164,8 @@ func (opt *Options) init() { if opt.MaxRetries == -1 { opt.MaxRetries = 0 + } else if opt.MaxRetries == 0 { + opt.MaxRetries = 3 } switch opt.MinRetryBackoff { case -1: diff --git a/redis_test.go b/redis_test.go index 45cfac7..044a7c3 100644 --- a/redis_test.go +++ b/redis_test.go @@ -215,7 +215,7 @@ var _ = Describe("Client", func() { It("should retry with backoff", func() { clientNoRetry := redis.NewClient(&redis.Options{ Addr: ":1234", - MaxRetries: 0, + MaxRetries: -1, }) defer clientNoRetry.Close() diff --git a/ring.go b/ring.go index ca48d8c..370216f 100644 --- a/ring.go +++ b/ring.go @@ -2,6 +2,7 @@ package redis import ( "context" + "crypto/tls" "errors" "fmt" "net" @@ -84,6 +85,9 @@ type RingOptions struct { PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration + + TLSConfig *tls.Config + Limiter Limiter } func (opt *RingOptions) init() { @@ -101,6 +105,11 @@ func (opt *RingOptions) init() { opt.NewConsistentHash = newRendezvous } + if opt.MaxRetries == -1 { + opt.MaxRetries = 0 + } else if opt.MaxRetries == 0 { + opt.MaxRetries = 3 + } switch opt.MinRetryBackoff { case -1: opt.MinRetryBackoff = 0 @@ -124,6 +133,8 @@ func (opt *RingOptions) clientOptions() *Options { Password: opt.Password, DB: opt.DB, + MaxRetries: -1, + DialTimeout: opt.DialTimeout, ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, @@ -134,6 +145,9 @@ func (opt *RingOptions) clientOptions() *Options { PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: opt.IdleCheckFrequency, + + TLSConfig: opt.TLSConfig, + Limiter: opt.Limiter, } } diff --git a/sentinel.go b/sentinel.go index c7e1b15..b4f8244 100644 --- a/sentinel.go +++ b/sentinel.go @@ -56,8 +56,8 @@ type FailoverOptions struct { TLSConfig *tls.Config } -func (opt *FailoverOptions) options() *Options { - redisOpt := &Options{ +func (opt *FailoverOptions) clientOptions() *Options { + return &Options{ Addr: "FailoverClient", Dialer: opt.Dialer, @@ -84,17 +84,17 @@ func (opt *FailoverOptions) options() *Options { TLSConfig: opt.TLSConfig, } - redisOpt.init() - return redisOpt } -func (opt *FailoverOptions) clusterOptions() *ClusterOptions { - clusterOpt := &ClusterOptions{ +func (opt *FailoverOptions) sentinelOptions(addr string) *Options { + return &Options{ + Addr: addr, + Dialer: opt.Dialer, OnConnect: opt.OnConnect, - Username: opt.Username, - Password: opt.Password, + DB: 0, + Password: opt.SentinelPassword, MaxRetries: opt.MaxRetries, MinRetryBackoff: opt.MinRetryBackoff, @@ -113,24 +113,50 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions { TLSConfig: opt.TLSConfig, } - clusterOpt.init() - return clusterOpt +} + +func (opt *FailoverOptions) clusterOptions() *ClusterOptions { + return &ClusterOptions{ + Dialer: opt.Dialer, + OnConnect: opt.OnConnect, + + Username: opt.Username, + Password: opt.Password, + + MaxRedirects: opt.MaxRetries, + MinRetryBackoff: opt.MinRetryBackoff, + MaxRetryBackoff: opt.MaxRetryBackoff, + + DialTimeout: opt.DialTimeout, + ReadTimeout: opt.ReadTimeout, + WriteTimeout: opt.WriteTimeout, + + PoolSize: opt.PoolSize, + PoolTimeout: opt.PoolTimeout, + IdleTimeout: opt.IdleTimeout, + IdleCheckFrequency: opt.IdleCheckFrequency, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, + + TLSConfig: opt.TLSConfig, + } } // NewFailoverClient returns a Redis client that uses Redis Sentinel // for automatic failover. It's safe for concurrent use by multiple // goroutines. func NewFailoverClient(failoverOpt *FailoverOptions) *Client { - failover := &sentinelFailover{ - masterName: failoverOpt.MasterName, - sentinelAddrs: failoverOpt.SentinelAddrs, - sentinelPassword: failoverOpt.SentinelPassword, + sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs)) + copy(sentinelAddrs, failoverOpt.SentinelAddrs) - opt: failoverOpt.options(), + failover := &sentinelFailover{ + opt: failoverOpt, + sentinelAddrs: sentinelAddrs, } - opt := failoverOpt.options() - opt.Dialer = masterSlaveDialer(failover, failoverOpt.SlaveOnly) + opt := failoverOpt.clientOptions() + opt.Dialer = masterSlaveDialer(failover) + opt.init() connPool := newConnPool(opt) failover.onFailover = func(ctx context.Context, addr string) { @@ -150,13 +176,13 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { } func masterSlaveDialer( - failover *sentinelFailover, slaveOnly bool, + failover *sentinelFailover, ) func(ctx context.Context, network, addr string) (net.Conn, error) { return func(ctx context.Context, network, _ string) (net.Conn, error) { var addr string var err error - if slaveOnly { + if failover.opt.SlaveOnly { addr, err = failover.RandomSlaveAddr(ctx) } else { addr, err = failover.MasterAddr(ctx) @@ -349,14 +375,12 @@ func (c *SentinelClient) Remove(ctx context.Context, name string) *StringCmd { //------------------------------------------------------------------------------ type sentinelFailover struct { - sentinelAddrs []string - sentinelPassword string + opt *FailoverOptions - opt *Options - onFailover func(ctx context.Context, addr string) + sentinelAddrs []string + onFailover func(ctx context.Context, addr string) mu sync.RWMutex - masterName string _masterAddr string sentinel *SentinelClient pubsub *PubSub @@ -419,31 +443,12 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) { } for i, sentinelAddr := range c.sentinelAddrs { - sentinel := NewSentinelClient(&Options{ - Addr: sentinelAddr, - Dialer: c.opt.Dialer, + sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr)) - Username: c.opt.Username, - Password: c.opt.Password, - - MaxRetries: c.opt.MaxRetries, - - DialTimeout: c.opt.DialTimeout, - ReadTimeout: c.opt.ReadTimeout, - WriteTimeout: c.opt.WriteTimeout, - - PoolSize: c.opt.PoolSize, - PoolTimeout: c.opt.PoolTimeout, - IdleTimeout: c.opt.IdleTimeout, - IdleCheckFrequency: c.opt.IdleCheckFrequency, - - TLSConfig: c.opt.TLSConfig, - }) - - masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result() + masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result() if err != nil { internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s", - c.masterName, err) + c.opt.MasterName, err) _ = sentinel.Close() continue } @@ -483,31 +488,12 @@ func (c *sentinelFailover) slaveAddresses(ctx context.Context) ([]string, error) } for i, sentinelAddr := range c.sentinelAddrs { - sentinel := NewSentinelClient(&Options{ - Addr: sentinelAddr, - Dialer: c.opt.Dialer, + sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr)) - Username: c.opt.Username, - Password: c.opt.Password, - - MaxRetries: c.opt.MaxRetries, - - DialTimeout: c.opt.DialTimeout, - ReadTimeout: c.opt.ReadTimeout, - WriteTimeout: c.opt.WriteTimeout, - - PoolSize: c.opt.PoolSize, - PoolTimeout: c.opt.PoolTimeout, - IdleTimeout: c.opt.IdleTimeout, - IdleCheckFrequency: c.opt.IdleCheckFrequency, - - TLSConfig: c.opt.TLSConfig, - }) - - slaves, err := sentinel.Slaves(ctx, c.masterName).Result() + slaves, err := sentinel.Slaves(ctx, c.opt.MasterName).Result() if err != nil { internal.Logger.Printf(ctx, "sentinel: Slaves master=%q failed: %s", - c.masterName, err) + c.opt.MasterName, err) _ = sentinel.Close() continue } @@ -524,20 +510,20 @@ func (c *sentinelFailover) slaveAddresses(ctx context.Context) ([]string, error) } func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string { - addr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result() + addr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result() if err != nil { internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s", - c.masterName, err) + c.opt.MasterName, err) return "" } return net.JoinHostPort(addr[0], addr[1]) } func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *SentinelClient) []string { - addrs, err := sentinel.Slaves(ctx, c.masterName).Result() + addrs, err := sentinel.Slaves(ctx, c.opt.MasterName).Result() if err != nil { internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s", - c.masterName, err) + c.opt.MasterName, err) return []string{} } @@ -597,7 +583,7 @@ func (c *sentinelFailover) trySwitchMaster(ctx context.Context, addr string) { c._masterAddr = addr internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q", - c.masterName, addr) + c.opt.MasterName, addr) go c.onFailover(ctx, addr) } @@ -613,9 +599,9 @@ func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelCl } func (c *sentinelFailover) discoverSentinels(ctx context.Context) { - sentinels, err := c.sentinel.Sentinels(ctx, c.masterName).Result() + sentinels, err := c.sentinel.Sentinels(ctx, c.opt.MasterName).Result() if err != nil { - internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.masterName, err) + internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.opt.MasterName, err) return } for _, sentinel := range sentinels { @@ -626,7 +612,7 @@ func (c *sentinelFailover) discoverSentinels(ctx context.Context) { sentinelAddr := vals[i+1].(string) if !contains(c.sentinelAddrs, sentinelAddr) { internal.Logger.Printf(ctx, "sentinel: discovered new sentinel=%q for master=%q", - sentinelAddr, c.masterName) + sentinelAddr, c.opt.MasterName) c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr) } } @@ -644,7 +630,7 @@ func (c *sentinelFailover) listen(pubsub *PubSub) { if msg.Channel == "+switch-master" { parts := strings.Split(msg.Payload, " ") - if parts[0] != c.masterName { + if parts[0] != c.opt.MasterName { internal.Logger.Printf(pubsub.getContext(), "sentinel: ignore addr for master=%q", parts[0]) continue } @@ -666,15 +652,15 @@ func contains(slice []string, str string) bool { //------------------------------------------------------------------------------ func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient { - failover := &sentinelFailover{ - masterName: failoverOpt.MasterName, - sentinelAddrs: failoverOpt.SentinelAddrs, + sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs)) + copy(sentinelAddrs, failoverOpt.SentinelAddrs) - opt: failoverOpt.options(), + failover := &sentinelFailover{ + opt: failoverOpt, + sentinelAddrs: sentinelAddrs, } opt := failoverOpt.clusterOptions() - opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) { masterAddr, err := failover.MasterAddr(ctx) if err != nil { diff --git a/sentinel_test.go b/sentinel_test.go index 29dfac6..2645c43 100644 --- a/sentinel_test.go +++ b/sentinel_test.go @@ -18,20 +18,34 @@ var _ = Describe("Sentinel", func() { client = redis.NewFailoverClient(&redis.FailoverOptions{ MasterName: sentinelName, SentinelAddrs: sentinelAddrs, + MaxRetries: -1, }) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) sentinel := redis.NewSentinelClient(&redis.Options{ - Addr: ":" + sentinelPort1, + Addr: ":" + sentinelPort1, + MaxRetries: -1, }) addr, err := sentinel.GetMasterAddrByName(ctx, sentinelName).Result() Expect(err).NotTo(HaveOccurred()) master = redis.NewClient(&redis.Options{ - Addr: net.JoinHostPort(addr[0], addr[1]), + Addr: net.JoinHostPort(addr[0], addr[1]), + MaxRetries: -1, }) masterPort = addr[1] + + // Wait until slaves are picked up by sentinel. + Eventually(func() string { + return sentinel1.Info(ctx).Val() + }, "15s", "100ms").Should(ContainSubstring("slaves=2")) + Eventually(func() string { + return sentinel2.Info(ctx).Val() + }, "15s", "100ms").Should(ContainSubstring("slaves=2")) + Eventually(func() string { + return sentinel3.Info(ctx).Val() + }, "15s", "100ms").Should(ContainSubstring("slaves=2")) }) AfterEach(func() { @@ -52,25 +66,6 @@ var _ = Describe("Sentinel", func() { // Create subscription. ch := client.Subscribe(ctx, "foo").Channel() - // Wait until replicated. - Eventually(func() string { - return sentinelSlave1.Get(ctx, "foo").Val() - }, "15s", "100ms").Should(Equal("master")) - Eventually(func() string { - return sentinelSlave2.Get(ctx, "foo").Val() - }, "15s", "100ms").Should(Equal("master")) - - // Wait until slaves are picked up by sentinel. - Eventually(func() string { - return sentinel1.Info(ctx).Val() - }, "15s", "100ms").Should(ContainSubstring("slaves=2")) - Eventually(func() string { - return sentinel2.Info(ctx).Val() - }, "15s", "100ms").Should(ContainSubstring("slaves=2")) - Eventually(func() string { - return sentinel3.Info(ctx).Val() - }, "15s", "100ms").Should(ContainSubstring("slaves=2")) - // Kill master. err = master.Shutdown(ctx).Err() Expect(err).NotTo(HaveOccurred()) @@ -79,9 +74,9 @@ var _ = Describe("Sentinel", func() { }, "15s", "100ms").Should(HaveOccurred()) // Check that client picked up new master. - Eventually(func() error { - return client.Get(ctx, "foo").Err() - }, "15s", "100ms").ShouldNot(HaveOccurred()) + Eventually(func() string { + return client.Get(ctx, "foo").Val() + }, "15s", "100ms").Should(Equal("master")) // Check if subscription is renewed. var msg *redis.Message @@ -122,16 +117,29 @@ var _ = Describe("NewFailoverClusterClient", func() { Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) sentinel := redis.NewSentinelClient(&redis.Options{ - Addr: ":" + sentinelPort1, + Addr: ":" + sentinelPort1, + MaxRetries: -1, }) addr, err := sentinel.GetMasterAddrByName(ctx, sentinelName).Result() Expect(err).NotTo(HaveOccurred()) master = redis.NewClient(&redis.Options{ - Addr: net.JoinHostPort(addr[0], addr[1]), + Addr: net.JoinHostPort(addr[0], addr[1]), + MaxRetries: -1, }) masterPort = addr[1] + + // Wait until slaves are picked up by sentinel. + Eventually(func() string { + return sentinel1.Info(ctx).Val() + }, "15s", "100ms").Should(ContainSubstring("slaves=2")) + Eventually(func() string { + return sentinel2.Info(ctx).Val() + }, "15s", "100ms").Should(ContainSubstring("slaves=2")) + Eventually(func() string { + return sentinel3.Info(ctx).Val() + }, "15s", "100ms").Should(ContainSubstring("slaves=2")) }) AfterEach(func() { @@ -152,25 +160,6 @@ var _ = Describe("NewFailoverClusterClient", func() { // Create subscription. ch := client.Subscribe(ctx, "foo").Channel() - // Wait until replicated. - Eventually(func() string { - return sentinelSlave1.Get(ctx, "foo").Val() - }, "15s", "100ms").Should(Equal("master")) - Eventually(func() string { - return sentinelSlave2.Get(ctx, "foo").Val() - }, "15s", "100ms").Should(Equal("master")) - - // Wait until slaves are picked up by sentinel. - Eventually(func() string { - return sentinel1.Info(ctx).Val() - }, "15s", "100ms").Should(ContainSubstring("slaves=2")) - Eventually(func() string { - return sentinel2.Info(ctx).Val() - }, "15s", "100ms").Should(ContainSubstring("slaves=2")) - Eventually(func() string { - return sentinel3.Info(ctx).Val() - }, "15s", "100ms").Should(ContainSubstring("slaves=2")) - // Kill master. err = master.Shutdown(ctx).Err() Expect(err).NotTo(HaveOccurred()) @@ -179,9 +168,9 @@ var _ = Describe("NewFailoverClusterClient", func() { }, "15s", "100ms").Should(HaveOccurred()) // Check that client picked up new master. - Eventually(func() error { - return client.Get(ctx, "foo").Err() - }, "15s", "100ms").ShouldNot(HaveOccurred()) + Eventually(func() string { + return client.Get(ctx, "foo").Val() + }, "15s", "100ms").Should(Equal("master")) // Check if subscription is renewed. var msg *redis.Message