diff --git a/cluster.go b/cluster.go index ff935fc4..9d07d04f 100644 --- a/cluster.go +++ b/cluster.go @@ -649,28 +649,33 @@ func (c *ClusterClient) execClusterCmds( if err == nil { continue } - if internal.IsNetworkError(err) { + + if i == 0 && internal.IsNetworkError(err) { cmd.reset() - failedCmds[nil] = append(failedCmds[nil], cmds[i:]...) + failedCmds[nil] = append(failedCmds[nil], cmds...) break } + moved, ask, addr := internal.IsMovedError(err) if moved { c.lazyReloadSlots() - cmd.reset() + node, err := c.nodeByAddr(addr) if err != nil { setRetErr(err) continue } + + cmd.reset() failedCmds[node] = append(failedCmds[node], cmd) } else if ask { - cmd.reset() node, err := c.nodeByAddr(addr) if err != nil { setRetErr(err) continue } + + cmd.reset() failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd) } else { setRetErr(err) diff --git a/pipeline.go b/pipeline.go index 1cbc00f3..ac8bcc2f 100644 --- a/pipeline.go +++ b/pipeline.go @@ -84,26 +84,23 @@ func (c *Pipeline) pipelined(fn func(*Pipeline) error) ([]Cmder, error) { return cmds, err } -func execCmds(cn *pool.Conn, cmds []Cmder) ([]Cmder, error) { +func execCmds(cn *pool.Conn, cmds []Cmder) (retry bool, firstErr error) { if err := writeCmd(cn, cmds...); err != nil { setCmdsErr(cmds, err) - return cmds, err + return true, err } - var firstCmdErr error - var failedCmds []Cmder - for _, cmd := range cmds { + for i, cmd := range cmds { err := cmd.readReply(cn) if err == nil { continue } - if firstCmdErr == nil { - firstCmdErr = err + if i == 0 && internal.IsNetworkError(err) { + return true, err } - if internal.IsRetryableError(err) { - failedCmds = append(failedCmds, cmd) + if firstErr == nil { + firstErr = err } } - - return failedCmds, firstCmdErr + return false, firstErr } diff --git a/redis.go b/redis.go index e866a5ca..aee38933 100644 --- a/redis.go +++ b/redis.go @@ -197,28 +197,31 @@ func (c *Client) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) { } func (c *Client) pipelineExec(cmds []Cmder) error { - var retErr error - failedCmds := cmds + var firstErr error for i := 0; i <= c.opt.MaxRetries; i++ { + if i > 0 { + resetCmds(cmds) + } + cn, _, err := c.conn() if err != nil { - setCmdsErr(failedCmds, err) + setCmdsErr(cmds, err) return err } - if i > 0 { - resetCmds(failedCmds) - } - failedCmds, err = execCmds(cn, failedCmds) + retry, err := execCmds(cn, cmds) c.putConn(cn, err, false) - if err != nil && retErr == nil { - retErr = err + if err == nil { + return nil } - if len(failedCmds) == 0 { + if firstErr == nil { + firstErr = err + } + if !retry { break } } - return retErr + return firstErr } func (c *Client) pubSub() *PubSub { diff --git a/ring.go b/ring.go index 77193f32..96085346 100644 --- a/ring.go +++ b/ring.go @@ -365,9 +365,7 @@ func (c *Ring) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) { return c.Pipeline().pipelined(fn) } -func (c *Ring) pipelineExec(cmds []Cmder) error { - var retErr error - +func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) { cmdsMap := make(map[string][]Cmder) for _, cmd := range cmds { cmdInfo := c.cmdInfo(cmd.arg(0)) @@ -379,14 +377,18 @@ func (c *Ring) pipelineExec(cmds []Cmder) error { } for i := 0; i <= c.opt.MaxRetries; i++ { - failedCmdsMap := make(map[string][]Cmder) + var failedCmdsMap map[string][]Cmder for name, cmds := range cmdsMap { + if i > 0 { + resetCmds(cmds) + } + client, err := c.shardByName(name) if err != nil { setCmdsErr(cmds, err) - if retErr == nil { - retErr = err + if firstErr == nil { + firstErr = err } continue } @@ -394,22 +396,25 @@ func (c *Ring) pipelineExec(cmds []Cmder) error { cn, _, err := client.conn() if err != nil { setCmdsErr(cmds, err) - if retErr == nil { - retErr = err + if firstErr == nil { + firstErr = err } continue } - if i > 0 { - resetCmds(cmds) - } - failedCmds, err := execCmds(cn, cmds) + retry, err := execCmds(cn, cmds) client.putConn(cn, err, false) - if err != nil && retErr == nil { - retErr = err + if err == nil { + continue } - if len(failedCmds) > 0 { - failedCmdsMap[name] = failedCmds + if firstErr == nil { + firstErr = err + } + if retry { + if failedCmdsMap == nil { + failedCmdsMap = make(map[string][]Cmder) + } + failedCmdsMap[name] = cmds } } @@ -419,5 +424,5 @@ func (c *Ring) pipelineExec(cmds []Cmder) error { cmdsMap = failedCmdsMap } - return retErr + return firstErr }