From 52ec5258496cb79171f0cd6c213fbed8dcb1a25e Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Thu, 25 Jul 2019 13:28:15 +0300 Subject: [PATCH] Propagate context in Pipeline --- cluster.go | 2 ++ commands.go | 4 +--- pipeline.go | 3 ++- redis.go | 34 +++++++++++++++++++++++----------- ring.go | 2 ++ tx.go | 5 +++-- 6 files changed, 33 insertions(+), 17 deletions(-) diff --git a/cluster.go b/cluster.go index 26a668ca..932d3d6c 100644 --- a/cluster.go +++ b/cluster.go @@ -1023,6 +1023,7 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) { func (c *ClusterClient) Pipeline() Pipeliner { pipe := Pipeline{ + ctx: c.ctx, exec: c.processPipeline, } pipe.init() @@ -1220,6 +1221,7 @@ func (c *ClusterClient) checkMovedErr( // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. func (c *ClusterClient) TxPipeline() Pipeliner { pipe := Pipeline{ + ctx: c.ctx, exec: c.processTxPipeline, } pipe.init() diff --git a/commands.go b/commands.go index 710aef2b..26af5599 100644 --- a/commands.go +++ b/commands.go @@ -42,9 +42,7 @@ func appendArgs(dst, src []interface{}) []interface{} { } } - for _, v := range src { - dst = append(dst, v) - } + dst = append(dst, src...) return dst } diff --git a/pipeline.go b/pipeline.go index 8b9147ea..3c861cdc 100644 --- a/pipeline.go +++ b/pipeline.go @@ -41,6 +41,7 @@ type Pipeline struct { cmdable statefulCmdable + ctx context.Context exec pipelineExecer mu sync.Mutex @@ -98,7 +99,7 @@ func (c *Pipeline) discard() error { // Exec always returns list of commands and error of the first failed // command if any. func (c *Pipeline) Exec() ([]Cmder, error) { - return c.ExecContext(context.Background()) + return c.ExecContext(c.ctx) } func (c *Pipeline) ExecContext(ctx context.Context) ([]Cmder, error) { diff --git a/redis.go b/redis.go index 10200bbd..abdda7e9 100644 --- a/redis.go +++ b/redis.go @@ -136,7 +136,7 @@ func (c *baseClient) newConn(ctx context.Context) (*pool.Conn, error) { return nil, err } - err = c.initConn(cn) + err = c.initConn(ctx, cn) if err != nil { _ = c.connPool.CloseConn(cn) return nil, err @@ -169,7 +169,7 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) { return nil, err } - err = c.initConn(cn) + err = c.initConn(ctx, cn) if err != nil { c.connPool.Remove(cn) return nil, err @@ -202,7 +202,7 @@ func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) { } } -func (c *baseClient) initConn(cn *pool.Conn) error { +func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error { if cn.Inited { return nil } @@ -215,7 +215,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error { return nil } - conn := newConn(c.opt, cn) + conn := newConn(ctx, c.opt, cn) _, err := conn.Pipelined(func(pipe Pipeliner) error { if c.opt.Password != "" { pipe.Auth(c.opt.Password) @@ -547,6 +547,7 @@ func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { func (c *Client) Pipeline() Pipeliner { pipe := Pipeline{ + ctx: c.ctx, exec: c.processPipeline, } pipe.init() @@ -560,6 +561,7 @@ func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. func (c *Client) TxPipeline() Pipeliner { pipe := Pipeline{ + ctx: c.ctx, exec: c.processTxPipeline, } pipe.init() @@ -625,19 +627,27 @@ func (c *Client) PSubscribe(channels ...string) *PubSub { //------------------------------------------------------------------------------ -// Conn is like Client, but its pool contains single connection. -type Conn struct { +type conn struct { baseClient cmdable statefulCmdable } -func newConn(opt *Options, cn *pool.Conn) *Conn { +// Conn is like Client, but its pool contains single connection. +type Conn struct { + *conn + ctx context.Context +} + +func newConn(ctx context.Context, opt *Options, cn *pool.Conn) *Conn { c := Conn{ - baseClient: baseClient{ - opt: opt, - connPool: pool.NewSingleConnPool(cn), + conn: &conn{ + baseClient: baseClient{ + opt: opt, + connPool: pool.NewSingleConnPool(cn), + }, }, + ctx: ctx, } c.cmdable = c.Process c.statefulCmdable = c.Process @@ -645,7 +655,7 @@ func newConn(opt *Options, cn *pool.Conn) *Conn { } func (c *Conn) Process(cmd Cmder) error { - return c.ProcessContext(context.TODO(), cmd) + return c.ProcessContext(c.ctx, cmd) } func (c *Conn) ProcessContext(ctx context.Context, cmd Cmder) error { @@ -658,6 +668,7 @@ func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { func (c *Conn) Pipeline() Pipeliner { pipe := Pipeline{ + ctx: c.ctx, exec: c.processPipeline, } pipe.init() @@ -671,6 +682,7 @@ func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC. func (c *Conn) TxPipeline() Pipeliner { pipe := Pipeline{ + ctx: c.ctx, exec: c.processTxPipeline, } pipe.init() diff --git a/ring.go b/ring.go index f8b5e5a1..df71cc80 100644 --- a/ring.go +++ b/ring.go @@ -579,6 +579,7 @@ func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) { func (c *Ring) Pipeline() Pipeliner { pipe := Pipeline{ + ctx: c.ctx, exec: c.processPipeline, } pipe.init() @@ -597,6 +598,7 @@ func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) { func (c *Ring) TxPipeline() Pipeliner { pipe := Pipeline{ + ctx: c.ctx, exec: c.processTxPipeline, } pipe.init() diff --git a/tx.go b/tx.go index bcaebde4..38a88555 100644 --- a/tx.go +++ b/tx.go @@ -93,7 +93,7 @@ func (c *Tx) Watch(keys ...string) *StatusCmd { args[1+i] = key } cmd := NewStatusCmd(args...) - c.Process(cmd) + _ = c.Process(cmd) return cmd } @@ -105,13 +105,14 @@ func (c *Tx) Unwatch(keys ...string) *StatusCmd { args[1+i] = key } cmd := NewStatusCmd(args...) - c.Process(cmd) + _ = c.Process(cmd) return cmd } // Pipeline creates a new pipeline. It is more convenient to use Pipelined. func (c *Tx) Pipeline() Pipeliner { pipe := Pipeline{ + ctx: c.ctx, exec: c.processTxPipeline, } pipe.init()