From 10edc85b676ecf99ff3059d5f9c93403ce2a595a Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Thu, 13 Dec 2018 13:27:41 +0200 Subject: [PATCH] Retry cluster pipeline read commands --- cluster.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/cluster.go b/cluster.go index af2b057..da5508a 100644 --- a/cluster.go +++ b/cluster.go @@ -1314,14 +1314,15 @@ func (c *ClusterClient) pipelineProcessCmds( } err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { - return c.pipelineReadCmds(rd, cmds, failedCmds) + return c.pipelineReadCmds(node, rd, cmds, failedCmds) }) return err } func (c *ClusterClient) pipelineReadCmds( - rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, + node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, ) error { + var firstErr error for _, cmd := range cmds { err := cmd.readReply(rd) if err == nil { @@ -1336,9 +1337,14 @@ func (c *ClusterClient) pipelineReadCmds( continue } - return err + failedCmds.mu.Lock() + failedCmds.m[node] = append(failedCmds.m[node], cmd) + failedCmds.mu.Unlock() + if firstErr == nil { + firstErr = err + } } - return nil + return firstErr } func (c *ClusterClient) checkMovedErr(