Retry multiple commands more conservatively.

This commit is contained in:
Vladimir Mihailenco 2016-10-13 13:56:24 +03:00
parent 21cae7f9ab
commit 8558a92fa4
4 changed files with 53 additions and 43 deletions

View File

@ -649,28 +649,33 @@ func (c *ClusterClient) execClusterCmds(
if err == nil { if err == nil {
continue continue
} }
if internal.IsNetworkError(err) {
if i == 0 && internal.IsNetworkError(err) {
cmd.reset() cmd.reset()
failedCmds[nil] = append(failedCmds[nil], cmds[i:]...) failedCmds[nil] = append(failedCmds[nil], cmds...)
break break
} }
moved, ask, addr := internal.IsMovedError(err) moved, ask, addr := internal.IsMovedError(err)
if moved { if moved {
c.lazyReloadSlots() c.lazyReloadSlots()
cmd.reset()
node, err := c.nodeByAddr(addr) node, err := c.nodeByAddr(addr)
if err != nil { if err != nil {
setRetErr(err) setRetErr(err)
continue continue
} }
cmd.reset()
failedCmds[node] = append(failedCmds[node], cmd) failedCmds[node] = append(failedCmds[node], cmd)
} else if ask { } else if ask {
cmd.reset()
node, err := c.nodeByAddr(addr) node, err := c.nodeByAddr(addr)
if err != nil { if err != nil {
setRetErr(err) setRetErr(err)
continue continue
} }
cmd.reset()
failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd) failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
} else { } else {
setRetErr(err) setRetErr(err)

View File

@ -84,26 +84,23 @@ func (c *Pipeline) pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
return cmds, err return cmds, err
} }
func execCmds(cn *pool.Conn, cmds []Cmder) ([]Cmder, error) { func execCmds(cn *pool.Conn, cmds []Cmder) (retry bool, firstErr error) {
if err := writeCmd(cn, cmds...); err != nil { if err := writeCmd(cn, cmds...); err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
return cmds, err return true, err
} }
var firstCmdErr error for i, cmd := range cmds {
var failedCmds []Cmder
for _, cmd := range cmds {
err := cmd.readReply(cn) err := cmd.readReply(cn)
if err == nil { if err == nil {
continue continue
} }
if firstCmdErr == nil { if i == 0 && internal.IsNetworkError(err) {
firstCmdErr = err return true, err
} }
if internal.IsRetryableError(err) { if firstErr == nil {
failedCmds = append(failedCmds, cmd) firstErr = err
} }
} }
return false, firstErr
return failedCmds, firstCmdErr
} }

View File

@ -197,28 +197,31 @@ func (c *Client) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
} }
func (c *Client) pipelineExec(cmds []Cmder) error { func (c *Client) pipelineExec(cmds []Cmder) error {
var retErr error var firstErr error
failedCmds := cmds
for i := 0; i <= c.opt.MaxRetries; i++ { for i := 0; i <= c.opt.MaxRetries; i++ {
if i > 0 {
resetCmds(cmds)
}
cn, _, err := c.conn() cn, _, err := c.conn()
if err != nil { if err != nil {
setCmdsErr(failedCmds, err) setCmdsErr(cmds, err)
return err return err
} }
if i > 0 { retry, err := execCmds(cn, cmds)
resetCmds(failedCmds)
}
failedCmds, err = execCmds(cn, failedCmds)
c.putConn(cn, err, false) c.putConn(cn, err, false)
if err != nil && retErr == nil { if err == nil {
retErr = err return nil
} }
if len(failedCmds) == 0 { if firstErr == nil {
firstErr = err
}
if !retry {
break break
} }
} }
return retErr return firstErr
} }
func (c *Client) pubSub() *PubSub { func (c *Client) pubSub() *PubSub {

39
ring.go
View File

@ -365,9 +365,7 @@ func (c *Ring) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn) return c.Pipeline().pipelined(fn)
} }
func (c *Ring) pipelineExec(cmds []Cmder) error { func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
var retErr error
cmdsMap := make(map[string][]Cmder) cmdsMap := make(map[string][]Cmder)
for _, cmd := range cmds { for _, cmd := range cmds {
cmdInfo := c.cmdInfo(cmd.arg(0)) cmdInfo := c.cmdInfo(cmd.arg(0))
@ -379,14 +377,18 @@ func (c *Ring) pipelineExec(cmds []Cmder) error {
} }
for i := 0; i <= c.opt.MaxRetries; i++ { for i := 0; i <= c.opt.MaxRetries; i++ {
failedCmdsMap := make(map[string][]Cmder) var failedCmdsMap map[string][]Cmder
for name, cmds := range cmdsMap { for name, cmds := range cmdsMap {
if i > 0 {
resetCmds(cmds)
}
client, err := c.shardByName(name) client, err := c.shardByName(name)
if err != nil { if err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
if retErr == nil { if firstErr == nil {
retErr = err firstErr = err
} }
continue continue
} }
@ -394,22 +396,25 @@ func (c *Ring) pipelineExec(cmds []Cmder) error {
cn, _, err := client.conn() cn, _, err := client.conn()
if err != nil { if err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
if retErr == nil { if firstErr == nil {
retErr = err firstErr = err
} }
continue continue
} }
if i > 0 { retry, err := execCmds(cn, cmds)
resetCmds(cmds)
}
failedCmds, err := execCmds(cn, cmds)
client.putConn(cn, err, false) client.putConn(cn, err, false)
if err != nil && retErr == nil { if err == nil {
retErr = err continue
} }
if len(failedCmds) > 0 { if firstErr == nil {
failedCmdsMap[name] = failedCmds firstErr = err
}
if retry {
if failedCmdsMap == nil {
failedCmdsMap = make(map[string][]Cmder)
}
failedCmdsMap[name] = cmds
} }
} }
@ -419,5 +424,5 @@ func (c *Ring) pipelineExec(cmds []Cmder) error {
cmdsMap = failedCmdsMap cmdsMap = failedCmdsMap
} }
return retErr return firstErr
} }