forked from mirror/redis
Rename clusterAddrs to activeAddrs
This commit is contained in:
parent
f108b8bce3
commit
d66af70ae8
56
cluster.go
56
cluster.go
|
@ -241,11 +241,11 @@ func (n *clusterNode) SetGeneration(gen uint32) {
|
||||||
type clusterNodes struct {
|
type clusterNodes struct {
|
||||||
opt *ClusterOptions
|
opt *ClusterOptions
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
allAddrs []string
|
addrs []string
|
||||||
allNodes map[string]*clusterNode
|
nodes map[string]*clusterNode
|
||||||
clusterAddrs []string
|
activeAddrs []string
|
||||||
closed bool
|
closed bool
|
||||||
|
|
||||||
_generation uint32 // atomic
|
_generation uint32 // atomic
|
||||||
}
|
}
|
||||||
|
@ -254,8 +254,8 @@ func newClusterNodes(opt *ClusterOptions) *clusterNodes {
|
||||||
return &clusterNodes{
|
return &clusterNodes{
|
||||||
opt: opt,
|
opt: opt,
|
||||||
|
|
||||||
allAddrs: opt.Addrs,
|
addrs: opt.Addrs,
|
||||||
allNodes: make(map[string]*clusterNode),
|
nodes: make(map[string]*clusterNode),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -269,14 +269,14 @@ func (c *clusterNodes) Close() error {
|
||||||
c.closed = true
|
c.closed = true
|
||||||
|
|
||||||
var firstErr error
|
var firstErr error
|
||||||
for _, node := range c.allNodes {
|
for _, node := range c.nodes {
|
||||||
if err := node.Client.Close(); err != nil && firstErr == nil {
|
if err := node.Client.Close(); err != nil && firstErr == nil {
|
||||||
firstErr = err
|
firstErr = err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
c.allNodes = nil
|
c.nodes = nil
|
||||||
c.clusterAddrs = nil
|
c.activeAddrs = nil
|
||||||
|
|
||||||
return firstErr
|
return firstErr
|
||||||
}
|
}
|
||||||
|
@ -286,10 +286,10 @@ func (c *clusterNodes) Addrs() ([]string, error) {
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
closed := c.closed
|
closed := c.closed
|
||||||
if !closed {
|
if !closed {
|
||||||
if len(c.clusterAddrs) > 0 {
|
if len(c.activeAddrs) > 0 {
|
||||||
addrs = c.clusterAddrs
|
addrs = c.activeAddrs
|
||||||
} else {
|
} else {
|
||||||
addrs = c.allAddrs
|
addrs = c.addrs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c.mu.RUnlock()
|
c.mu.RUnlock()
|
||||||
|
@ -311,16 +311,20 @@ func (c *clusterNodes) NextGeneration() uint32 {
|
||||||
func (c *clusterNodes) GC(generation uint32) {
|
func (c *clusterNodes) GC(generation uint32) {
|
||||||
//nolint:prealloc
|
//nolint:prealloc
|
||||||
var collected []*clusterNode
|
var collected []*clusterNode
|
||||||
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
for addr, node := range c.allNodes {
|
|
||||||
|
c.activeAddrs = c.activeAddrs[:0]
|
||||||
|
for addr, node := range c.nodes {
|
||||||
if node.Generation() >= generation {
|
if node.Generation() >= generation {
|
||||||
|
c.activeAddrs = append(c.activeAddrs, addr)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
c.clusterAddrs = remove(c.clusterAddrs, addr)
|
delete(c.nodes, addr)
|
||||||
delete(c.allNodes, addr)
|
|
||||||
collected = append(collected, node)
|
collected = append(collected, node)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
|
|
||||||
for _, node := range collected {
|
for _, node := range collected {
|
||||||
|
@ -344,16 +348,15 @@ func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
|
||||||
return nil, pool.ErrClosed
|
return nil, pool.ErrClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
node, ok := c.allNodes[addr]
|
node, ok := c.nodes[addr]
|
||||||
if ok {
|
if ok {
|
||||||
return node, err
|
return node, err
|
||||||
}
|
}
|
||||||
|
|
||||||
node = newClusterNode(c.opt, addr)
|
node = newClusterNode(c.opt, addr)
|
||||||
|
|
||||||
c.allAddrs = appendIfNotExists(c.allAddrs, addr)
|
c.addrs = appendIfNotExists(c.addrs, addr)
|
||||||
c.clusterAddrs = append(c.clusterAddrs, addr)
|
c.nodes[addr] = node
|
||||||
c.allNodes[addr] = node
|
|
||||||
|
|
||||||
return node, err
|
return node, err
|
||||||
}
|
}
|
||||||
|
@ -365,7 +368,7 @@ func (c *clusterNodes) get(addr string) (*clusterNode, error) {
|
||||||
if c.closed {
|
if c.closed {
|
||||||
err = pool.ErrClosed
|
err = pool.ErrClosed
|
||||||
} else {
|
} else {
|
||||||
node = c.allNodes[addr]
|
node = c.nodes[addr]
|
||||||
}
|
}
|
||||||
c.mu.RUnlock()
|
c.mu.RUnlock()
|
||||||
return node, err
|
return node, err
|
||||||
|
@ -379,8 +382,8 @@ func (c *clusterNodes) All() ([]*clusterNode, error) {
|
||||||
return nil, pool.ErrClosed
|
return nil, pool.ErrClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
cp := make([]*clusterNode, 0, len(c.allNodes))
|
cp := make([]*clusterNode, 0, len(c.nodes))
|
||||||
for _, node := range c.allNodes {
|
for _, node := range c.nodes {
|
||||||
cp = append(cp, node)
|
cp = append(cp, node)
|
||||||
}
|
}
|
||||||
return cp, nil
|
return cp, nil
|
||||||
|
@ -993,6 +996,7 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var firstErr error
|
var firstErr error
|
||||||
|
|
||||||
for _, idx := range rand.Perm(len(addrs)) {
|
for _, idx := range rand.Perm(len(addrs)) {
|
||||||
addr := addrs[idx]
|
addr := addrs[idx]
|
||||||
|
|
||||||
|
@ -1017,12 +1021,12 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* No node is connectable. It's possible that all nodes' IP has changed.
|
* No node is connectable. It's possible that all nodes' IP has changed.
|
||||||
* Clear clusterAddrs to let client be able to re-connect using the initial
|
* Clear activeAddrs to let client be able to re-connect using the initial
|
||||||
* setting of the addresses (e.g. [redsi-cluster-0:6379, redis-cluster-1:6379]),
|
* setting of the addresses (e.g. [redis-cluster-0:6379, redis-cluster-1:6379]),
|
||||||
* which might have chance to resolve domain name and get updated IP address.
|
* which might have chance to resolve domain name and get updated IP address.
|
||||||
*/
|
*/
|
||||||
c.nodes.mu.Lock()
|
c.nodes.mu.Lock()
|
||||||
c.nodes.clusterAddrs = nil
|
c.nodes.activeAddrs = nil
|
||||||
c.nodes.mu.Unlock()
|
c.nodes.mu.Unlock()
|
||||||
|
|
||||||
return nil, firstErr
|
return nil, firstErr
|
||||||
|
|
Loading…
Reference in New Issue