From 76fd0eac615cf06adf896b539f6d89460969f311 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Sat, 9 Jan 2021 09:27:42 +0200 Subject: [PATCH] Fix Tx pipeline hook --- cluster.go | 6 +++++- cluster_test.go | 8 ++++---- internal/pool/pool.go | 1 + 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/cluster.go b/cluster.go index c40e18d1..d8cd1d6b 100644 --- a/cluster.go +++ b/cluster.go @@ -1256,10 +1256,13 @@ func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) erro } func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error { - return c.hooks.processPipeline(ctx, cmds, c._processTxPipeline) + return c.hooks.processTxPipeline(ctx, cmds, c._processTxPipeline) } func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error { + // Trim multi .. exec. + cmds = cmds[1 : len(cmds)-1] + state, err := c.state.Get(ctx) if err != nil { setCmdsErr(cmds, err) @@ -1295,6 +1298,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er if err == nil { return } + if attempt < c.opt.MaxRedirects { if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil { setCmdsErr(cmds, err) diff --git a/cluster_test.go b/cluster_test.go index 27d2a1ca..561832a0 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -820,14 +820,14 @@ var _ = Describe("ClusterClient", func() { client.AddHook(&hook{ beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) { - Expect(cmds).To(HaveLen(1)) - Expect(cmds[0].String()).To(Equal("ping: ")) + Expect(cmds).To(HaveLen(3)) + Expect(cmds[1].String()).To(Equal("ping: ")) stack = append(stack, "cluster.BeforeProcessPipeline") return ctx, nil }, afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error { - Expect(cmds).To(HaveLen(1)) - Expect(cmds[0].String()).To(Equal("ping: PONG")) + Expect(cmds).To(HaveLen(3)) + Expect(cmds[1].String()).To(Equal("ping: PONG")) stack = append(stack, "cluster.AfterProcessPipeline") return nil }, diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 355742bf..254a18de 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -477,6 +477,7 @@ func (p *ConnPool) ReapStaleConns() (int, error) { p.connsMu.Lock() cn := p.reapStaleConn() p.connsMu.Unlock() + p.freeTurn() if cn != nil {