Merge pull request #936 from go-redis/fix/retry-cluster-pipeline-read-cmds

Retry cluster pipeline read commands
This commit is contained in:
Vladimir Mihailenco 2018-12-17 16:16:01 +02:00 committed by GitHub
commit 7f89fbac80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 10 additions and 4 deletions

View File

@ -1314,14 +1314,15 @@ func (c *ClusterClient) pipelineProcessCmds(
} }
err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
return c.pipelineReadCmds(rd, cmds, failedCmds) return c.pipelineReadCmds(node, rd, cmds, failedCmds)
}) })
return err return err
} }
func (c *ClusterClient) pipelineReadCmds( func (c *ClusterClient) pipelineReadCmds(
rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
) error { ) error {
var firstErr error
for _, cmd := range cmds { for _, cmd := range cmds {
err := cmd.readReply(rd) err := cmd.readReply(rd)
if err == nil { if err == nil {
@ -1336,9 +1337,14 @@ func (c *ClusterClient) pipelineReadCmds(
continue continue
} }
return err failedCmds.mu.Lock()
failedCmds.m[node] = append(failedCmds.m[node], cmd)
failedCmds.mu.Unlock()
if firstErr == nil {
firstErr = err
} }
return nil }
return firstErr
} }
func (c *ClusterClient) checkMovedErr( func (c *ClusterClient) checkMovedErr(