forked from mirror/redis
Make FailoverClient(sentinel mode) more robust (#1655)
* add failover option AllowDisconnectedSlaves * checkout unrelated changes * Make NewFailoverClient (sentinel mode) robust. * fix lint issue * Fix bug * Fix lint issue * add comment * checkout unrelated changes * Refine code * checkout unrelated changes
This commit is contained in:
parent
f594401261
commit
8b19c31049
|
@ -43,3 +43,8 @@ func (s *source) Seed(seed int64) {
|
||||||
s.src.Seed(seed)
|
s.src.Seed(seed)
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Shuffle pseudo-randomizes the order of elements.
|
||||||
|
// n is the number of elements.
|
||||||
|
// swap swaps the elements with indexes i and j.
|
||||||
|
func Shuffle(n int, swap func(i, j int)) { pseudo.Shuffle(n, swap) }
|
||||||
|
|
57
sentinel.go
57
sentinel.go
|
@ -36,6 +36,12 @@ type FailoverOptions struct {
|
||||||
// Route all commands to slave read-only nodes.
|
// Route all commands to slave read-only nodes.
|
||||||
SlaveOnly bool
|
SlaveOnly bool
|
||||||
|
|
||||||
|
// Use slaves disconnected with master when cannot get connected slaves
|
||||||
|
// Now, this option only works in RandomSlaveAddr function.
|
||||||
|
UseDisconnectedSlaves bool
|
||||||
|
|
||||||
|
// Client queries sentinels in a random order
|
||||||
|
QuerySentinelRandomly bool
|
||||||
// Following options are copied from Options struct.
|
// Following options are copied from Options struct.
|
||||||
|
|
||||||
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
|
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
|
||||||
|
@ -433,10 +439,22 @@ func (c *sentinelFailover) closeSentinel() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) {
|
func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) {
|
||||||
addresses, err := c.slaveAddrs(ctx)
|
if c.opt == nil {
|
||||||
|
return "", errors.New("opt is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
addresses, err := c.slaveAddrs(ctx, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(addresses) == 0 && c.opt.UseDisconnectedSlaves {
|
||||||
|
addresses, err = c.slaveAddrs(ctx, true)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if len(addresses) == 0 {
|
if len(addresses) == 0 {
|
||||||
return c.MasterAddr(ctx)
|
return c.MasterAddr(ctx)
|
||||||
}
|
}
|
||||||
|
@ -466,6 +484,11 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
|
||||||
_ = c.closeSentinel()
|
_ = c.closeSentinel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.opt.QuerySentinelRandomly {
|
||||||
|
rand.Shuffle(len(c.sentinelAddrs), func(i, j int) {
|
||||||
|
c.sentinelAddrs[i], c.sentinelAddrs[j] = c.sentinelAddrs[j], c.sentinelAddrs[i]
|
||||||
|
})
|
||||||
|
}
|
||||||
for i, sentinelAddr := range c.sentinelAddrs {
|
for i, sentinelAddr := range c.sentinelAddrs {
|
||||||
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
|
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
|
||||||
|
|
||||||
|
@ -488,7 +511,7 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
|
||||||
return "", errors.New("redis: all sentinels specified in configuration are unreachable")
|
return "", errors.New("redis: all sentinels specified in configuration are unreachable")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *sentinelFailover) slaveAddrs(ctx context.Context) ([]string, error) {
|
func (c *sentinelFailover) slaveAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
sentinel := c.sentinel
|
sentinel := c.sentinel
|
||||||
c.mu.RUnlock()
|
c.mu.RUnlock()
|
||||||
|
@ -510,6 +533,13 @@ func (c *sentinelFailover) slaveAddrs(ctx context.Context) ([]string, error) {
|
||||||
}
|
}
|
||||||
_ = c.closeSentinel()
|
_ = c.closeSentinel()
|
||||||
}
|
}
|
||||||
|
if c.opt.QuerySentinelRandomly {
|
||||||
|
rand.Shuffle(len(c.sentinelAddrs), func(i, j int) {
|
||||||
|
c.sentinelAddrs[i], c.sentinelAddrs[j] = c.sentinelAddrs[j], c.sentinelAddrs[i]
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
var sentinelReachable bool
|
||||||
|
|
||||||
for i, sentinelAddr := range c.sentinelAddrs {
|
for i, sentinelAddr := range c.sentinelAddrs {
|
||||||
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
|
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
|
||||||
|
@ -521,15 +551,21 @@ func (c *sentinelFailover) slaveAddrs(ctx context.Context) ([]string, error) {
|
||||||
_ = sentinel.Close()
|
_ = sentinel.Close()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
sentinelReachable = true
|
||||||
|
addrs := parseSlaveAddrs(slaves, useDisconnected)
|
||||||
|
if len(addrs) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
// Push working sentinel to the top.
|
// Push working sentinel to the top.
|
||||||
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
|
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
|
||||||
c.setSentinel(ctx, sentinel)
|
c.setSentinel(ctx, sentinel)
|
||||||
|
|
||||||
addrs := parseSlaveAddrs(slaves)
|
|
||||||
return addrs, nil
|
return addrs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if sentinelReachable {
|
||||||
|
return []string{}, nil
|
||||||
|
}
|
||||||
return []string{}, errors.New("redis: all sentinels specified in configuration are unreachable")
|
return []string{}, errors.New("redis: all sentinels specified in configuration are unreachable")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -550,12 +586,11 @@ func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *Sentinel
|
||||||
c.opt.MasterName, err)
|
c.opt.MasterName, err)
|
||||||
return []string{}
|
return []string{}
|
||||||
}
|
}
|
||||||
return parseSlaveAddrs(addrs)
|
return parseSlaveAddrs(addrs, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseSlaveAddrs(addrs []interface{}) []string {
|
func parseSlaveAddrs(addrs []interface{}, keepDisconnected bool) []string {
|
||||||
nodes := make([]string, 0, len(addrs))
|
nodes := make([]string, 0, len(addrs))
|
||||||
|
|
||||||
for _, node := range addrs {
|
for _, node := range addrs {
|
||||||
ip := ""
|
ip := ""
|
||||||
port := ""
|
port := ""
|
||||||
|
@ -577,8 +612,12 @@ func parseSlaveAddrs(addrs []interface{}) []string {
|
||||||
|
|
||||||
for _, flag := range flags {
|
for _, flag := range flags {
|
||||||
switch flag {
|
switch flag {
|
||||||
case "s_down", "o_down", "disconnected":
|
case "s_down", "o_down":
|
||||||
isDown = true
|
isDown = true
|
||||||
|
case "disconnected":
|
||||||
|
if !keepDisconnected {
|
||||||
|
isDown = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -705,7 +744,7 @@ func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
|
||||||
Addr: masterAddr,
|
Addr: masterAddr,
|
||||||
}}
|
}}
|
||||||
|
|
||||||
slaveAddrs, err := failover.slaveAddrs(ctx)
|
slaveAddrs, err := failover.slaveAddrs(ctx, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue