forked from mirror/redis
Compare commits
1 Commits
master
...
fix/moved-
Author | SHA1 | Date |
---|---|---|
Vladimir Mihailenco | 425f2fc69b |
60
cluster.go
60
cluster.go
|
@ -170,6 +170,7 @@ func (opt *ClusterOptions) clientOptions() *Options {
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type clusterNode struct {
|
type clusterNode struct {
|
||||||
|
id string
|
||||||
Client *Client
|
Client *Client
|
||||||
|
|
||||||
latency uint32 // atomic
|
latency uint32 // atomic
|
||||||
|
@ -177,10 +178,11 @@ type clusterNode struct {
|
||||||
failing uint32 // atomic
|
failing uint32 // atomic
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
|
func newClusterNode(clOpt *ClusterOptions, id, addr string) *clusterNode {
|
||||||
opt := clOpt.clientOptions()
|
opt := clOpt.clientOptions()
|
||||||
opt.Addr = addr
|
opt.Addr = addr
|
||||||
node := clusterNode{
|
node := clusterNode{
|
||||||
|
id: id,
|
||||||
Client: clOpt.NewClient(opt),
|
Client: clOpt.NewClient(opt),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -352,33 +354,51 @@ func (c *clusterNodes) GC(generation uint32) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
|
func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
|
||||||
|
return c.GetOrCreateWithID(addr, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *clusterNodes) GetOrCreateWithID(addr, id string) (*clusterNode, error) {
|
||||||
node, err := c.get(addr)
|
node, err := c.get(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if node != nil {
|
if node != nil && (id == "" || node.id == id) {
|
||||||
return node, nil
|
return node, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
node, oldNode, err := c.getOrCreate(addr, id)
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if oldNode != nil {
|
||||||
|
_ = oldNode.Client.Close()
|
||||||
|
}
|
||||||
|
return node, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *clusterNodes) getOrCreate(addr, id string) (node, oldNode *clusterNode, _ error) {
|
||||||
if c.closed {
|
if c.closed {
|
||||||
return nil, pool.ErrClosed
|
return nil, nil, pool.ErrClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
node, ok := c.nodes[addr]
|
oldNode, ok := c.nodes[addr]
|
||||||
if ok {
|
if ok {
|
||||||
return node, nil
|
// The id is changed when node is re-configured, for example, IP addr is changed.
|
||||||
|
if id == "" || oldNode.id == id {
|
||||||
|
return oldNode, nil, nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
c.addrs = appendIfNotExists(c.addrs, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
node = newClusterNode(c.opt, addr)
|
node = newClusterNode(c.opt, id, addr)
|
||||||
|
|
||||||
c.addrs = appendIfNotExists(c.addrs, addr)
|
|
||||||
c.nodes[addr] = node
|
c.nodes[addr] = node
|
||||||
|
|
||||||
return node, nil
|
return node, oldNode, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clusterNodes) get(addr string) (*clusterNode, error) {
|
func (c *clusterNodes) get(addr string) (*clusterNode, error) {
|
||||||
|
@ -416,7 +436,7 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
n := rand.Intn(len(addrs))
|
n := rand.Intn(len(addrs))
|
||||||
return c.Get(addrs[n])
|
return c.GetOrCreate(addrs[n])
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -474,7 +494,7 @@ func newClusterState(
|
||||||
addr = replaceLoopbackHost(addr, originHost)
|
addr = replaceLoopbackHost(addr, originHost)
|
||||||
}
|
}
|
||||||
|
|
||||||
node, err := c.nodes.Get(addr)
|
node, err := c.nodes.GetOrCreateWithID(addr, slotNode.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -824,8 +844,10 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
|
||||||
var addr string
|
var addr string
|
||||||
moved, ask, addr = isMovedError(lastErr)
|
moved, ask, addr = isMovedError(lastErr)
|
||||||
if moved || ask {
|
if moved || ask {
|
||||||
|
c.state.LazyReload()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
node, err = c.nodes.Get(addr)
|
node, err = c.nodes.GetOrCreate(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1022,7 +1044,7 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
|
||||||
for _, idx := range rand.Perm(len(addrs)) {
|
for _, idx := range rand.Perm(len(addrs)) {
|
||||||
addr := addrs[idx]
|
addr := addrs[idx]
|
||||||
|
|
||||||
node, err := c.nodes.Get(addr)
|
node, err := c.nodes.GetOrCreate(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if firstErr == nil {
|
if firstErr == nil {
|
||||||
firstErr = err
|
firstErr = err
|
||||||
|
@ -1236,7 +1258,7 @@ func (c *ClusterClient) checkMovedErr(
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
node, err := c.nodes.Get(addr)
|
node, err := c.nodes.GetOrCreate(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -1422,7 +1444,7 @@ func (c *ClusterClient) cmdsMoved(
|
||||||
addr string,
|
addr string,
|
||||||
failedCmds *cmdsMap,
|
failedCmds *cmdsMap,
|
||||||
) error {
|
) error {
|
||||||
node, err := c.nodes.Get(addr)
|
node, err := c.nodes.GetOrCreate(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1477,7 +1499,7 @@ func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...s
|
||||||
|
|
||||||
moved, ask, addr := isMovedError(err)
|
moved, ask, addr := isMovedError(err)
|
||||||
if moved || ask {
|
if moved || ask {
|
||||||
node, err = c.nodes.Get(addr)
|
node, err = c.nodes.GetOrCreate(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1589,7 +1611,7 @@ func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo,
|
||||||
for _, idx := range perm {
|
for _, idx := range perm {
|
||||||
addr := addrs[idx]
|
addr := addrs[idx]
|
||||||
|
|
||||||
node, err := c.nodes.Get(addr)
|
node, err := c.nodes.GetOrCreate(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if firstErr == nil {
|
if firstErr == nil {
|
||||||
firstErr = err
|
firstErr = err
|
||||||
|
|
Loading…
Reference in New Issue