forked from mirror/redis
Compare commits
1 Commits
master
...
fix/moved-
Author | SHA1 | Date |
---|---|---|
Vladimir Mihailenco | 425f2fc69b |
62
cluster.go
62
cluster.go
|
@ -170,6 +170,7 @@ func (opt *ClusterOptions) clientOptions() *Options {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
type clusterNode struct {
|
||||
id string
|
||||
Client *Client
|
||||
|
||||
latency uint32 // atomic
|
||||
|
@ -177,10 +178,11 @@ type clusterNode struct {
|
|||
failing uint32 // atomic
|
||||
}
|
||||
|
||||
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
|
||||
func newClusterNode(clOpt *ClusterOptions, id, addr string) *clusterNode {
|
||||
opt := clOpt.clientOptions()
|
||||
opt.Addr = addr
|
||||
node := clusterNode{
|
||||
id: id,
|
||||
Client: clOpt.NewClient(opt),
|
||||
}
|
||||
|
||||
|
@ -352,33 +354,51 @@ func (c *clusterNodes) GC(generation uint32) {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
|
||||
func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
|
||||
return c.GetOrCreateWithID(addr, "")
|
||||
}
|
||||
|
||||
func (c *clusterNodes) GetOrCreateWithID(addr, id string) (*clusterNode, error) {
|
||||
node, err := c.get(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if node != nil {
|
||||
if node != nil && (id == "" || node.id == id) {
|
||||
return node, nil
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
node, oldNode, err := c.getOrCreate(addr, id)
|
||||
c.mu.Unlock()
|
||||
|
||||
if c.closed {
|
||||
return nil, pool.ErrClosed
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if oldNode != nil {
|
||||
_ = oldNode.Client.Close()
|
||||
}
|
||||
|
||||
node, ok := c.nodes[addr]
|
||||
if ok {
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func (c *clusterNodes) getOrCreate(addr, id string) (node, oldNode *clusterNode, _ error) {
|
||||
if c.closed {
|
||||
return nil, nil, pool.ErrClosed
|
||||
}
|
||||
|
||||
node = newClusterNode(c.opt, addr)
|
||||
|
||||
oldNode, ok := c.nodes[addr]
|
||||
if ok {
|
||||
// The id is changed when node is re-configured, for example, IP addr is changed.
|
||||
if id == "" || oldNode.id == id {
|
||||
return oldNode, nil, nil
|
||||
}
|
||||
} else {
|
||||
c.addrs = appendIfNotExists(c.addrs, addr)
|
||||
}
|
||||
|
||||
node = newClusterNode(c.opt, id, addr)
|
||||
c.nodes[addr] = node
|
||||
|
||||
return node, nil
|
||||
return node, oldNode, nil
|
||||
}
|
||||
|
||||
func (c *clusterNodes) get(addr string) (*clusterNode, error) {
|
||||
|
@ -416,7 +436,7 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
|
|||
}
|
||||
|
||||
n := rand.Intn(len(addrs))
|
||||
return c.Get(addrs[n])
|
||||
return c.GetOrCreate(addrs[n])
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
@ -474,7 +494,7 @@ func newClusterState(
|
|||
addr = replaceLoopbackHost(addr, originHost)
|
||||
}
|
||||
|
||||
node, err := c.nodes.Get(addr)
|
||||
node, err := c.nodes.GetOrCreateWithID(addr, slotNode.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -824,8 +844,10 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
|
|||
var addr string
|
||||
moved, ask, addr = isMovedError(lastErr)
|
||||
if moved || ask {
|
||||
c.state.LazyReload()
|
||||
|
||||
var err error
|
||||
node, err = c.nodes.Get(addr)
|
||||
node, err = c.nodes.GetOrCreate(addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1022,7 +1044,7 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
|
|||
for _, idx := range rand.Perm(len(addrs)) {
|
||||
addr := addrs[idx]
|
||||
|
||||
node, err := c.nodes.Get(addr)
|
||||
node, err := c.nodes.GetOrCreate(addr)
|
||||
if err != nil {
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
|
@ -1236,7 +1258,7 @@ func (c *ClusterClient) checkMovedErr(
|
|||
return false
|
||||
}
|
||||
|
||||
node, err := c.nodes.Get(addr)
|
||||
node, err := c.nodes.GetOrCreate(addr)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
@ -1422,7 +1444,7 @@ func (c *ClusterClient) cmdsMoved(
|
|||
addr string,
|
||||
failedCmds *cmdsMap,
|
||||
) error {
|
||||
node, err := c.nodes.Get(addr)
|
||||
node, err := c.nodes.GetOrCreate(addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1477,7 +1499,7 @@ func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...s
|
|||
|
||||
moved, ask, addr := isMovedError(err)
|
||||
if moved || ask {
|
||||
node, err = c.nodes.Get(addr)
|
||||
node, err = c.nodes.GetOrCreate(addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1589,7 +1611,7 @@ func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo,
|
|||
for _, idx := range perm {
|
||||
addr := addrs[idx]
|
||||
|
||||
node, err := c.nodes.Get(addr)
|
||||
node, err := c.nodes.GetOrCreate(addr)
|
||||
if err != nil {
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
|
|
Loading…
Reference in New Issue