Merge pull request #1252 from go-redis/fix/set-cmd-err

Set error on the commands in case there are no more attempts left
This commit is contained in:
Vladimir Mihailenco 2020-02-02 14:50:31 +02:00 committed by GitHub
commit 2df96f7ef0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 51 deletions

View File

@ -1046,7 +1046,7 @@ func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error
func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error {
cmdsMap := newCmdsMap()
err := c.mapCmdsByNode(cmds, cmdsMap)
err := c.mapCmdsByNode(cmdsMap, cmds)
if err != nil {
setCmdsErr(cmds, err)
return err
@ -1080,11 +1080,15 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) erro
return c.pipelineReadCmds(node, rd, cmds, failedCmds)
})
})
if err != nil {
err = c.mapCmdsByNode(cmds, failedCmds)
if err != nil {
if err == nil {
return
}
if attempt < c.opt.MaxRedirects {
if err := c.mapCmdsByNode(failedCmds, cmds); err != nil {
setCmdsErr(cmds, err)
}
} else {
setCmdsErr(cmds, err)
}
}(node, cmds)
}
@ -1099,41 +1103,27 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) erro
return cmdsFirstErr(cmds)
}
type cmdsMap struct {
mu sync.Mutex
m map[*clusterNode][]Cmder
}
func newCmdsMap() *cmdsMap {
return &cmdsMap{
m: make(map[*clusterNode][]Cmder),
}
}
func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
m.mu.Lock()
m.m[node] = append(m.m[node], cmds...)
m.mu.Unlock()
}
func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
func (c *ClusterClient) mapCmdsByNode(cmdsMap *cmdsMap, cmds []Cmder) error {
state, err := c.state.Get()
if err != nil {
return err
}
cmdsAreReadOnly := c.opt.ReadOnly && c.cmdsAreReadOnly(cmds)
if c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) {
for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
var node *clusterNode
var err error
if cmdsAreReadOnly {
cmdInfo := c.cmdInfo(cmd.Name())
node, err = c.cmdNode(cmdInfo, slot)
} else {
node, err = state.slotMasterNode(slot)
node, err := c.slotReadOnlyNode(state, slot)
if err != nil {
return err
}
cmdsMap.Add(node, cmd)
}
return nil
}
for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
node, err := state.slotMasterNode(slot)
if err != nil {
return err
}
@ -1261,7 +1251,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
return err
}
err = cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
err := c.txPipelineReadQueued(rd, cmds, failedCmds)
if err != nil {
moved, ask, addr := isMovedError(err)
@ -1272,13 +1262,16 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
}
return pipelineReadCmds(rd, cmds)
})
return err
})
if err != nil {
err = c.mapCmdsByNode(cmds, failedCmds)
if err != nil {
if err == nil {
return
}
if attempt < c.opt.MaxRedirects {
if err := c.mapCmdsByNode(failedCmds, cmds); err != nil {
setCmdsErr(cmds, err)
}
} else {
setCmdsErr(cmds, err)
}
}(node, cmds)
}
@ -1561,6 +1554,12 @@ func (c *ClusterClient) cmdNode(cmdInfo *CommandInfo, slot int) (*clusterNode, e
}
if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
return c.slotReadOnlyNode(state, slot)
}
return state.slotMasterNode(slot)
}
func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
if c.opt.RouteByLatency {
return state.slotClosestNode(slot)
}
@ -1570,20 +1569,12 @@ func (c *ClusterClient) cmdNode(cmdInfo *CommandInfo, slot int) (*clusterNode, e
return state.slotSlaveNode(slot)
}
return state.slotMasterNode(slot)
}
func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
state, err := c.state.Get()
if err != nil {
return nil, err
}
nodes := state.slotNodes(slot)
if len(nodes) > 0 {
return nodes[0], nil
}
return c.nodes.Random()
return state.slotMasterNode(slot)
}
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
@ -1622,3 +1613,22 @@ func remove(ss []string, es ...string) []string {
}
return ss
}
//------------------------------------------------------------------------------
type cmdsMap struct {
mu sync.Mutex
m map[*clusterNode][]Cmder
}
func newCmdsMap() *cmdsMap {
return &cmdsMap{
m: make(map[*clusterNode][]Cmder),
}
}
func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
m.mu.Lock()
m.m[node] = append(m.m[node], cmds...)
m.mu.Unlock()
}

View File

@ -152,6 +152,9 @@ func (opt *Options) init() {
opt.IdleCheckFrequency = time.Minute
}
if opt.MaxRetries == -1 {
opt.MaxRetries = 0
}
switch opt.MinRetryBackoff {
case -1:
opt.MinRetryBackoff = 0