mirror of https://github.com/go-redis/redis.git
Retry cluster pipeline read commands
This commit is contained in:
parent
45a00fd91a
commit
10edc85b67
14
cluster.go
14
cluster.go
|
@ -1314,14 +1314,15 @@ func (c *ClusterClient) pipelineProcessCmds(
|
||||||
}
|
}
|
||||||
|
|
||||||
err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
|
err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
|
||||||
return c.pipelineReadCmds(rd, cmds, failedCmds)
|
return c.pipelineReadCmds(node, rd, cmds, failedCmds)
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) pipelineReadCmds(
|
func (c *ClusterClient) pipelineReadCmds(
|
||||||
rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
|
node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
|
||||||
) error {
|
) error {
|
||||||
|
var firstErr error
|
||||||
for _, cmd := range cmds {
|
for _, cmd := range cmds {
|
||||||
err := cmd.readReply(rd)
|
err := cmd.readReply(rd)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -1336,9 +1337,14 @@ func (c *ClusterClient) pipelineReadCmds(
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
failedCmds.mu.Lock()
|
||||||
|
failedCmds.m[node] = append(failedCmds.m[node], cmd)
|
||||||
|
failedCmds.mu.Unlock()
|
||||||
|
if firstErr == nil {
|
||||||
|
firstErr = err
|
||||||
}
|
}
|
||||||
return nil
|
}
|
||||||
|
return firstErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) checkMovedErr(
|
func (c *ClusterClient) checkMovedErr(
|
||||||
|
|
Loading…
Reference in New Issue