mirror of https://github.com/go-redis/redis.git
Merge pull request #402 from go-redis/fix/retry-multi-cmds-conservatively
Retry multiple commands more conservatively.
This commit is contained in:
commit
3490ff5d21
13
cluster.go
13
cluster.go
|
@ -649,28 +649,33 @@ func (c *ClusterClient) execClusterCmds(
|
|||
if err == nil {
|
||||
continue
|
||||
}
|
||||
if internal.IsNetworkError(err) {
|
||||
|
||||
if i == 0 && internal.IsNetworkError(err) {
|
||||
cmd.reset()
|
||||
failedCmds[nil] = append(failedCmds[nil], cmds[i:]...)
|
||||
failedCmds[nil] = append(failedCmds[nil], cmds...)
|
||||
break
|
||||
}
|
||||
|
||||
moved, ask, addr := internal.IsMovedError(err)
|
||||
if moved {
|
||||
c.lazyReloadSlots()
|
||||
cmd.reset()
|
||||
|
||||
node, err := c.nodeByAddr(addr)
|
||||
if err != nil {
|
||||
setRetErr(err)
|
||||
continue
|
||||
}
|
||||
|
||||
cmd.reset()
|
||||
failedCmds[node] = append(failedCmds[node], cmd)
|
||||
} else if ask {
|
||||
cmd.reset()
|
||||
node, err := c.nodeByAddr(addr)
|
||||
if err != nil {
|
||||
setRetErr(err)
|
||||
continue
|
||||
}
|
||||
|
||||
cmd.reset()
|
||||
failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
|
||||
} else {
|
||||
setRetErr(err)
|
||||
|
|
19
pipeline.go
19
pipeline.go
|
@ -85,26 +85,23 @@ func (c *Pipeline) pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
|
|||
return cmds, err
|
||||
}
|
||||
|
||||
func execCmds(cn *pool.Conn, cmds []Cmder) ([]Cmder, error) {
|
||||
func execCmds(cn *pool.Conn, cmds []Cmder) (retry bool, firstErr error) {
|
||||
if err := writeCmd(cn, cmds...); err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
return cmds, err
|
||||
return true, err
|
||||
}
|
||||
|
||||
var firstCmdErr error
|
||||
var failedCmds []Cmder
|
||||
for _, cmd := range cmds {
|
||||
for i, cmd := range cmds {
|
||||
err := cmd.readReply(cn)
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
if firstCmdErr == nil {
|
||||
firstCmdErr = err
|
||||
if i == 0 && internal.IsNetworkError(err) {
|
||||
return true, err
|
||||
}
|
||||
if internal.IsRetryableError(err) {
|
||||
failedCmds = append(failedCmds, cmd)
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
|
||||
return failedCmds, firstCmdErr
|
||||
return false, firstErr
|
||||
}
|
||||
|
|
25
redis.go
25
redis.go
|
@ -197,28 +197,31 @@ func (c *Client) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
|
|||
}
|
||||
|
||||
func (c *Client) pipelineExec(cmds []Cmder) error {
|
||||
var retErr error
|
||||
failedCmds := cmds
|
||||
var firstErr error
|
||||
for i := 0; i <= c.opt.MaxRetries; i++ {
|
||||
if i > 0 {
|
||||
resetCmds(cmds)
|
||||
}
|
||||
|
||||
cn, _, err := c.conn()
|
||||
if err != nil {
|
||||
setCmdsErr(failedCmds, err)
|
||||
setCmdsErr(cmds, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if i > 0 {
|
||||
resetCmds(failedCmds)
|
||||
}
|
||||
failedCmds, err = execCmds(cn, failedCmds)
|
||||
retry, err := execCmds(cn, cmds)
|
||||
c.putConn(cn, err, false)
|
||||
if err != nil && retErr == nil {
|
||||
retErr = err
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if len(failedCmds) == 0 {
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
if !retry {
|
||||
break
|
||||
}
|
||||
}
|
||||
return retErr
|
||||
return firstErr
|
||||
}
|
||||
|
||||
func (c *Client) pubSub() *PubSub {
|
||||
|
|
39
ring.go
39
ring.go
|
@ -365,9 +365,7 @@ func (c *Ring) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
|
|||
return c.Pipeline().pipelined(fn)
|
||||
}
|
||||
|
||||
func (c *Ring) pipelineExec(cmds []Cmder) error {
|
||||
var retErr error
|
||||
|
||||
func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
|
||||
cmdsMap := make(map[string][]Cmder)
|
||||
for _, cmd := range cmds {
|
||||
cmdInfo := c.cmdInfo(cmd.arg(0))
|
||||
|
@ -379,14 +377,18 @@ func (c *Ring) pipelineExec(cmds []Cmder) error {
|
|||
}
|
||||
|
||||
for i := 0; i <= c.opt.MaxRetries; i++ {
|
||||
failedCmdsMap := make(map[string][]Cmder)
|
||||
var failedCmdsMap map[string][]Cmder
|
||||
|
||||
for name, cmds := range cmdsMap {
|
||||
if i > 0 {
|
||||
resetCmds(cmds)
|
||||
}
|
||||
|
||||
client, err := c.shardByName(name)
|
||||
if err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
if retErr == nil {
|
||||
retErr = err
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
@ -394,22 +396,25 @@ func (c *Ring) pipelineExec(cmds []Cmder) error {
|
|||
cn, _, err := client.conn()
|
||||
if err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
if retErr == nil {
|
||||
retErr = err
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if i > 0 {
|
||||
resetCmds(cmds)
|
||||
}
|
||||
failedCmds, err := execCmds(cn, cmds)
|
||||
retry, err := execCmds(cn, cmds)
|
||||
client.putConn(cn, err, false)
|
||||
if err != nil && retErr == nil {
|
||||
retErr = err
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
if len(failedCmds) > 0 {
|
||||
failedCmdsMap[name] = failedCmds
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
if retry {
|
||||
if failedCmdsMap == nil {
|
||||
failedCmdsMap = make(map[string][]Cmder)
|
||||
}
|
||||
failedCmdsMap[name] = cmds
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -419,5 +424,5 @@ func (c *Ring) pipelineExec(cmds []Cmder) error {
|
|||
cmdsMap = failedCmdsMap
|
||||
}
|
||||
|
||||
return retErr
|
||||
return firstErr
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue