Set error on the commands in case there are no more attempts left

This commit is contained in:
Vladimir Mihailenco 2020-02-02 13:01:57 +02:00
parent 5edc4c8384
commit 45de1c42ae
2 changed files with 64 additions and 51 deletions

View File

@ -1046,7 +1046,7 @@ func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error
func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error { func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error {
cmdsMap := newCmdsMap() cmdsMap := newCmdsMap()
err := c.mapCmdsByNode(cmds, cmdsMap) err := c.mapCmdsByNode(cmdsMap, cmds)
if err != nil { if err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
return err return err
@ -1080,11 +1080,15 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) erro
return c.pipelineReadCmds(node, rd, cmds, failedCmds) return c.pipelineReadCmds(node, rd, cmds, failedCmds)
}) })
}) })
if err != nil { if err == nil {
err = c.mapCmdsByNode(cmds, failedCmds) return
if err != nil { }
if attempt < c.opt.MaxRedirects {
if err := c.mapCmdsByNode(failedCmds, cmds); err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
} }
} else {
setCmdsErr(cmds, err)
} }
}(node, cmds) }(node, cmds)
} }
@ -1099,41 +1103,27 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) erro
return cmdsFirstErr(cmds) return cmdsFirstErr(cmds)
} }
type cmdsMap struct { func (c *ClusterClient) mapCmdsByNode(cmdsMap *cmdsMap, cmds []Cmder) error {
mu sync.Mutex
m map[*clusterNode][]Cmder
}
func newCmdsMap() *cmdsMap {
return &cmdsMap{
m: make(map[*clusterNode][]Cmder),
}
}
func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
m.mu.Lock()
m.m[node] = append(m.m[node], cmds...)
m.mu.Unlock()
}
func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
state, err := c.state.Get() state, err := c.state.Get()
if err != nil { if err != nil {
return err return err
} }
cmdsAreReadOnly := c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) if c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) {
for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
node, err := c.slotReadOnlyNode(state, slot)
if err != nil {
return err
}
cmdsMap.Add(node, cmd)
}
return nil
}
for _, cmd := range cmds { for _, cmd := range cmds {
slot := c.cmdSlot(cmd) slot := c.cmdSlot(cmd)
node, err := state.slotMasterNode(slot)
var node *clusterNode
var err error
if cmdsAreReadOnly {
cmdInfo := c.cmdInfo(cmd.Name())
node, err = c.cmdNode(cmdInfo, slot)
} else {
node, err = state.slotMasterNode(slot)
}
if err != nil { if err != nil {
return err return err
} }
@ -1261,7 +1251,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
return err return err
} }
err = cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
err := c.txPipelineReadQueued(rd, cmds, failedCmds) err := c.txPipelineReadQueued(rd, cmds, failedCmds)
if err != nil { if err != nil {
moved, ask, addr := isMovedError(err) moved, ask, addr := isMovedError(err)
@ -1272,13 +1262,16 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
} }
return pipelineReadCmds(rd, cmds) return pipelineReadCmds(rd, cmds)
}) })
return err
}) })
if err != nil { if err == nil {
err = c.mapCmdsByNode(cmds, failedCmds) return
if err != nil { }
if attempt < c.opt.MaxRedirects {
if err := c.mapCmdsByNode(failedCmds, cmds); err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
} }
} else {
setCmdsErr(cmds, err)
} }
}(node, cmds) }(node, cmds)
} }
@ -1561,29 +1554,27 @@ func (c *ClusterClient) cmdNode(cmdInfo *CommandInfo, slot int) (*clusterNode, e
} }
if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly { if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
if c.opt.RouteByLatency { return c.slotReadOnlyNode(state, slot)
return state.slotClosestNode(slot)
}
if c.opt.RouteRandomly {
return state.slotRandomNode(slot)
}
return state.slotSlaveNode(slot)
} }
return state.slotMasterNode(slot) return state.slotMasterNode(slot)
} }
func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
if c.opt.RouteByLatency {
return state.slotClosestNode(slot)
}
if c.opt.RouteRandomly {
return state.slotRandomNode(slot)
}
return state.slotSlaveNode(slot)
}
func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) { func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
state, err := c.state.Get() state, err := c.state.Get()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return state.slotMasterNode(slot)
nodes := state.slotNodes(slot)
if len(nodes) > 0 {
return nodes[0], nil
}
return c.nodes.Random()
} }
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
@ -1622,3 +1613,22 @@ func remove(ss []string, es ...string) []string {
} }
return ss return ss
} }
//------------------------------------------------------------------------------
type cmdsMap struct {
mu sync.Mutex
m map[*clusterNode][]Cmder
}
func newCmdsMap() *cmdsMap {
return &cmdsMap{
m: make(map[*clusterNode][]Cmder),
}
}
func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
m.mu.Lock()
m.m[node] = append(m.m[node], cmds...)
m.mu.Unlock()
}

View File

@ -152,6 +152,9 @@ func (opt *Options) init() {
opt.IdleCheckFrequency = time.Minute opt.IdleCheckFrequency = time.Minute
} }
if opt.MaxRetries == -1 {
opt.MaxRetries = 0
}
switch opt.MinRetryBackoff { switch opt.MinRetryBackoff {
case -1: case -1:
opt.MinRetryBackoff = 0 opt.MinRetryBackoff = 0