From 8b19c310495637fb61e2b237abdedca922aced9a Mon Sep 17 00:00:00 2001 From: "renzheng.wang" Date: Wed, 10 Feb 2021 23:20:08 +0800 Subject: [PATCH] Make FailoverClient(sentinel mode) more robust (#1655) * add failover option AllowDisconnectedSlaves * checkout unrelated changes * Make NewFailoverClient (sentinel mode) robust. * fix lint issue * Fix bug * Fix lint issue * add comment * checkout unrelated changes * Refine code * checkout unrelated changes --- internal/rand/rand.go | 5 ++++ sentinel.go | 57 ++++++++++++++++++++++++++++++++++++------- 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/internal/rand/rand.go b/internal/rand/rand.go index 40676f3..2edccba 100644 --- a/internal/rand/rand.go +++ b/internal/rand/rand.go @@ -43,3 +43,8 @@ func (s *source) Seed(seed int64) { s.src.Seed(seed) s.mu.Unlock() } + +// Shuffle pseudo-randomizes the order of elements. +// n is the number of elements. +// swap swaps the elements with indexes i and j. +func Shuffle(n int, swap func(i, j int)) { pseudo.Shuffle(n, swap) } diff --git a/sentinel.go b/sentinel.go index d785168..913d014 100644 --- a/sentinel.go +++ b/sentinel.go @@ -36,6 +36,12 @@ type FailoverOptions struct { // Route all commands to slave read-only nodes. SlaveOnly bool + // Use slaves disconnected with master when cannot get connected slaves + // Now, this option only works in RandomSlaveAddr function. + UseDisconnectedSlaves bool + + // Client queries sentinels in a random order + QuerySentinelRandomly bool // Following options are copied from Options struct. Dialer func(ctx context.Context, network, addr string) (net.Conn, error) @@ -433,10 +439,22 @@ func (c *sentinelFailover) closeSentinel() error { } func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) { - addresses, err := c.slaveAddrs(ctx) + if c.opt == nil { + return "", errors.New("opt is nil") + } + + addresses, err := c.slaveAddrs(ctx, false) if err != nil { return "", err } + + if len(addresses) == 0 && c.opt.UseDisconnectedSlaves { + addresses, err = c.slaveAddrs(ctx, true) + if err != nil { + return "", err + } + } + if len(addresses) == 0 { return c.MasterAddr(ctx) } @@ -466,6 +484,11 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) { _ = c.closeSentinel() } + if c.opt.QuerySentinelRandomly { + rand.Shuffle(len(c.sentinelAddrs), func(i, j int) { + c.sentinelAddrs[i], c.sentinelAddrs[j] = c.sentinelAddrs[j], c.sentinelAddrs[i] + }) + } for i, sentinelAddr := range c.sentinelAddrs { sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr)) @@ -488,7 +511,7 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) { return "", errors.New("redis: all sentinels specified in configuration are unreachable") } -func (c *sentinelFailover) slaveAddrs(ctx context.Context) ([]string, error) { +func (c *sentinelFailover) slaveAddrs(ctx context.Context, useDisconnected bool) ([]string, error) { c.mu.RLock() sentinel := c.sentinel c.mu.RUnlock() @@ -510,6 +533,13 @@ func (c *sentinelFailover) slaveAddrs(ctx context.Context) ([]string, error) { } _ = c.closeSentinel() } + if c.opt.QuerySentinelRandomly { + rand.Shuffle(len(c.sentinelAddrs), func(i, j int) { + c.sentinelAddrs[i], c.sentinelAddrs[j] = c.sentinelAddrs[j], c.sentinelAddrs[i] + }) + } + + var sentinelReachable bool for i, sentinelAddr := range c.sentinelAddrs { sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr)) @@ -521,15 +551,21 @@ func (c *sentinelFailover) slaveAddrs(ctx context.Context) ([]string, error) { _ = sentinel.Close() continue } - + sentinelReachable = true + addrs := parseSlaveAddrs(slaves, useDisconnected) + if len(addrs) == 0 { + continue + } // Push working sentinel to the top. c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0] c.setSentinel(ctx, sentinel) - addrs := parseSlaveAddrs(slaves) return addrs, nil } + if sentinelReachable { + return []string{}, nil + } return []string{}, errors.New("redis: all sentinels specified in configuration are unreachable") } @@ -550,12 +586,11 @@ func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *Sentinel c.opt.MasterName, err) return []string{} } - return parseSlaveAddrs(addrs) + return parseSlaveAddrs(addrs, false) } -func parseSlaveAddrs(addrs []interface{}) []string { +func parseSlaveAddrs(addrs []interface{}, keepDisconnected bool) []string { nodes := make([]string, 0, len(addrs)) - for _, node := range addrs { ip := "" port := "" @@ -577,8 +612,12 @@ func parseSlaveAddrs(addrs []interface{}) []string { for _, flag := range flags { switch flag { - case "s_down", "o_down", "disconnected": + case "s_down", "o_down": isDown = true + case "disconnected": + if !keepDisconnected { + isDown = true + } } } @@ -705,7 +744,7 @@ func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient { Addr: masterAddr, }} - slaveAddrs, err := failover.slaveAddrs(ctx) + slaveAddrs, err := failover.slaveAddrs(ctx, false) if err != nil { return nil, err }