forked from mirror/redis
Tweak number of retries
This commit is contained in:
parent
513fcfb224
commit
b657760cca
|
@ -26,6 +26,9 @@ ring := redis.NewRing(&redis.RingOptions{
|
||||||
})
|
})
|
||||||
```
|
```
|
||||||
|
|
||||||
|
- `ClusterOptions.MaxRedirects` default value is changed from 8 to 3.
|
||||||
|
- `Options.MaxRetries` default value is changed from 0 to 3.
|
||||||
|
|
||||||
- `Cluster.ForEachNode` is renamed to `ForEachShard` for consistency with `Ring`.
|
- `Cluster.ForEachNode` is renamed to `ForEachShard` for consistency with `Ring`.
|
||||||
|
|
||||||
## v7.3
|
## v7.3
|
||||||
|
|
15
cluster.go
15
cluster.go
|
@ -32,7 +32,7 @@ type ClusterOptions struct {
|
||||||
|
|
||||||
// The maximum number of retries before giving up. Command is retried
|
// The maximum number of retries before giving up. Command is retried
|
||||||
// on network errors and MOVED/ASK redirects.
|
// on network errors and MOVED/ASK redirects.
|
||||||
// Default is 8 retries.
|
// Default is 3 retries.
|
||||||
MaxRedirects int
|
MaxRedirects int
|
||||||
|
|
||||||
// Enables read-only commands on slave nodes.
|
// Enables read-only commands on slave nodes.
|
||||||
|
@ -83,7 +83,7 @@ func (opt *ClusterOptions) init() {
|
||||||
if opt.MaxRedirects == -1 {
|
if opt.MaxRedirects == -1 {
|
||||||
opt.MaxRedirects = 0
|
opt.MaxRedirects = 0
|
||||||
} else if opt.MaxRedirects == 0 {
|
} else if opt.MaxRedirects == 0 {
|
||||||
opt.MaxRedirects = 8
|
opt.MaxRedirects = 3
|
||||||
}
|
}
|
||||||
|
|
||||||
if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
|
if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
|
||||||
|
@ -107,6 +107,9 @@ func (opt *ClusterOptions) init() {
|
||||||
opt.WriteTimeout = opt.ReadTimeout
|
opt.WriteTimeout = opt.ReadTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if opt.MaxRetries == 0 {
|
||||||
|
opt.MaxRetries = -1
|
||||||
|
}
|
||||||
switch opt.MinRetryBackoff {
|
switch opt.MinRetryBackoff {
|
||||||
case -1:
|
case -1:
|
||||||
opt.MinRetryBackoff = 0
|
opt.MinRetryBackoff = 0
|
||||||
|
@ -132,12 +135,12 @@ func (opt *ClusterOptions) clientOptions() *Options {
|
||||||
Dialer: opt.Dialer,
|
Dialer: opt.Dialer,
|
||||||
OnConnect: opt.OnConnect,
|
OnConnect: opt.OnConnect,
|
||||||
|
|
||||||
|
Username: opt.Username,
|
||||||
|
Password: opt.Password,
|
||||||
|
|
||||||
MaxRetries: opt.MaxRetries,
|
MaxRetries: opt.MaxRetries,
|
||||||
MinRetryBackoff: opt.MinRetryBackoff,
|
MinRetryBackoff: opt.MinRetryBackoff,
|
||||||
MaxRetryBackoff: opt.MaxRetryBackoff,
|
MaxRetryBackoff: opt.MaxRetryBackoff,
|
||||||
Username: opt.Username,
|
|
||||||
Password: opt.Password,
|
|
||||||
readOnly: opt.ReadOnly,
|
|
||||||
|
|
||||||
DialTimeout: opt.DialTimeout,
|
DialTimeout: opt.DialTimeout,
|
||||||
ReadTimeout: opt.ReadTimeout,
|
ReadTimeout: opt.ReadTimeout,
|
||||||
|
@ -150,6 +153,8 @@ func (opt *ClusterOptions) clientOptions() *Options {
|
||||||
IdleTimeout: opt.IdleTimeout,
|
IdleTimeout: opt.IdleTimeout,
|
||||||
IdleCheckFrequency: disableIdleCheck,
|
IdleCheckFrequency: disableIdleCheck,
|
||||||
|
|
||||||
|
readOnly: opt.ReadOnly,
|
||||||
|
|
||||||
TLSConfig: opt.TLSConfig,
|
TLSConfig: opt.TLSConfig,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
12
main_test.go
12
main_test.go
|
@ -123,9 +123,13 @@ func redisOptions() *redis.Options {
|
||||||
return &redis.Options{
|
return &redis.Options{
|
||||||
Addr: redisAddr,
|
Addr: redisAddr,
|
||||||
DB: 15,
|
DB: 15,
|
||||||
|
|
||||||
DialTimeout: 10 * time.Second,
|
DialTimeout: 10 * time.Second,
|
||||||
ReadTimeout: 30 * time.Second,
|
ReadTimeout: 30 * time.Second,
|
||||||
WriteTimeout: 30 * time.Second,
|
WriteTimeout: 30 * time.Second,
|
||||||
|
|
||||||
|
MaxRetries: -1,
|
||||||
|
|
||||||
PoolSize: 10,
|
PoolSize: 10,
|
||||||
PoolTimeout: 30 * time.Second,
|
PoolTimeout: 30 * time.Second,
|
||||||
IdleTimeout: time.Minute,
|
IdleTimeout: time.Minute,
|
||||||
|
@ -138,6 +142,9 @@ func redisClusterOptions() *redis.ClusterOptions {
|
||||||
DialTimeout: 10 * time.Second,
|
DialTimeout: 10 * time.Second,
|
||||||
ReadTimeout: 30 * time.Second,
|
ReadTimeout: 30 * time.Second,
|
||||||
WriteTimeout: 30 * time.Second,
|
WriteTimeout: 30 * time.Second,
|
||||||
|
|
||||||
|
MaxRedirects: 8,
|
||||||
|
|
||||||
PoolSize: 10,
|
PoolSize: 10,
|
||||||
PoolTimeout: 30 * time.Second,
|
PoolTimeout: 30 * time.Second,
|
||||||
IdleTimeout: time.Minute,
|
IdleTimeout: time.Minute,
|
||||||
|
@ -151,9 +158,13 @@ func redisRingOptions() *redis.RingOptions {
|
||||||
"ringShardOne": ":" + ringShard1Port,
|
"ringShardOne": ":" + ringShard1Port,
|
||||||
"ringShardTwo": ":" + ringShard2Port,
|
"ringShardTwo": ":" + ringShard2Port,
|
||||||
},
|
},
|
||||||
|
|
||||||
DialTimeout: 10 * time.Second,
|
DialTimeout: 10 * time.Second,
|
||||||
ReadTimeout: 30 * time.Second,
|
ReadTimeout: 30 * time.Second,
|
||||||
WriteTimeout: 30 * time.Second,
|
WriteTimeout: 30 * time.Second,
|
||||||
|
|
||||||
|
MaxRetries: -1,
|
||||||
|
|
||||||
PoolSize: 10,
|
PoolSize: 10,
|
||||||
PoolTimeout: 30 * time.Second,
|
PoolTimeout: 30 * time.Second,
|
||||||
IdleTimeout: time.Minute,
|
IdleTimeout: time.Minute,
|
||||||
|
@ -234,6 +245,7 @@ func execCmd(name string, args ...string) (*os.Process, error) {
|
||||||
func connectTo(port string) (*redis.Client, error) {
|
func connectTo(port string) (*redis.Client, error) {
|
||||||
client := redis.NewClient(&redis.Options{
|
client := redis.NewClient(&redis.Options{
|
||||||
Addr: ":" + port,
|
Addr: ":" + port,
|
||||||
|
MaxRetries: -1,
|
||||||
})
|
})
|
||||||
|
|
||||||
err := eventually(func() error {
|
err := eventually(func() error {
|
||||||
|
|
|
@ -57,7 +57,7 @@ type Options struct {
|
||||||
DB int
|
DB int
|
||||||
|
|
||||||
// Maximum number of retries before giving up.
|
// Maximum number of retries before giving up.
|
||||||
// Default is to not retry failed commands.
|
// Default is 3 retries.
|
||||||
MaxRetries int
|
MaxRetries int
|
||||||
// Minimum backoff between each retry.
|
// Minimum backoff between each retry.
|
||||||
// Default is 8 milliseconds; -1 disables backoff.
|
// Default is 8 milliseconds; -1 disables backoff.
|
||||||
|
@ -164,6 +164,8 @@ func (opt *Options) init() {
|
||||||
|
|
||||||
if opt.MaxRetries == -1 {
|
if opt.MaxRetries == -1 {
|
||||||
opt.MaxRetries = 0
|
opt.MaxRetries = 0
|
||||||
|
} else if opt.MaxRetries == 0 {
|
||||||
|
opt.MaxRetries = 3
|
||||||
}
|
}
|
||||||
switch opt.MinRetryBackoff {
|
switch opt.MinRetryBackoff {
|
||||||
case -1:
|
case -1:
|
||||||
|
|
|
@ -215,7 +215,7 @@ var _ = Describe("Client", func() {
|
||||||
It("should retry with backoff", func() {
|
It("should retry with backoff", func() {
|
||||||
clientNoRetry := redis.NewClient(&redis.Options{
|
clientNoRetry := redis.NewClient(&redis.Options{
|
||||||
Addr: ":1234",
|
Addr: ":1234",
|
||||||
MaxRetries: 0,
|
MaxRetries: -1,
|
||||||
})
|
})
|
||||||
defer clientNoRetry.Close()
|
defer clientNoRetry.Close()
|
||||||
|
|
||||||
|
|
14
ring.go
14
ring.go
|
@ -2,6 +2,7 @@ package redis
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
@ -84,6 +85,9 @@ type RingOptions struct {
|
||||||
PoolTimeout time.Duration
|
PoolTimeout time.Duration
|
||||||
IdleTimeout time.Duration
|
IdleTimeout time.Duration
|
||||||
IdleCheckFrequency time.Duration
|
IdleCheckFrequency time.Duration
|
||||||
|
|
||||||
|
TLSConfig *tls.Config
|
||||||
|
Limiter Limiter
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opt *RingOptions) init() {
|
func (opt *RingOptions) init() {
|
||||||
|
@ -101,6 +105,11 @@ func (opt *RingOptions) init() {
|
||||||
opt.NewConsistentHash = newRendezvous
|
opt.NewConsistentHash = newRendezvous
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if opt.MaxRetries == -1 {
|
||||||
|
opt.MaxRetries = 0
|
||||||
|
} else if opt.MaxRetries == 0 {
|
||||||
|
opt.MaxRetries = 3
|
||||||
|
}
|
||||||
switch opt.MinRetryBackoff {
|
switch opt.MinRetryBackoff {
|
||||||
case -1:
|
case -1:
|
||||||
opt.MinRetryBackoff = 0
|
opt.MinRetryBackoff = 0
|
||||||
|
@ -124,6 +133,8 @@ func (opt *RingOptions) clientOptions() *Options {
|
||||||
Password: opt.Password,
|
Password: opt.Password,
|
||||||
DB: opt.DB,
|
DB: opt.DB,
|
||||||
|
|
||||||
|
MaxRetries: -1,
|
||||||
|
|
||||||
DialTimeout: opt.DialTimeout,
|
DialTimeout: opt.DialTimeout,
|
||||||
ReadTimeout: opt.ReadTimeout,
|
ReadTimeout: opt.ReadTimeout,
|
||||||
WriteTimeout: opt.WriteTimeout,
|
WriteTimeout: opt.WriteTimeout,
|
||||||
|
@ -134,6 +145,9 @@ func (opt *RingOptions) clientOptions() *Options {
|
||||||
PoolTimeout: opt.PoolTimeout,
|
PoolTimeout: opt.PoolTimeout,
|
||||||
IdleTimeout: opt.IdleTimeout,
|
IdleTimeout: opt.IdleTimeout,
|
||||||
IdleCheckFrequency: opt.IdleCheckFrequency,
|
IdleCheckFrequency: opt.IdleCheckFrequency,
|
||||||
|
|
||||||
|
TLSConfig: opt.TLSConfig,
|
||||||
|
Limiter: opt.Limiter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
148
sentinel.go
148
sentinel.go
|
@ -56,8 +56,8 @@ type FailoverOptions struct {
|
||||||
TLSConfig *tls.Config
|
TLSConfig *tls.Config
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opt *FailoverOptions) options() *Options {
|
func (opt *FailoverOptions) clientOptions() *Options {
|
||||||
redisOpt := &Options{
|
return &Options{
|
||||||
Addr: "FailoverClient",
|
Addr: "FailoverClient",
|
||||||
|
|
||||||
Dialer: opt.Dialer,
|
Dialer: opt.Dialer,
|
||||||
|
@ -84,17 +84,17 @@ func (opt *FailoverOptions) options() *Options {
|
||||||
|
|
||||||
TLSConfig: opt.TLSConfig,
|
TLSConfig: opt.TLSConfig,
|
||||||
}
|
}
|
||||||
redisOpt.init()
|
|
||||||
return redisOpt
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
|
func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
|
||||||
clusterOpt := &ClusterOptions{
|
return &Options{
|
||||||
|
Addr: addr,
|
||||||
|
|
||||||
Dialer: opt.Dialer,
|
Dialer: opt.Dialer,
|
||||||
OnConnect: opt.OnConnect,
|
OnConnect: opt.OnConnect,
|
||||||
|
|
||||||
Username: opt.Username,
|
DB: 0,
|
||||||
Password: opt.Password,
|
Password: opt.SentinelPassword,
|
||||||
|
|
||||||
MaxRetries: opt.MaxRetries,
|
MaxRetries: opt.MaxRetries,
|
||||||
MinRetryBackoff: opt.MinRetryBackoff,
|
MinRetryBackoff: opt.MinRetryBackoff,
|
||||||
|
@ -113,24 +113,50 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
|
||||||
|
|
||||||
TLSConfig: opt.TLSConfig,
|
TLSConfig: opt.TLSConfig,
|
||||||
}
|
}
|
||||||
clusterOpt.init()
|
}
|
||||||
return clusterOpt
|
|
||||||
|
func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
|
||||||
|
return &ClusterOptions{
|
||||||
|
Dialer: opt.Dialer,
|
||||||
|
OnConnect: opt.OnConnect,
|
||||||
|
|
||||||
|
Username: opt.Username,
|
||||||
|
Password: opt.Password,
|
||||||
|
|
||||||
|
MaxRedirects: opt.MaxRetries,
|
||||||
|
MinRetryBackoff: opt.MinRetryBackoff,
|
||||||
|
MaxRetryBackoff: opt.MaxRetryBackoff,
|
||||||
|
|
||||||
|
DialTimeout: opt.DialTimeout,
|
||||||
|
ReadTimeout: opt.ReadTimeout,
|
||||||
|
WriteTimeout: opt.WriteTimeout,
|
||||||
|
|
||||||
|
PoolSize: opt.PoolSize,
|
||||||
|
PoolTimeout: opt.PoolTimeout,
|
||||||
|
IdleTimeout: opt.IdleTimeout,
|
||||||
|
IdleCheckFrequency: opt.IdleCheckFrequency,
|
||||||
|
MinIdleConns: opt.MinIdleConns,
|
||||||
|
MaxConnAge: opt.MaxConnAge,
|
||||||
|
|
||||||
|
TLSConfig: opt.TLSConfig,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFailoverClient returns a Redis client that uses Redis Sentinel
|
// NewFailoverClient returns a Redis client that uses Redis Sentinel
|
||||||
// for automatic failover. It's safe for concurrent use by multiple
|
// for automatic failover. It's safe for concurrent use by multiple
|
||||||
// goroutines.
|
// goroutines.
|
||||||
func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
|
func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
|
||||||
failover := &sentinelFailover{
|
sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
|
||||||
masterName: failoverOpt.MasterName,
|
copy(sentinelAddrs, failoverOpt.SentinelAddrs)
|
||||||
sentinelAddrs: failoverOpt.SentinelAddrs,
|
|
||||||
sentinelPassword: failoverOpt.SentinelPassword,
|
|
||||||
|
|
||||||
opt: failoverOpt.options(),
|
failover := &sentinelFailover{
|
||||||
|
opt: failoverOpt,
|
||||||
|
sentinelAddrs: sentinelAddrs,
|
||||||
}
|
}
|
||||||
|
|
||||||
opt := failoverOpt.options()
|
opt := failoverOpt.clientOptions()
|
||||||
opt.Dialer = masterSlaveDialer(failover, failoverOpt.SlaveOnly)
|
opt.Dialer = masterSlaveDialer(failover)
|
||||||
|
opt.init()
|
||||||
|
|
||||||
connPool := newConnPool(opt)
|
connPool := newConnPool(opt)
|
||||||
failover.onFailover = func(ctx context.Context, addr string) {
|
failover.onFailover = func(ctx context.Context, addr string) {
|
||||||
|
@ -150,13 +176,13 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
|
||||||
}
|
}
|
||||||
|
|
||||||
func masterSlaveDialer(
|
func masterSlaveDialer(
|
||||||
failover *sentinelFailover, slaveOnly bool,
|
failover *sentinelFailover,
|
||||||
) func(ctx context.Context, network, addr string) (net.Conn, error) {
|
) func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||||
return func(ctx context.Context, network, _ string) (net.Conn, error) {
|
return func(ctx context.Context, network, _ string) (net.Conn, error) {
|
||||||
var addr string
|
var addr string
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if slaveOnly {
|
if failover.opt.SlaveOnly {
|
||||||
addr, err = failover.RandomSlaveAddr(ctx)
|
addr, err = failover.RandomSlaveAddr(ctx)
|
||||||
} else {
|
} else {
|
||||||
addr, err = failover.MasterAddr(ctx)
|
addr, err = failover.MasterAddr(ctx)
|
||||||
|
@ -349,14 +375,12 @@ func (c *SentinelClient) Remove(ctx context.Context, name string) *StringCmd {
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type sentinelFailover struct {
|
type sentinelFailover struct {
|
||||||
sentinelAddrs []string
|
opt *FailoverOptions
|
||||||
sentinelPassword string
|
|
||||||
|
|
||||||
opt *Options
|
sentinelAddrs []string
|
||||||
onFailover func(ctx context.Context, addr string)
|
onFailover func(ctx context.Context, addr string)
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
masterName string
|
|
||||||
_masterAddr string
|
_masterAddr string
|
||||||
sentinel *SentinelClient
|
sentinel *SentinelClient
|
||||||
pubsub *PubSub
|
pubsub *PubSub
|
||||||
|
@ -419,31 +443,12 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, sentinelAddr := range c.sentinelAddrs {
|
for i, sentinelAddr := range c.sentinelAddrs {
|
||||||
sentinel := NewSentinelClient(&Options{
|
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
|
||||||
Addr: sentinelAddr,
|
|
||||||
Dialer: c.opt.Dialer,
|
|
||||||
|
|
||||||
Username: c.opt.Username,
|
masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
|
||||||
Password: c.opt.Password,
|
|
||||||
|
|
||||||
MaxRetries: c.opt.MaxRetries,
|
|
||||||
|
|
||||||
DialTimeout: c.opt.DialTimeout,
|
|
||||||
ReadTimeout: c.opt.ReadTimeout,
|
|
||||||
WriteTimeout: c.opt.WriteTimeout,
|
|
||||||
|
|
||||||
PoolSize: c.opt.PoolSize,
|
|
||||||
PoolTimeout: c.opt.PoolTimeout,
|
|
||||||
IdleTimeout: c.opt.IdleTimeout,
|
|
||||||
IdleCheckFrequency: c.opt.IdleCheckFrequency,
|
|
||||||
|
|
||||||
TLSConfig: c.opt.TLSConfig,
|
|
||||||
})
|
|
||||||
|
|
||||||
masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
|
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
|
||||||
c.masterName, err)
|
c.opt.MasterName, err)
|
||||||
_ = sentinel.Close()
|
_ = sentinel.Close()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -483,31 +488,12 @@ func (c *sentinelFailover) slaveAddresses(ctx context.Context) ([]string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, sentinelAddr := range c.sentinelAddrs {
|
for i, sentinelAddr := range c.sentinelAddrs {
|
||||||
sentinel := NewSentinelClient(&Options{
|
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
|
||||||
Addr: sentinelAddr,
|
|
||||||
Dialer: c.opt.Dialer,
|
|
||||||
|
|
||||||
Username: c.opt.Username,
|
slaves, err := sentinel.Slaves(ctx, c.opt.MasterName).Result()
|
||||||
Password: c.opt.Password,
|
|
||||||
|
|
||||||
MaxRetries: c.opt.MaxRetries,
|
|
||||||
|
|
||||||
DialTimeout: c.opt.DialTimeout,
|
|
||||||
ReadTimeout: c.opt.ReadTimeout,
|
|
||||||
WriteTimeout: c.opt.WriteTimeout,
|
|
||||||
|
|
||||||
PoolSize: c.opt.PoolSize,
|
|
||||||
PoolTimeout: c.opt.PoolTimeout,
|
|
||||||
IdleTimeout: c.opt.IdleTimeout,
|
|
||||||
IdleCheckFrequency: c.opt.IdleCheckFrequency,
|
|
||||||
|
|
||||||
TLSConfig: c.opt.TLSConfig,
|
|
||||||
})
|
|
||||||
|
|
||||||
slaves, err := sentinel.Slaves(ctx, c.masterName).Result()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internal.Logger.Printf(ctx, "sentinel: Slaves master=%q failed: %s",
|
internal.Logger.Printf(ctx, "sentinel: Slaves master=%q failed: %s",
|
||||||
c.masterName, err)
|
c.opt.MasterName, err)
|
||||||
_ = sentinel.Close()
|
_ = sentinel.Close()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -524,20 +510,20 @@ func (c *sentinelFailover) slaveAddresses(ctx context.Context) ([]string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string {
|
func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string {
|
||||||
addr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result()
|
addr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s",
|
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s",
|
||||||
c.masterName, err)
|
c.opt.MasterName, err)
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
return net.JoinHostPort(addr[0], addr[1])
|
return net.JoinHostPort(addr[0], addr[1])
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *SentinelClient) []string {
|
func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *SentinelClient) []string {
|
||||||
addrs, err := sentinel.Slaves(ctx, c.masterName).Result()
|
addrs, err := sentinel.Slaves(ctx, c.opt.MasterName).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s",
|
internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s",
|
||||||
c.masterName, err)
|
c.opt.MasterName, err)
|
||||||
return []string{}
|
return []string{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -597,7 +583,7 @@ func (c *sentinelFailover) trySwitchMaster(ctx context.Context, addr string) {
|
||||||
c._masterAddr = addr
|
c._masterAddr = addr
|
||||||
|
|
||||||
internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q",
|
internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q",
|
||||||
c.masterName, addr)
|
c.opt.MasterName, addr)
|
||||||
go c.onFailover(ctx, addr)
|
go c.onFailover(ctx, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -613,9 +599,9 @@ func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelCl
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
|
func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
|
||||||
sentinels, err := c.sentinel.Sentinels(ctx, c.masterName).Result()
|
sentinels, err := c.sentinel.Sentinels(ctx, c.opt.MasterName).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.masterName, err)
|
internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.opt.MasterName, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, sentinel := range sentinels {
|
for _, sentinel := range sentinels {
|
||||||
|
@ -626,7 +612,7 @@ func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
|
||||||
sentinelAddr := vals[i+1].(string)
|
sentinelAddr := vals[i+1].(string)
|
||||||
if !contains(c.sentinelAddrs, sentinelAddr) {
|
if !contains(c.sentinelAddrs, sentinelAddr) {
|
||||||
internal.Logger.Printf(ctx, "sentinel: discovered new sentinel=%q for master=%q",
|
internal.Logger.Printf(ctx, "sentinel: discovered new sentinel=%q for master=%q",
|
||||||
sentinelAddr, c.masterName)
|
sentinelAddr, c.opt.MasterName)
|
||||||
c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
|
c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -644,7 +630,7 @@ func (c *sentinelFailover) listen(pubsub *PubSub) {
|
||||||
|
|
||||||
if msg.Channel == "+switch-master" {
|
if msg.Channel == "+switch-master" {
|
||||||
parts := strings.Split(msg.Payload, " ")
|
parts := strings.Split(msg.Payload, " ")
|
||||||
if parts[0] != c.masterName {
|
if parts[0] != c.opt.MasterName {
|
||||||
internal.Logger.Printf(pubsub.getContext(), "sentinel: ignore addr for master=%q", parts[0])
|
internal.Logger.Printf(pubsub.getContext(), "sentinel: ignore addr for master=%q", parts[0])
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -666,15 +652,15 @@ func contains(slice []string, str string) bool {
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
|
func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
|
||||||
failover := &sentinelFailover{
|
sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
|
||||||
masterName: failoverOpt.MasterName,
|
copy(sentinelAddrs, failoverOpt.SentinelAddrs)
|
||||||
sentinelAddrs: failoverOpt.SentinelAddrs,
|
|
||||||
|
|
||||||
opt: failoverOpt.options(),
|
failover := &sentinelFailover{
|
||||||
|
opt: failoverOpt,
|
||||||
|
sentinelAddrs: sentinelAddrs,
|
||||||
}
|
}
|
||||||
|
|
||||||
opt := failoverOpt.clusterOptions()
|
opt := failoverOpt.clusterOptions()
|
||||||
|
|
||||||
opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) {
|
opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) {
|
||||||
masterAddr, err := failover.MasterAddr(ctx)
|
masterAddr, err := failover.MasterAddr(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -18,11 +18,13 @@ var _ = Describe("Sentinel", func() {
|
||||||
client = redis.NewFailoverClient(&redis.FailoverOptions{
|
client = redis.NewFailoverClient(&redis.FailoverOptions{
|
||||||
MasterName: sentinelName,
|
MasterName: sentinelName,
|
||||||
SentinelAddrs: sentinelAddrs,
|
SentinelAddrs: sentinelAddrs,
|
||||||
|
MaxRetries: -1,
|
||||||
})
|
})
|
||||||
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
|
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
sentinel := redis.NewSentinelClient(&redis.Options{
|
sentinel := redis.NewSentinelClient(&redis.Options{
|
||||||
Addr: ":" + sentinelPort1,
|
Addr: ":" + sentinelPort1,
|
||||||
|
MaxRetries: -1,
|
||||||
})
|
})
|
||||||
|
|
||||||
addr, err := sentinel.GetMasterAddrByName(ctx, sentinelName).Result()
|
addr, err := sentinel.GetMasterAddrByName(ctx, sentinelName).Result()
|
||||||
|
@ -30,8 +32,20 @@ var _ = Describe("Sentinel", func() {
|
||||||
|
|
||||||
master = redis.NewClient(&redis.Options{
|
master = redis.NewClient(&redis.Options{
|
||||||
Addr: net.JoinHostPort(addr[0], addr[1]),
|
Addr: net.JoinHostPort(addr[0], addr[1]),
|
||||||
|
MaxRetries: -1,
|
||||||
})
|
})
|
||||||
masterPort = addr[1]
|
masterPort = addr[1]
|
||||||
|
|
||||||
|
// Wait until slaves are picked up by sentinel.
|
||||||
|
Eventually(func() string {
|
||||||
|
return sentinel1.Info(ctx).Val()
|
||||||
|
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
|
||||||
|
Eventually(func() string {
|
||||||
|
return sentinel2.Info(ctx).Val()
|
||||||
|
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
|
||||||
|
Eventually(func() string {
|
||||||
|
return sentinel3.Info(ctx).Val()
|
||||||
|
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
|
||||||
})
|
})
|
||||||
|
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
|
@ -52,25 +66,6 @@ var _ = Describe("Sentinel", func() {
|
||||||
// Create subscription.
|
// Create subscription.
|
||||||
ch := client.Subscribe(ctx, "foo").Channel()
|
ch := client.Subscribe(ctx, "foo").Channel()
|
||||||
|
|
||||||
// Wait until replicated.
|
|
||||||
Eventually(func() string {
|
|
||||||
return sentinelSlave1.Get(ctx, "foo").Val()
|
|
||||||
}, "15s", "100ms").Should(Equal("master"))
|
|
||||||
Eventually(func() string {
|
|
||||||
return sentinelSlave2.Get(ctx, "foo").Val()
|
|
||||||
}, "15s", "100ms").Should(Equal("master"))
|
|
||||||
|
|
||||||
// Wait until slaves are picked up by sentinel.
|
|
||||||
Eventually(func() string {
|
|
||||||
return sentinel1.Info(ctx).Val()
|
|
||||||
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
|
|
||||||
Eventually(func() string {
|
|
||||||
return sentinel2.Info(ctx).Val()
|
|
||||||
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
|
|
||||||
Eventually(func() string {
|
|
||||||
return sentinel3.Info(ctx).Val()
|
|
||||||
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
|
|
||||||
|
|
||||||
// Kill master.
|
// Kill master.
|
||||||
err = master.Shutdown(ctx).Err()
|
err = master.Shutdown(ctx).Err()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
@ -79,9 +74,9 @@ var _ = Describe("Sentinel", func() {
|
||||||
}, "15s", "100ms").Should(HaveOccurred())
|
}, "15s", "100ms").Should(HaveOccurred())
|
||||||
|
|
||||||
// Check that client picked up new master.
|
// Check that client picked up new master.
|
||||||
Eventually(func() error {
|
Eventually(func() string {
|
||||||
return client.Get(ctx, "foo").Err()
|
return client.Get(ctx, "foo").Val()
|
||||||
}, "15s", "100ms").ShouldNot(HaveOccurred())
|
}, "15s", "100ms").Should(Equal("master"))
|
||||||
|
|
||||||
// Check if subscription is renewed.
|
// Check if subscription is renewed.
|
||||||
var msg *redis.Message
|
var msg *redis.Message
|
||||||
|
@ -123,6 +118,7 @@ var _ = Describe("NewFailoverClusterClient", func() {
|
||||||
|
|
||||||
sentinel := redis.NewSentinelClient(&redis.Options{
|
sentinel := redis.NewSentinelClient(&redis.Options{
|
||||||
Addr: ":" + sentinelPort1,
|
Addr: ":" + sentinelPort1,
|
||||||
|
MaxRetries: -1,
|
||||||
})
|
})
|
||||||
|
|
||||||
addr, err := sentinel.GetMasterAddrByName(ctx, sentinelName).Result()
|
addr, err := sentinel.GetMasterAddrByName(ctx, sentinelName).Result()
|
||||||
|
@ -130,8 +126,20 @@ var _ = Describe("NewFailoverClusterClient", func() {
|
||||||
|
|
||||||
master = redis.NewClient(&redis.Options{
|
master = redis.NewClient(&redis.Options{
|
||||||
Addr: net.JoinHostPort(addr[0], addr[1]),
|
Addr: net.JoinHostPort(addr[0], addr[1]),
|
||||||
|
MaxRetries: -1,
|
||||||
})
|
})
|
||||||
masterPort = addr[1]
|
masterPort = addr[1]
|
||||||
|
|
||||||
|
// Wait until slaves are picked up by sentinel.
|
||||||
|
Eventually(func() string {
|
||||||
|
return sentinel1.Info(ctx).Val()
|
||||||
|
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
|
||||||
|
Eventually(func() string {
|
||||||
|
return sentinel2.Info(ctx).Val()
|
||||||
|
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
|
||||||
|
Eventually(func() string {
|
||||||
|
return sentinel3.Info(ctx).Val()
|
||||||
|
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
|
||||||
})
|
})
|
||||||
|
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
|
@ -152,25 +160,6 @@ var _ = Describe("NewFailoverClusterClient", func() {
|
||||||
// Create subscription.
|
// Create subscription.
|
||||||
ch := client.Subscribe(ctx, "foo").Channel()
|
ch := client.Subscribe(ctx, "foo").Channel()
|
||||||
|
|
||||||
// Wait until replicated.
|
|
||||||
Eventually(func() string {
|
|
||||||
return sentinelSlave1.Get(ctx, "foo").Val()
|
|
||||||
}, "15s", "100ms").Should(Equal("master"))
|
|
||||||
Eventually(func() string {
|
|
||||||
return sentinelSlave2.Get(ctx, "foo").Val()
|
|
||||||
}, "15s", "100ms").Should(Equal("master"))
|
|
||||||
|
|
||||||
// Wait until slaves are picked up by sentinel.
|
|
||||||
Eventually(func() string {
|
|
||||||
return sentinel1.Info(ctx).Val()
|
|
||||||
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
|
|
||||||
Eventually(func() string {
|
|
||||||
return sentinel2.Info(ctx).Val()
|
|
||||||
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
|
|
||||||
Eventually(func() string {
|
|
||||||
return sentinel3.Info(ctx).Val()
|
|
||||||
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
|
|
||||||
|
|
||||||
// Kill master.
|
// Kill master.
|
||||||
err = master.Shutdown(ctx).Err()
|
err = master.Shutdown(ctx).Err()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
@ -179,9 +168,9 @@ var _ = Describe("NewFailoverClusterClient", func() {
|
||||||
}, "15s", "100ms").Should(HaveOccurred())
|
}, "15s", "100ms").Should(HaveOccurred())
|
||||||
|
|
||||||
// Check that client picked up new master.
|
// Check that client picked up new master.
|
||||||
Eventually(func() error {
|
Eventually(func() string {
|
||||||
return client.Get(ctx, "foo").Err()
|
return client.Get(ctx, "foo").Val()
|
||||||
}, "15s", "100ms").ShouldNot(HaveOccurred())
|
}, "15s", "100ms").Should(Equal("master"))
|
||||||
|
|
||||||
// Check if subscription is renewed.
|
// Check if subscription is renewed.
|
||||||
var msg *redis.Message
|
var msg *redis.Message
|
||||||
|
|
Loading…
Reference in New Issue