diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c40d5e..1964566 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Unreleased + +- Cluster and Ring pipelines process commands for each node in its own goroutine. + ## 6.14 - Added Options.MinIdleConns. diff --git a/cluster.go b/cluster.go index 8cbd447..658f6c9 100644 --- a/cluster.go +++ b/cluster.go @@ -1241,7 +1241,8 @@ func (c *ClusterClient) WrapProcessPipeline( } func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { - cmdsMap, err := c.mapCmdsByNode(cmds) + cmdsMap := newCmdsMap() + err := c.mapCmdsByNode(cmds, cmdsMap) if err != nil { setCmdsErr(cmds, err) return err @@ -1252,51 +1253,35 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { time.Sleep(c.retryBackoff(attempt)) } - failedCmds := make(map[*clusterNode][]Cmder) - + failedCmds := newCmdsMap() var wg sync.WaitGroup - var lock sync.RWMutex - for node, cmds := range cmdsMap { + + for node, cmds := range cmdsMap.m { wg.Add(1) go func(node *clusterNode, cmds []Cmder) { defer wg.Done() - failedCmdsTmp := make(map[*clusterNode][]Cmder) - cn, err := node.Client.getConn() if err != nil { if err == pool.ErrClosed { - c.remapCmds(cmds, failedCmdsTmp) + c.mapCmdsByNode(cmds, failedCmds) } else { setCmdsErr(cmds, err) } + return + } + err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) + if err == nil || internal.IsRedisError(err) { + node.Client.connPool.Put(cn) } else { - err = c.pipelineProcessCmds(node, cn, cmds, failedCmdsTmp) - if err == nil || internal.IsRedisError(err) { - node.Client.connPool.Put(cn) - } else { - node.Client.connPool.Remove(cn) - } + node.Client.connPool.Remove(cn) } - - if len(failedCmdsTmp) > 0 { - for node, cs := range failedCmdsTmp { - lock.Lock() - if _, ok := failedCmds[node]; ok { - failedCmds[node] = append(failedCmds[node], cs...) - } else { - failedCmds[node] = cs - } - lock.Unlock() - } - } - }(node, cmds) } - wg.Wait() - if len(failedCmds) == 0 { + wg.Wait() + if len(failedCmds.m) == 0 { break } cmdsMap = failedCmds @@ -1305,14 +1290,24 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { return cmdsFirstErr(cmds) } -func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) { +type cmdsMap struct { + mu sync.Mutex + m map[*clusterNode][]Cmder +} + +func newCmdsMap() *cmdsMap { + return &cmdsMap{ + m: make(map[*clusterNode][]Cmder), + } +} + +func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error { state, err := c.state.Get() if err != nil { setCmdsErr(cmds, err) - return nil, err + return err } - cmdsMap := make(map[*clusterNode][]Cmder) cmdsAreReadOnly := c.cmdsAreReadOnly(cmds) for _, cmd := range cmds { var node *clusterNode @@ -1324,11 +1319,13 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e node, err = state.slotMasterNode(slot) } if err != nil { - return nil, err + return err } - cmdsMap[node] = append(cmdsMap[node], cmd) + cmdsMap.mu.Lock() + cmdsMap.m[node] = append(cmdsMap.m[node], cmd) + cmdsMap.mu.Unlock() } - return cmdsMap, nil + return nil } func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool { @@ -1341,27 +1338,17 @@ func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool { return true } -func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) { - remappedCmds, err := c.mapCmdsByNode(cmds) - if err != nil { - setCmdsErr(cmds, err) - return - } - - for node, cmds := range remappedCmds { - failedCmds[node] = cmds - } -} - func (c *ClusterClient) pipelineProcessCmds( - node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, ) error { err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { return writeCmd(wr, cmds...) }) if err != nil { setCmdsErr(cmds, err) - failedCmds[node] = cmds + failedCmds.mu.Lock() + failedCmds.m[node] = cmds + failedCmds.mu.Unlock() return err } @@ -1372,7 +1359,7 @@ func (c *ClusterClient) pipelineProcessCmds( } func (c *ClusterClient) pipelineReadCmds( - rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, ) error { for _, cmd := range cmds { err := cmd.readReply(rd) @@ -1394,7 +1381,7 @@ func (c *ClusterClient) pipelineReadCmds( } func (c *ClusterClient) checkMovedErr( - cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder, + cmd Cmder, err error, failedCmds *cmdsMap, ) bool { moved, ask, addr := internal.IsMovedError(err) @@ -1406,7 +1393,9 @@ func (c *ClusterClient) checkMovedErr( return false } - failedCmds[node] = append(failedCmds[node], cmd) + failedCmds.mu.Lock() + failedCmds.m[node] = append(failedCmds.m[node], cmd) + failedCmds.mu.Unlock() return true } @@ -1416,7 +1405,9 @@ func (c *ClusterClient) checkMovedErr( return false } - failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd) + failedCmds.mu.Lock() + failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd) + failedCmds.mu.Unlock() return true } @@ -1456,31 +1447,38 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error { time.Sleep(c.retryBackoff(attempt)) } - failedCmds := make(map[*clusterNode][]Cmder) + failedCmds := newCmdsMap() + var wg sync.WaitGroup for node, cmds := range cmdsMap { - cn, err := node.Client.getConn() - if err != nil { - if err == pool.ErrClosed { - c.remapCmds(cmds, failedCmds) - } else { - setCmdsErr(cmds, err) - } - continue - } + wg.Add(1) + go func(node *clusterNode, cmds []Cmder) { + defer wg.Done() - err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) - if err == nil || internal.IsRedisError(err) { - node.Client.connPool.Put(cn) - } else { - node.Client.connPool.Remove(cn) - } + cn, err := node.Client.getConn() + if err != nil { + if err == pool.ErrClosed { + c.mapCmdsByNode(cmds, failedCmds) + } else { + setCmdsErr(cmds, err) + } + return + } + + err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) + if err == nil || internal.IsRedisError(err) { + node.Client.connPool.Put(cn) + } else { + node.Client.connPool.Remove(cn) + } + }(node, cmds) } - if len(failedCmds) == 0 { + wg.Wait() + if len(failedCmds.m) == 0 { break } - cmdsMap = failedCmds + cmdsMap = failedCmds.m } } @@ -1497,14 +1495,16 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { } func (c *ClusterClient) txPipelineProcessCmds( - node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, ) error { err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { return txPipelineWriteMulti(wr, cmds) }) if err != nil { setCmdsErr(cmds, err) - failedCmds[node] = cmds + failedCmds.mu.Lock() + failedCmds.m[node] = cmds + failedCmds.mu.Unlock() return err } @@ -1520,7 +1520,7 @@ func (c *ClusterClient) txPipelineProcessCmds( } func (c *ClusterClient) txPipelineReadQueued( - rd *proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, ) error { // Parse queued replies. var statusCmd StatusCmd diff --git a/ring.go b/ring.go index 61050ca..42a4e6c 100644 --- a/ring.go +++ b/ring.go @@ -592,36 +592,46 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { time.Sleep(c.retryBackoff(attempt)) } + var mu sync.Mutex var failedCmdsMap map[string][]Cmder + var wg sync.WaitGroup for hash, cmds := range cmdsMap { - shard, err := c.shards.GetByHash(hash) - if err != nil { - setCmdsErr(cmds, err) - continue - } + wg.Add(1) + go func(hash string, cmds []Cmder) { + defer wg.Done() - cn, err := shard.Client.getConn() - if err != nil { - setCmdsErr(cmds, err) - continue - } - - canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) - if err == nil || internal.IsRedisError(err) { - shard.Client.connPool.Put(cn) - continue - } - shard.Client.connPool.Remove(cn) - - if canRetry && internal.IsRetryableError(err, true) { - if failedCmdsMap == nil { - failedCmdsMap = make(map[string][]Cmder) + shard, err := c.shards.GetByHash(hash) + if err != nil { + setCmdsErr(cmds, err) + return } - failedCmdsMap[hash] = cmds - } + + cn, err := shard.Client.getConn() + if err != nil { + setCmdsErr(cmds, err) + return + } + + canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) + if err == nil || internal.IsRedisError(err) { + shard.Client.connPool.Put(cn) + return + } + shard.Client.connPool.Remove(cn) + + if canRetry && internal.IsRetryableError(err, true) { + mu.Lock() + if failedCmdsMap == nil { + failedCmdsMap = make(map[string][]Cmder) + } + failedCmdsMap[hash] = cmds + mu.Unlock() + } + }(hash, cmds) } + wg.Wait() if len(failedCmdsMap) == 0 { break }