diff --git a/cluster.go b/cluster.go
index b5463f43..26a668ca 100644
--- a/cluster.go
+++ b/cluster.go
@@ -1061,7 +1061,7 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) erro
 				cn, err := node.Client.getConn(ctx)
 				if err != nil {
 					if err == pool.ErrClosed {
-						c.mapCmdsByNode(cmds, failedCmds)
+						_ = c.mapCmdsByNode(cmds, failedCmds)
 					} else {
 						setCmdsErr(cmds, err)
 					}
@@ -1265,7 +1265,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
 				cn, err := node.Client.getConn(ctx)
 				if err != nil {
 					if err == pool.ErrClosed {
-						c.mapCmdsByNode(cmds, failedCmds)
+						_ = c.mapCmdsByNode(cmds, failedCmds)
 					} else {
 						setCmdsErr(cmds, err)
 					}
diff --git a/ring.go b/ring.go
index f7bda31a..f8b5e5a1 100644
--- a/ring.go
+++ b/ring.go
@@ -586,10 +586,32 @@ func (c *Ring) Pipeline() Pipeliner {
 }
 
 func (c *Ring) processPipeline(ctx context.Context, cmds []Cmder) error {
-	return c.hooks.processPipeline(ctx, cmds, c._processPipeline)
+	return c.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
+		return c.generalProcessPipeline(ctx, cmds, false)
+	})
 }
 
-func (c *Ring) _processPipeline(ctx context.Context, cmds []Cmder) error {
+func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
+	return c.TxPipeline().Pipelined(fn)
+}
+
+func (c *Ring) TxPipeline() Pipeliner {
+	pipe := Pipeline{
+		exec: c.processTxPipeline,
+	}
+	pipe.init()
+	return &pipe
+}
+
+func (c *Ring) processTxPipeline(ctx context.Context, cmds []Cmder) error {
+	return c.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
+		return c.generalProcessPipeline(ctx, cmds, true)
+	})
+}
+
+func (c *Ring) generalProcessPipeline(
+	ctx context.Context, cmds []Cmder, tx bool,
+) error {
 	cmdsMap := make(map[string][]Cmder)
 	for _, cmd := range cmds {
 		cmdInfo := c.cmdInfo(cmd.Name())
@@ -626,7 +648,12 @@ func (c *Ring) _processPipeline(ctx context.Context, cmds []Cmder) error {
 				return
 			}
 
-			canRetry, err := shard.Client.pipelineProcessCmds(ctx, cn, cmds)
+			var canRetry bool
+			if tx {
+				canRetry, err = shard.Client.txPipelineProcessCmds(ctx, cn, cmds)
+			} else {
+				canRetry, err = shard.Client.pipelineProcessCmds(ctx, cn, cmds)
+			}
 			shard.Client.releaseConnStrict(cn, err)
 
 			if canRetry && internal.IsRetryableError(err, true) {
@@ -650,14 +677,6 @@ func (c *Ring) _processPipeline(ctx context.Context, cmds []Cmder) error {
 	return cmdsFirstErr(cmds)
 }
 
-func (c *Ring) TxPipeline() Pipeliner {
-	panic("not implemented")
-}
-
-func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
-	panic("not implemented")
-}
-
 // Close closes the ring client, releasing any open resources.
 //
 // It is rare to Close a Ring, as the Ring is meant to be long-lived
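
For context, a rough usage sketch of what this change enables; it is not part of the patch. The example assumes the v7-era go-redis API (module path github.com/go-redis/redis/v7, commands without a context argument) and illustrative shard names and addresses:

package main

import "github.com/go-redis/redis/v7"

func main() {
	// Hypothetical two-shard ring; shard names and addresses are examples only.
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"shard1": ":6379",
			"shard2": ":6380",
		},
	})

	// With this patch, the queued commands are grouped by shard in
	// generalProcessPipeline and, with tx=true, each shard's batch is sent
	// through txPipelineProcessCmds, i.e. wrapped in MULTI/EXEC on that shard.
	_, err := ring.TxPipelined(func(pipe redis.Pipeliner) error {
		pipe.Set("counter", 0, 0)
		pipe.Incr("counter")
		return nil
	})
	if err != nil {
		panic(err)
	}
}

Note that because generalProcessPipeline still splits commands by hash, keys that map to different shards end up in separate MULTI/EXEC blocks; the transaction guarantee applies per shard, not across the whole ring.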