Process pipeline commands concurrently

This commit is contained in:
Vladimir Mihailenco 2018-09-11 13:09:16 +03:00
parent a9e329d3bc
commit f7094544a5
3 changed files with 111 additions and 97 deletions

View File

@ -1,5 +1,9 @@
# Changelog
## Unreleased
- Cluster and Ring pipelines process commands for each node in its own goroutine.
## 6.14
- Added Options.MinIdleConns.

View File

@ -1241,7 +1241,8 @@ func (c *ClusterClient) WrapProcessPipeline(
}
func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
cmdsMap, err := c.mapCmdsByNode(cmds)
cmdsMap := newCmdsMap()
err := c.mapCmdsByNode(cmds, cmdsMap)
if err != nil {
setCmdsErr(cmds, err)
return err
@ -1252,51 +1253,35 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt))
}
failedCmds := make(map[*clusterNode][]Cmder)
failedCmds := newCmdsMap()
var wg sync.WaitGroup
var lock sync.RWMutex
for node, cmds := range cmdsMap {
for node, cmds := range cmdsMap.m {
wg.Add(1)
go func(node *clusterNode, cmds []Cmder) {
defer wg.Done()
failedCmdsTmp := make(map[*clusterNode][]Cmder)
cn, err := node.Client.getConn()
if err != nil {
if err == pool.ErrClosed {
c.remapCmds(cmds, failedCmdsTmp)
c.mapCmdsByNode(cmds, failedCmds)
} else {
setCmdsErr(cmds, err)
}
return
}
} else {
err = c.pipelineProcessCmds(node, cn, cmds, failedCmdsTmp)
err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
if err == nil || internal.IsRedisError(err) {
node.Client.connPool.Put(cn)
} else {
node.Client.connPool.Remove(cn)
}
}
if len(failedCmdsTmp) > 0 {
for node, cs := range failedCmdsTmp {
lock.Lock()
if _, ok := failedCmds[node]; ok {
failedCmds[node] = append(failedCmds[node], cs...)
} else {
failedCmds[node] = cs
}
lock.Unlock()
}
}
}(node, cmds)
}
wg.Wait()
if len(failedCmds) == 0 {
wg.Wait()
if len(failedCmds.m) == 0 {
break
}
cmdsMap = failedCmds
@ -1305,14 +1290,24 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
return cmdsFirstErr(cmds)
}
func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
type cmdsMap struct {
mu sync.Mutex
m map[*clusterNode][]Cmder
}
func newCmdsMap() *cmdsMap {
return &cmdsMap{
m: make(map[*clusterNode][]Cmder),
}
}
func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
state, err := c.state.Get()
if err != nil {
setCmdsErr(cmds, err)
return nil, err
return err
}
cmdsMap := make(map[*clusterNode][]Cmder)
cmdsAreReadOnly := c.cmdsAreReadOnly(cmds)
for _, cmd := range cmds {
var node *clusterNode
@ -1324,11 +1319,13 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e
node, err = state.slotMasterNode(slot)
}
if err != nil {
return nil, err
return err
}
cmdsMap[node] = append(cmdsMap[node], cmd)
cmdsMap.mu.Lock()
cmdsMap.m[node] = append(cmdsMap.m[node], cmd)
cmdsMap.mu.Unlock()
}
return cmdsMap, nil
return nil
}
func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
@ -1341,27 +1338,17 @@ func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
return true
}
func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) {
remappedCmds, err := c.mapCmdsByNode(cmds)
if err != nil {
setCmdsErr(cmds, err)
return
}
for node, cmds := range remappedCmds {
failedCmds[node] = cmds
}
}
func (c *ClusterClient) pipelineProcessCmds(
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
) error {
err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmd(wr, cmds...)
})
if err != nil {
setCmdsErr(cmds, err)
failedCmds[node] = cmds
failedCmds.mu.Lock()
failedCmds.m[node] = cmds
failedCmds.mu.Unlock()
return err
}
@ -1372,7 +1359,7 @@ func (c *ClusterClient) pipelineProcessCmds(
}
func (c *ClusterClient) pipelineReadCmds(
rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
) error {
for _, cmd := range cmds {
err := cmd.readReply(rd)
@ -1394,7 +1381,7 @@ func (c *ClusterClient) pipelineReadCmds(
}
func (c *ClusterClient) checkMovedErr(
cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder,
cmd Cmder, err error, failedCmds *cmdsMap,
) bool {
moved, ask, addr := internal.IsMovedError(err)
@ -1406,7 +1393,9 @@ func (c *ClusterClient) checkMovedErr(
return false
}
failedCmds[node] = append(failedCmds[node], cmd)
failedCmds.mu.Lock()
failedCmds.m[node] = append(failedCmds.m[node], cmd)
failedCmds.mu.Unlock()
return true
}
@ -1416,7 +1405,9 @@ func (c *ClusterClient) checkMovedErr(
return false
}
failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
failedCmds.mu.Lock()
failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd)
failedCmds.mu.Unlock()
return true
}
@ -1456,17 +1447,22 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt))
}
failedCmds := make(map[*clusterNode][]Cmder)
failedCmds := newCmdsMap()
var wg sync.WaitGroup
for node, cmds := range cmdsMap {
wg.Add(1)
go func(node *clusterNode, cmds []Cmder) {
defer wg.Done()
cn, err := node.Client.getConn()
if err != nil {
if err == pool.ErrClosed {
c.remapCmds(cmds, failedCmds)
c.mapCmdsByNode(cmds, failedCmds)
} else {
setCmdsErr(cmds, err)
}
continue
return
}
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
@ -1475,12 +1471,14 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
} else {
node.Client.connPool.Remove(cn)
}
}(node, cmds)
}
if len(failedCmds) == 0 {
wg.Wait()
if len(failedCmds.m) == 0 {
break
}
cmdsMap = failedCmds
cmdsMap = failedCmds.m
}
}
@ -1497,14 +1495,16 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
}
func (c *ClusterClient) txPipelineProcessCmds(
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
) error {
err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
return txPipelineWriteMulti(wr, cmds)
})
if err != nil {
setCmdsErr(cmds, err)
failedCmds[node] = cmds
failedCmds.mu.Lock()
failedCmds.m[node] = cmds
failedCmds.mu.Unlock()
return err
}
@ -1520,7 +1520,7 @@ func (c *ClusterClient) txPipelineProcessCmds(
}
func (c *ClusterClient) txPipelineReadQueued(
rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
) error {
// Parse queued replies.
var statusCmd StatusCmd

16
ring.go
View File

@ -592,36 +592,46 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt))
}
var mu sync.Mutex
var failedCmdsMap map[string][]Cmder
var wg sync.WaitGroup
for hash, cmds := range cmdsMap {
wg.Add(1)
go func(hash string, cmds []Cmder) {
defer wg.Done()
shard, err := c.shards.GetByHash(hash)
if err != nil {
setCmdsErr(cmds, err)
continue
return
}
cn, err := shard.Client.getConn()
if err != nil {
setCmdsErr(cmds, err)
continue
return
}
canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
if err == nil || internal.IsRedisError(err) {
shard.Client.connPool.Put(cn)
continue
return
}
shard.Client.connPool.Remove(cn)
if canRetry && internal.IsRetryableError(err, true) {
mu.Lock()
if failedCmdsMap == nil {
failedCmdsMap = make(map[string][]Cmder)
}
failedCmdsMap[hash] = cmds
mu.Unlock()
}
}(hash, cmds)
}
wg.Wait()
if len(failedCmdsMap) == 0 {
break
}