Compare commits

...

1 Commits

Author SHA1 Message Date
Vladimir Mihailenco 425f2fc69b fix: use slot node id to detect node re-configuration 2021-09-29 14:43:24 +03:00
1 changed files with 41 additions and 19 deletions

View File

@ -170,6 +170,7 @@ func (opt *ClusterOptions) clientOptions() *Options {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type clusterNode struct { type clusterNode struct {
id string
Client *Client Client *Client
latency uint32 // atomic latency uint32 // atomic
@ -177,10 +178,11 @@ type clusterNode struct {
failing uint32 // atomic failing uint32 // atomic
} }
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { func newClusterNode(clOpt *ClusterOptions, id, addr string) *clusterNode {
opt := clOpt.clientOptions() opt := clOpt.clientOptions()
opt.Addr = addr opt.Addr = addr
node := clusterNode{ node := clusterNode{
id: id,
Client: clOpt.NewClient(opt), Client: clOpt.NewClient(opt),
} }
@ -352,33 +354,51 @@ func (c *clusterNodes) GC(generation uint32) {
} }
} }
func (c *clusterNodes) Get(addr string) (*clusterNode, error) { func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
return c.GetOrCreateWithID(addr, "")
}
func (c *clusterNodes) GetOrCreateWithID(addr, id string) (*clusterNode, error) {
node, err := c.get(addr) node, err := c.get(addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if node != nil { if node != nil && (id == "" || node.id == id) {
return node, nil return node, nil
} }
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() node, oldNode, err := c.getOrCreate(addr, id)
c.mu.Unlock()
if c.closed { if err != nil {
return nil, pool.ErrClosed return nil, err
}
if oldNode != nil {
_ = oldNode.Client.Close()
} }
node, ok := c.nodes[addr]
if ok {
return node, nil return node, nil
} }
node = newClusterNode(c.opt, addr) func (c *clusterNodes) getOrCreate(addr, id string) (node, oldNode *clusterNode, _ error) {
if c.closed {
return nil, nil, pool.ErrClosed
}
oldNode, ok := c.nodes[addr]
if ok {
// The id is changed when node is re-configured, for example, IP addr is changed.
if id == "" || oldNode.id == id {
return oldNode, nil, nil
}
} else {
c.addrs = appendIfNotExists(c.addrs, addr) c.addrs = appendIfNotExists(c.addrs, addr)
}
node = newClusterNode(c.opt, id, addr)
c.nodes[addr] = node c.nodes[addr] = node
return node, nil return node, oldNode, nil
} }
func (c *clusterNodes) get(addr string) (*clusterNode, error) { func (c *clusterNodes) get(addr string) (*clusterNode, error) {
@ -416,7 +436,7 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
} }
n := rand.Intn(len(addrs)) n := rand.Intn(len(addrs))
return c.Get(addrs[n]) return c.GetOrCreate(addrs[n])
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -474,7 +494,7 @@ func newClusterState(
addr = replaceLoopbackHost(addr, originHost) addr = replaceLoopbackHost(addr, originHost)
} }
node, err := c.nodes.Get(addr) node, err := c.nodes.GetOrCreateWithID(addr, slotNode.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -824,8 +844,10 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
var addr string var addr string
moved, ask, addr = isMovedError(lastErr) moved, ask, addr = isMovedError(lastErr)
if moved || ask { if moved || ask {
c.state.LazyReload()
var err error var err error
node, err = c.nodes.Get(addr) node, err = c.nodes.GetOrCreate(addr)
if err != nil { if err != nil {
return err return err
} }
@ -1022,7 +1044,7 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
for _, idx := range rand.Perm(len(addrs)) { for _, idx := range rand.Perm(len(addrs)) {
addr := addrs[idx] addr := addrs[idx]
node, err := c.nodes.Get(addr) node, err := c.nodes.GetOrCreate(addr)
if err != nil { if err != nil {
if firstErr == nil { if firstErr == nil {
firstErr = err firstErr = err
@ -1236,7 +1258,7 @@ func (c *ClusterClient) checkMovedErr(
return false return false
} }
node, err := c.nodes.Get(addr) node, err := c.nodes.GetOrCreate(addr)
if err != nil { if err != nil {
return false return false
} }
@ -1422,7 +1444,7 @@ func (c *ClusterClient) cmdsMoved(
addr string, addr string,
failedCmds *cmdsMap, failedCmds *cmdsMap,
) error { ) error {
node, err := c.nodes.Get(addr) node, err := c.nodes.GetOrCreate(addr)
if err != nil { if err != nil {
return err return err
} }
@ -1477,7 +1499,7 @@ func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...s
moved, ask, addr := isMovedError(err) moved, ask, addr := isMovedError(err)
if moved || ask { if moved || ask {
node, err = c.nodes.Get(addr) node, err = c.nodes.GetOrCreate(addr)
if err != nil { if err != nil {
return err return err
} }
@ -1589,7 +1611,7 @@ func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo,
for _, idx := range perm { for _, idx := range perm {
addr := addrs[idx] addr := addrs[idx]
node, err := c.nodes.Get(addr) node, err := c.nodes.GetOrCreate(addr)
if err != nil { if err != nil {
if firstErr == nil { if firstErr == nil {
firstErr = err firstErr = err