mirror of https://github.com/go-redis/redis.git
Merge pull request #1103 from go-redis/fix/pipeline-propagate-ctx
Propagate context in Pipeline
This commit is contained in:
commit
9228e88887
|
@ -1023,6 +1023,7 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
|
||||||
|
|
||||||
func (c *ClusterClient) Pipeline() Pipeliner {
|
func (c *ClusterClient) Pipeline() Pipeliner {
|
||||||
pipe := Pipeline{
|
pipe := Pipeline{
|
||||||
|
ctx: c.ctx,
|
||||||
exec: c.processPipeline,
|
exec: c.processPipeline,
|
||||||
}
|
}
|
||||||
pipe.init()
|
pipe.init()
|
||||||
|
@ -1220,6 +1221,7 @@ func (c *ClusterClient) checkMovedErr(
|
||||||
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
|
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
|
||||||
func (c *ClusterClient) TxPipeline() Pipeliner {
|
func (c *ClusterClient) TxPipeline() Pipeliner {
|
||||||
pipe := Pipeline{
|
pipe := Pipeline{
|
||||||
|
ctx: c.ctx,
|
||||||
exec: c.processTxPipeline,
|
exec: c.processTxPipeline,
|
||||||
}
|
}
|
||||||
pipe.init()
|
pipe.init()
|
||||||
|
|
|
@ -42,9 +42,7 @@ func appendArgs(dst, src []interface{}) []interface{} {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, v := range src {
|
dst = append(dst, src...)
|
||||||
dst = append(dst, v)
|
|
||||||
}
|
|
||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,6 +41,7 @@ type Pipeline struct {
|
||||||
cmdable
|
cmdable
|
||||||
statefulCmdable
|
statefulCmdable
|
||||||
|
|
||||||
|
ctx context.Context
|
||||||
exec pipelineExecer
|
exec pipelineExecer
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
@ -98,7 +99,7 @@ func (c *Pipeline) discard() error {
|
||||||
// Exec always returns list of commands and error of the first failed
|
// Exec always returns list of commands and error of the first failed
|
||||||
// command if any.
|
// command if any.
|
||||||
func (c *Pipeline) Exec() ([]Cmder, error) {
|
func (c *Pipeline) Exec() ([]Cmder, error) {
|
||||||
return c.ExecContext(context.Background())
|
return c.ExecContext(c.ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Pipeline) ExecContext(ctx context.Context) ([]Cmder, error) {
|
func (c *Pipeline) ExecContext(ctx context.Context) ([]Cmder, error) {
|
||||||
|
|
34
redis.go
34
redis.go
|
@ -136,7 +136,7 @@ func (c *baseClient) newConn(ctx context.Context) (*pool.Conn, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.initConn(cn)
|
err = c.initConn(ctx, cn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = c.connPool.CloseConn(cn)
|
_ = c.connPool.CloseConn(cn)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -169,7 +169,7 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.initConn(cn)
|
err = c.initConn(ctx, cn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.connPool.Remove(cn)
|
c.connPool.Remove(cn)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -202,7 +202,7 @@ func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *baseClient) initConn(cn *pool.Conn) error {
|
func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
|
||||||
if cn.Inited {
|
if cn.Inited {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -215,7 +215,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
conn := newConn(c.opt, cn)
|
conn := newConn(ctx, c.opt, cn)
|
||||||
_, err := conn.Pipelined(func(pipe Pipeliner) error {
|
_, err := conn.Pipelined(func(pipe Pipeliner) error {
|
||||||
if c.opt.Password != "" {
|
if c.opt.Password != "" {
|
||||||
pipe.Auth(c.opt.Password)
|
pipe.Auth(c.opt.Password)
|
||||||
|
@ -547,6 +547,7 @@ func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
|
||||||
|
|
||||||
func (c *Client) Pipeline() Pipeliner {
|
func (c *Client) Pipeline() Pipeliner {
|
||||||
pipe := Pipeline{
|
pipe := Pipeline{
|
||||||
|
ctx: c.ctx,
|
||||||
exec: c.processPipeline,
|
exec: c.processPipeline,
|
||||||
}
|
}
|
||||||
pipe.init()
|
pipe.init()
|
||||||
|
@ -560,6 +561,7 @@ func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
|
||||||
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
|
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
|
||||||
func (c *Client) TxPipeline() Pipeliner {
|
func (c *Client) TxPipeline() Pipeliner {
|
||||||
pipe := Pipeline{
|
pipe := Pipeline{
|
||||||
|
ctx: c.ctx,
|
||||||
exec: c.processTxPipeline,
|
exec: c.processTxPipeline,
|
||||||
}
|
}
|
||||||
pipe.init()
|
pipe.init()
|
||||||
|
@ -625,19 +627,27 @@ func (c *Client) PSubscribe(channels ...string) *PubSub {
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
// Conn is like Client, but its pool contains single connection.
|
type conn struct {
|
||||||
type Conn struct {
|
|
||||||
baseClient
|
baseClient
|
||||||
cmdable
|
cmdable
|
||||||
statefulCmdable
|
statefulCmdable
|
||||||
}
|
}
|
||||||
|
|
||||||
func newConn(opt *Options, cn *pool.Conn) *Conn {
|
// Conn is like Client, but its pool contains single connection.
|
||||||
|
type Conn struct {
|
||||||
|
*conn
|
||||||
|
ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
func newConn(ctx context.Context, opt *Options, cn *pool.Conn) *Conn {
|
||||||
c := Conn{
|
c := Conn{
|
||||||
baseClient: baseClient{
|
conn: &conn{
|
||||||
opt: opt,
|
baseClient: baseClient{
|
||||||
connPool: pool.NewSingleConnPool(cn),
|
opt: opt,
|
||||||
|
connPool: pool.NewSingleConnPool(cn),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
|
ctx: ctx,
|
||||||
}
|
}
|
||||||
c.cmdable = c.Process
|
c.cmdable = c.Process
|
||||||
c.statefulCmdable = c.Process
|
c.statefulCmdable = c.Process
|
||||||
|
@ -645,7 +655,7 @@ func newConn(opt *Options, cn *pool.Conn) *Conn {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) Process(cmd Cmder) error {
|
func (c *Conn) Process(cmd Cmder) error {
|
||||||
return c.ProcessContext(context.TODO(), cmd)
|
return c.ProcessContext(c.ctx, cmd)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) ProcessContext(ctx context.Context, cmd Cmder) error {
|
func (c *Conn) ProcessContext(ctx context.Context, cmd Cmder) error {
|
||||||
|
@ -658,6 +668,7 @@ func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
|
||||||
|
|
||||||
func (c *Conn) Pipeline() Pipeliner {
|
func (c *Conn) Pipeline() Pipeliner {
|
||||||
pipe := Pipeline{
|
pipe := Pipeline{
|
||||||
|
ctx: c.ctx,
|
||||||
exec: c.processPipeline,
|
exec: c.processPipeline,
|
||||||
}
|
}
|
||||||
pipe.init()
|
pipe.init()
|
||||||
|
@ -671,6 +682,7 @@ func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
|
||||||
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
|
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
|
||||||
func (c *Conn) TxPipeline() Pipeliner {
|
func (c *Conn) TxPipeline() Pipeliner {
|
||||||
pipe := Pipeline{
|
pipe := Pipeline{
|
||||||
|
ctx: c.ctx,
|
||||||
exec: c.processTxPipeline,
|
exec: c.processTxPipeline,
|
||||||
}
|
}
|
||||||
pipe.init()
|
pipe.init()
|
||||||
|
|
2
ring.go
2
ring.go
|
@ -579,6 +579,7 @@ func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
|
||||||
|
|
||||||
func (c *Ring) Pipeline() Pipeliner {
|
func (c *Ring) Pipeline() Pipeliner {
|
||||||
pipe := Pipeline{
|
pipe := Pipeline{
|
||||||
|
ctx: c.ctx,
|
||||||
exec: c.processPipeline,
|
exec: c.processPipeline,
|
||||||
}
|
}
|
||||||
pipe.init()
|
pipe.init()
|
||||||
|
@ -597,6 +598,7 @@ func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
|
||||||
|
|
||||||
func (c *Ring) TxPipeline() Pipeliner {
|
func (c *Ring) TxPipeline() Pipeliner {
|
||||||
pipe := Pipeline{
|
pipe := Pipeline{
|
||||||
|
ctx: c.ctx,
|
||||||
exec: c.processTxPipeline,
|
exec: c.processTxPipeline,
|
||||||
}
|
}
|
||||||
pipe.init()
|
pipe.init()
|
||||||
|
|
5
tx.go
5
tx.go
|
@ -93,7 +93,7 @@ func (c *Tx) Watch(keys ...string) *StatusCmd {
|
||||||
args[1+i] = key
|
args[1+i] = key
|
||||||
}
|
}
|
||||||
cmd := NewStatusCmd(args...)
|
cmd := NewStatusCmd(args...)
|
||||||
c.Process(cmd)
|
_ = c.Process(cmd)
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,13 +105,14 @@ func (c *Tx) Unwatch(keys ...string) *StatusCmd {
|
||||||
args[1+i] = key
|
args[1+i] = key
|
||||||
}
|
}
|
||||||
cmd := NewStatusCmd(args...)
|
cmd := NewStatusCmd(args...)
|
||||||
c.Process(cmd)
|
_ = c.Process(cmd)
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pipeline creates a new pipeline. It is more convenient to use Pipelined.
|
// Pipeline creates a new pipeline. It is more convenient to use Pipelined.
|
||||||
func (c *Tx) Pipeline() Pipeliner {
|
func (c *Tx) Pipeline() Pipeliner {
|
||||||
pipe := Pipeline{
|
pipe := Pipeline{
|
||||||
|
ctx: c.ctx,
|
||||||
exec: c.processTxPipeline,
|
exec: c.processTxPipeline,
|
||||||
}
|
}
|
||||||
pipe.init()
|
pipe.init()
|
||||||
|
|
Loading…
Reference in New Issue