diff --git a/cluster.go b/cluster.go index afaf3f3..accdb3d 100644 --- a/cluster.go +++ b/cluster.go @@ -226,7 +226,7 @@ func (c *clusterNodes) NextGeneration() uint32 { } // GC removes unused nodes. -func (c *clusterNodes) GC(generation uint32) error { +func (c *clusterNodes) GC(generation uint32) { var collected []*clusterNode c.mu.Lock() for i := 0; i < len(c.addrs); { @@ -243,14 +243,11 @@ func (c *clusterNodes) GC(generation uint32) error { } c.mu.Unlock() - var firstErr error - for _, node := range collected { - if err := node.Client.Close(); err != nil && firstErr == nil { - firstErr = err + time.AfterFunc(time.Minute, func() { + for _, node := range collected { + _ = node.Client.Close() } - } - - return firstErr + }) } func (c *clusterNodes) All() ([]*clusterNode, error) { @@ -596,6 +593,10 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { break } + if internal.IsRetryableError(err, true) { + continue + } + moved, ask, addr := internal.IsMovedError(err) if moved || ask { c.lazyReloadState() @@ -606,6 +607,13 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { continue } + if err == pool.ErrClosed { + node, err = state.slotMasterNode(slot) + if err != nil { + return err + } + } + return err } @@ -641,10 +649,10 @@ func (c *ClusterClient) Process(cmd Cmder) error { if ask { pipe := node.Client.Pipeline() - pipe.Process(NewCmd("ASKING")) - pipe.Process(cmd) + _ = pipe.Process(NewCmd("ASKING")) + _ = pipe.Process(cmd) _, err = pipe.Exec() - pipe.Close() + _ = pipe.Close() ask = false } else { err = node.Client.Process(cmd) @@ -685,6 +693,14 @@ func (c *ClusterClient) Process(cmd Cmder) error { continue } + if err == pool.ErrClosed { + _, node, err = c.cmdSlotAndNode(state, cmd) + if err != nil { + cmd.setErr(err) + return err + } + } + break } @@ -921,7 +937,11 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error { for node, cmds := range cmdsMap { cn, _, err := node.Client.getConn() if err != nil { - setCmdsErr(cmds, err) + if err == pool.ErrClosed { + c.remapCmds(cmds, failedCmds) + } else { + setCmdsErr(cmds, err) + } continue } @@ -961,6 +981,18 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e return cmdsMap, nil } +func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) { + remappedCmds, err := c.mapCmdsByNode(cmds) + if err != nil { + setCmdsErr(cmds, err) + return + } + + for node, cmds := range remappedCmds { + failedCmds[node] = cmds + } +} + func (c *ClusterClient) pipelineProcessCmds( node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { @@ -1067,7 +1099,11 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { for node, cmds := range cmdsMap { cn, _, err := node.Client.getConn() if err != nil { - setCmdsErr(cmds, err) + if err == pool.ErrClosed { + c.remapCmds(cmds, failedCmds) + } else { + setCmdsErr(cmds, err) + } continue }