diff --git a/cluster.go b/cluster.go index 5348333..697c406 100644 --- a/cluster.go +++ b/cluster.go @@ -1046,7 +1046,7 @@ func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error { cmdsMap := newCmdsMap() - err := c.mapCmdsByNode(cmds, cmdsMap) + err := c.mapCmdsByNode(cmdsMap, cmds) if err != nil { setCmdsErr(cmds, err) return err @@ -1080,11 +1080,15 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) erro return c.pipelineReadCmds(node, rd, cmds, failedCmds) }) }) - if err != nil { - err = c.mapCmdsByNode(cmds, failedCmds) - if err != nil { + if err == nil { + return + } + if attempt < c.opt.MaxRedirects { + if err := c.mapCmdsByNode(failedCmds, cmds); err != nil { setCmdsErr(cmds, err) } + } else { + setCmdsErr(cmds, err) } }(node, cmds) } @@ -1099,41 +1103,27 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) erro return cmdsFirstErr(cmds) } -type cmdsMap struct { - mu sync.Mutex - m map[*clusterNode][]Cmder -} - -func newCmdsMap() *cmdsMap { - return &cmdsMap{ - m: make(map[*clusterNode][]Cmder), - } -} - -func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) { - m.mu.Lock() - m.m[node] = append(m.m[node], cmds...) - m.mu.Unlock() -} - -func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error { +func (c *ClusterClient) mapCmdsByNode(cmdsMap *cmdsMap, cmds []Cmder) error { state, err := c.state.Get() if err != nil { return err } - cmdsAreReadOnly := c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) + if c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) { + for _, cmd := range cmds { + slot := c.cmdSlot(cmd) + node, err := c.slotReadOnlyNode(state, slot) + if err != nil { + return err + } + cmdsMap.Add(node, cmd) + } + return nil + } + for _, cmd := range cmds { slot := c.cmdSlot(cmd) - - var node *clusterNode - var err error - if cmdsAreReadOnly { - cmdInfo := c.cmdInfo(cmd.Name()) - node, err = c.cmdNode(cmdInfo, slot) - } else { - node, err = state.slotMasterNode(slot) - } + node, err := state.slotMasterNode(slot) if err != nil { return err } @@ -1261,7 +1251,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er return err } - err = cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { + return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { err := c.txPipelineReadQueued(rd, cmds, failedCmds) if err != nil { moved, ask, addr := isMovedError(err) @@ -1272,13 +1262,16 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er } return pipelineReadCmds(rd, cmds) }) - return err }) - if err != nil { - err = c.mapCmdsByNode(cmds, failedCmds) - if err != nil { + if err == nil { + return + } + if attempt < c.opt.MaxRedirects { + if err := c.mapCmdsByNode(failedCmds, cmds); err != nil { setCmdsErr(cmds, err) } + } else { + setCmdsErr(cmds, err) } }(node, cmds) } @@ -1561,29 +1554,27 @@ func (c *ClusterClient) cmdNode(cmdInfo *CommandInfo, slot int) (*clusterNode, e } if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly { - if c.opt.RouteByLatency { - return state.slotClosestNode(slot) - } - if c.opt.RouteRandomly { - return state.slotRandomNode(slot) - } - return state.slotSlaveNode(slot) + return c.slotReadOnlyNode(state, slot) } - return state.slotMasterNode(slot) } +func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) { + if c.opt.RouteByLatency { + return state.slotClosestNode(slot) + } + if c.opt.RouteRandomly { + return state.slotRandomNode(slot) + } + return state.slotSlaveNode(slot) +} + func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) { state, err := c.state.Get() if err != nil { return nil, err } - - nodes := state.slotNodes(slot) - if len(nodes) > 0 { - return nodes[0], nil - } - return c.nodes.Random() + return state.slotMasterNode(slot) } func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { @@ -1622,3 +1613,22 @@ func remove(ss []string, es ...string) []string { } return ss } + +//------------------------------------------------------------------------------ + +type cmdsMap struct { + mu sync.Mutex + m map[*clusterNode][]Cmder +} + +func newCmdsMap() *cmdsMap { + return &cmdsMap{ + m: make(map[*clusterNode][]Cmder), + } +} + +func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) { + m.mu.Lock() + m.m[node] = append(m.m[node], cmds...) + m.mu.Unlock() +} diff --git a/options.go b/options.go index 1c8ccbe..ae26919 100644 --- a/options.go +++ b/options.go @@ -152,6 +152,9 @@ func (opt *Options) init() { opt.IdleCheckFrequency = time.Minute } + if opt.MaxRetries == -1 { + opt.MaxRetries = 0 + } switch opt.MinRetryBackoff { case -1: opt.MinRetryBackoff = 0