Compare commits

...

1 Commits

Author SHA1 Message Date
Vladimir Mihailenco 425f2fc69b fix: use slot node id to detect node re-configuration 2021-09-29 14:43:24 +03:00
1 changed files with 41 additions and 19 deletions

View File

@ -170,6 +170,7 @@ func (opt *ClusterOptions) clientOptions() *Options {
//------------------------------------------------------------------------------
type clusterNode struct {
id string
Client *Client
latency uint32 // atomic
@ -177,10 +178,11 @@ type clusterNode struct {
failing uint32 // atomic
}
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
func newClusterNode(clOpt *ClusterOptions, id, addr string) *clusterNode {
opt := clOpt.clientOptions()
opt.Addr = addr
node := clusterNode{
id: id,
Client: clOpt.NewClient(opt),
}
@ -352,33 +354,51 @@ func (c *clusterNodes) GC(generation uint32) {
}
}
func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
return c.GetOrCreateWithID(addr, "")
}
func (c *clusterNodes) GetOrCreateWithID(addr, id string) (*clusterNode, error) {
node, err := c.get(addr)
if err != nil {
return nil, err
}
if node != nil {
if node != nil && (id == "" || node.id == id) {
return node, nil
}
c.mu.Lock()
defer c.mu.Unlock()
node, oldNode, err := c.getOrCreate(addr, id)
c.mu.Unlock()
if c.closed {
return nil, pool.ErrClosed
if err != nil {
return nil, err
}
if oldNode != nil {
_ = oldNode.Client.Close()
}
node, ok := c.nodes[addr]
if ok {
return node, nil
}
node = newClusterNode(c.opt, addr)
func (c *clusterNodes) getOrCreate(addr, id string) (node, oldNode *clusterNode, _ error) {
if c.closed {
return nil, nil, pool.ErrClosed
}
oldNode, ok := c.nodes[addr]
if ok {
// The id is changed when node is re-configured, for example, IP addr is changed.
if id == "" || oldNode.id == id {
return oldNode, nil, nil
}
} else {
c.addrs = appendIfNotExists(c.addrs, addr)
}
node = newClusterNode(c.opt, id, addr)
c.nodes[addr] = node
return node, nil
return node, oldNode, nil
}
func (c *clusterNodes) get(addr string) (*clusterNode, error) {
@ -416,7 +436,7 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
}
n := rand.Intn(len(addrs))
return c.Get(addrs[n])
return c.GetOrCreate(addrs[n])
}
//------------------------------------------------------------------------------
@ -474,7 +494,7 @@ func newClusterState(
addr = replaceLoopbackHost(addr, originHost)
}
node, err := c.nodes.Get(addr)
node, err := c.nodes.GetOrCreateWithID(addr, slotNode.ID)
if err != nil {
return nil, err
}
@ -824,8 +844,10 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
var addr string
moved, ask, addr = isMovedError(lastErr)
if moved || ask {
c.state.LazyReload()
var err error
node, err = c.nodes.Get(addr)
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
@ -1022,7 +1044,7 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
for _, idx := range rand.Perm(len(addrs)) {
addr := addrs[idx]
node, err := c.nodes.Get(addr)
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
if firstErr == nil {
firstErr = err
@ -1236,7 +1258,7 @@ func (c *ClusterClient) checkMovedErr(
return false
}
node, err := c.nodes.Get(addr)
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return false
}
@ -1422,7 +1444,7 @@ func (c *ClusterClient) cmdsMoved(
addr string,
failedCmds *cmdsMap,
) error {
node, err := c.nodes.Get(addr)
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
@ -1477,7 +1499,7 @@ func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...s
moved, ask, addr := isMovedError(err)
if moved || ask {
node, err = c.nodes.Get(addr)
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
@ -1589,7 +1611,7 @@ func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo,
for _, idx := range perm {
addr := addrs[idx]
node, err := c.nodes.Get(addr)
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
if firstErr == nil {
firstErr = err