diff --git a/cluster.go b/cluster.go index bc453e35..c33b5bcb 100644 --- a/cluster.go +++ b/cluster.go @@ -1445,7 +1445,7 @@ func (c *ClusterClient) pubSub() *PubSub { return nil, err } - cn, err := node.Client.newConn() + cn, err := node.Client.newConn(context.TODO()) if err != nil { node = nil diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 32d1cd43..0b64ccfe 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -82,18 +82,28 @@ func (cn *Conn) Close() error { } func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time { - now := time.Now() - cn.SetUsedAt(now) + tm := time.Now() + cn.SetUsedAt(tm) + + if timeout > 0 { + tm = tm.Add(timeout) + } if ctx != nil { - tm, ok := ctx.Deadline() + deadline, ok := ctx.Deadline() if ok { + if timeout == 0 { + return deadline + } + if deadline.Before(tm) { + return deadline + } return tm } } if timeout > 0 { - return now.Add(timeout) + return tm } return noDeadline diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 1094a5b9..09fd5cea 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -34,7 +34,7 @@ type Stats struct { } type Pooler interface { - NewConn() (*Conn, error) + NewConn(context.Context) (*Conn, error) CloseConn(*Conn) error Get(context.Context) (*Conn, error) @@ -115,7 +115,7 @@ func (p *ConnPool) checkMinIdleConns() { } func (p *ConnPool) addIdleConn() { - cn, err := p.newConn(nil, true) + cn, err := p.newConn(context.TODO(), true) if err != nil { return } @@ -126,8 +126,8 @@ func (p *ConnPool) addIdleConn() { p.connsMu.Unlock() } -func (p *ConnPool) NewConn() (*Conn, error) { - return p._NewConn(nil, false) +func (p *ConnPool) NewConn(ctx context.Context) (*Conn, error) { + return p._NewConn(ctx, false) } func (p *ConnPool) _NewConn(ctx context.Context, pooled bool) (*Conn, error) { diff --git a/internal/pool/pool_single.go b/internal/pool/pool_single.go index 778b124c..fda1c8aa 100644 --- a/internal/pool/pool_single.go +++ b/internal/pool/pool_single.go @@ -14,7 +14,7 @@ func NewSingleConnPool(cn *Conn) *SingleConnPool { } } -func (p *SingleConnPool) NewConn() (*Conn, error) { +func (p *SingleConnPool) NewConn(context.Context) (*Conn, error) { panic("not implemented") } diff --git a/internal/pool/pool_sticky.go b/internal/pool/pool_sticky.go index 174dc9c2..d2074d23 100644 --- a/internal/pool/pool_sticky.go +++ b/internal/pool/pool_sticky.go @@ -23,7 +23,7 @@ func NewStickyConnPool(pool *ConnPool, reusable bool) *StickyConnPool { } } -func (p *StickyConnPool) NewConn() (*Conn, error) { +func (p *StickyConnPool) NewConn(context.Context) (*Conn, error) { panic("not implemented") } diff --git a/redis.go b/redis.go index d47c2b3a..54b85411 100644 --- a/redis.go +++ b/redis.go @@ -139,8 +139,8 @@ func (c *baseClient) String() string { return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB) } -func (c *baseClient) newConn() (*pool.Conn, error) { - cn, err := c.connPool.NewConn() +func (c *baseClient) newConn(ctx context.Context) (*pool.Conn, error) { + cn, err := c.connPool.NewConn(ctx) if err != nil { return nil, err } @@ -581,7 +581,7 @@ func (c *Client) pubSub() *PubSub { opt: c.opt, newConn: func(channels []string) (*pool.Conn, error) { - return c.newConn() + return c.newConn(context.TODO()) }, closeConn: c.connPool.CloseConn, } diff --git a/sentinel.go b/sentinel.go index 3f13cf6a..331d0048 100644 --- a/sentinel.go +++ b/sentinel.go @@ -150,7 +150,7 @@ func (c *SentinelClient) pubSub() *PubSub { opt: c.opt, newConn: func(channels []string) (*pool.Conn, error) { - return c.newConn() + return c.newConn(context.TODO()) }, closeConn: c.connPool.CloseConn, }