diff --git a/cluster.go b/cluster.go index 807e8884..dac0d889 100644 --- a/cluster.go +++ b/cluster.go @@ -241,11 +241,11 @@ func (n *clusterNode) SetGeneration(gen uint32) { type clusterNodes struct { opt *ClusterOptions - mu sync.RWMutex - allAddrs []string - allNodes map[string]*clusterNode - clusterAddrs []string - closed bool + mu sync.RWMutex + addrs []string + nodes map[string]*clusterNode + activeAddrs []string + closed bool _generation uint32 // atomic } @@ -254,8 +254,8 @@ func newClusterNodes(opt *ClusterOptions) *clusterNodes { return &clusterNodes{ opt: opt, - allAddrs: opt.Addrs, - allNodes: make(map[string]*clusterNode), + addrs: opt.Addrs, + nodes: make(map[string]*clusterNode), } } @@ -269,14 +269,14 @@ func (c *clusterNodes) Close() error { c.closed = true var firstErr error - for _, node := range c.allNodes { + for _, node := range c.nodes { if err := node.Client.Close(); err != nil && firstErr == nil { firstErr = err } } - c.allNodes = nil - c.clusterAddrs = nil + c.nodes = nil + c.activeAddrs = nil return firstErr } @@ -286,10 +286,10 @@ func (c *clusterNodes) Addrs() ([]string, error) { c.mu.RLock() closed := c.closed if !closed { - if len(c.clusterAddrs) > 0 { - addrs = c.clusterAddrs + if len(c.activeAddrs) > 0 { + addrs = c.activeAddrs } else { - addrs = c.allAddrs + addrs = c.addrs } } c.mu.RUnlock() @@ -311,16 +311,20 @@ func (c *clusterNodes) NextGeneration() uint32 { func (c *clusterNodes) GC(generation uint32) { //nolint:prealloc var collected []*clusterNode + c.mu.Lock() - for addr, node := range c.allNodes { + + c.activeAddrs = c.activeAddrs[:0] + for addr, node := range c.nodes { if node.Generation() >= generation { + c.activeAddrs = append(c.activeAddrs, addr) continue } - c.clusterAddrs = remove(c.clusterAddrs, addr) - delete(c.allNodes, addr) + delete(c.nodes, addr) collected = append(collected, node) } + c.mu.Unlock() for _, node := range collected { @@ -344,16 +348,15 @@ func (c *clusterNodes) Get(addr string) (*clusterNode, error) { return nil, pool.ErrClosed } - node, ok := c.allNodes[addr] + node, ok := c.nodes[addr] if ok { return node, err } node = newClusterNode(c.opt, addr) - c.allAddrs = appendIfNotExists(c.allAddrs, addr) - c.clusterAddrs = append(c.clusterAddrs, addr) - c.allNodes[addr] = node + c.addrs = appendIfNotExists(c.addrs, addr) + c.nodes[addr] = node return node, err } @@ -365,7 +368,7 @@ func (c *clusterNodes) get(addr string) (*clusterNode, error) { if c.closed { err = pool.ErrClosed } else { - node = c.allNodes[addr] + node = c.nodes[addr] } c.mu.RUnlock() return node, err @@ -379,8 +382,8 @@ func (c *clusterNodes) All() ([]*clusterNode, error) { return nil, pool.ErrClosed } - cp := make([]*clusterNode, 0, len(c.allNodes)) - for _, node := range c.allNodes { + cp := make([]*clusterNode, 0, len(c.nodes)) + for _, node := range c.nodes { cp = append(cp, node) } return cp, nil @@ -993,6 +996,7 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) { } var firstErr error + for _, idx := range rand.Perm(len(addrs)) { addr := addrs[idx] @@ -1017,12 +1021,12 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) { /* * No node is connectable. It's possible that all nodes' IP has changed. - * Clear clusterAddrs to let client be able to re-connect using the initial - * setting of the addresses (e.g. [redsi-cluster-0:6379, redis-cluster-1:6379]), + * Clear activeAddrs to let client be able to re-connect using the initial + * setting of the addresses (e.g. [redis-cluster-0:6379, redis-cluster-1:6379]), * which might have chance to resolve domain name and get updated IP address. */ c.nodes.mu.Lock() - c.nodes.clusterAddrs = nil + c.nodes.activeAddrs = nil c.nodes.mu.Unlock() return nil, firstErr