execute commans concurrently on each cluster node in method `defaultProcessPipeline` (#861)

Execute commands concurrently on each cluster node
This commit is contained in:
zhanyr 2018-09-11 17:37:57 +08:00 committed by Vladimir Mihailenco
parent dd997adc42
commit a9e329d3bc
1 changed files with 38 additions and 15 deletions

View File

@ -1254,24 +1254,47 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
failedCmds := make(map[*clusterNode][]Cmder) failedCmds := make(map[*clusterNode][]Cmder)
var wg sync.WaitGroup
var lock sync.RWMutex
for node, cmds := range cmdsMap { for node, cmds := range cmdsMap {
cn, err := node.Client.getConn() wg.Add(1)
if err != nil { go func(node *clusterNode, cmds []Cmder) {
if err == pool.ErrClosed { defer wg.Done()
c.remapCmds(cmds, failedCmds)
} else {
setCmdsErr(cmds, err)
}
continue
}
err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) failedCmdsTmp := make(map[*clusterNode][]Cmder)
if err == nil || internal.IsRedisError(err) {
node.Client.connPool.Put(cn) cn, err := node.Client.getConn()
} else { if err != nil {
node.Client.connPool.Remove(cn) if err == pool.ErrClosed {
} c.remapCmds(cmds, failedCmdsTmp)
} else {
setCmdsErr(cmds, err)
}
} else {
err = c.pipelineProcessCmds(node, cn, cmds, failedCmdsTmp)
if err == nil || internal.IsRedisError(err) {
node.Client.connPool.Put(cn)
} else {
node.Client.connPool.Remove(cn)
}
}
if len(failedCmdsTmp) > 0 {
for node, cs := range failedCmdsTmp {
lock.Lock()
if _, ok := failedCmds[node]; ok {
failedCmds[node] = append(failedCmds[node], cs...)
} else {
failedCmds[node] = cs
}
lock.Unlock()
}
}
}(node, cmds)
} }
wg.Wait()
if len(failedCmds) == 0 { if len(failedCmds) == 0 {
break break