Merge pull request #1596 from go-redis/fix/tx-pipeline-hook

Fix Tx pipeline hook
This commit is contained in:
Vladimir Mihailenco 2021-01-09 10:30:36 +02:00 committed by GitHub
commit 9481d69402
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 10 additions and 5 deletions

View File

@ -1256,10 +1256,13 @@ func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) erro
} }
func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error { func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
return c.hooks.processPipeline(ctx, cmds, c._processTxPipeline) return c.hooks.processTxPipeline(ctx, cmds, c._processTxPipeline)
} }
func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error { func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error {
// Trim multi .. exec.
cmds = cmds[1 : len(cmds)-1]
state, err := c.state.Get(ctx) state, err := c.state.Get(ctx)
if err != nil { if err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
@ -1295,6 +1298,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
if err == nil { if err == nil {
return return
} }
if attempt < c.opt.MaxRedirects { if attempt < c.opt.MaxRedirects {
if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil { if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)

View File

@ -820,14 +820,14 @@ var _ = Describe("ClusterClient", func() {
client.AddHook(&hook{ client.AddHook(&hook{
beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) { beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
Expect(cmds).To(HaveLen(1)) Expect(cmds).To(HaveLen(3))
Expect(cmds[0].String()).To(Equal("ping: ")) Expect(cmds[1].String()).To(Equal("ping: "))
stack = append(stack, "cluster.BeforeProcessPipeline") stack = append(stack, "cluster.BeforeProcessPipeline")
return ctx, nil return ctx, nil
}, },
afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error { afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(1)) Expect(cmds).To(HaveLen(3))
Expect(cmds[0].String()).To(Equal("ping: PONG")) Expect(cmds[1].String()).To(Equal("ping: PONG"))
stack = append(stack, "cluster.AfterProcessPipeline") stack = append(stack, "cluster.AfterProcessPipeline")
return nil return nil
}, },

View File

@ -477,6 +477,7 @@ func (p *ConnPool) ReapStaleConns() (int, error) {
p.connsMu.Lock() p.connsMu.Lock()
cn := p.reapStaleConn() cn := p.reapStaleConn()
p.connsMu.Unlock() p.connsMu.Unlock()
p.freeTurn() p.freeTurn()
if cn != nil { if cn != nil {