feat: add ContextTimeoutEnabled to respect context timeouts and deadlines

This commit is contained in:
Vladimir Mihailenco 2022-10-11 10:22:42 +03:00
parent 8319b1ebf0
commit 58f7149e38
8 changed files with 92 additions and 59 deletions

View File

@ -69,6 +69,7 @@ type ClusterOptions struct {
DialTimeout time.Duration DialTimeout time.Duration
ReadTimeout time.Duration ReadTimeout time.Duration
WriteTimeout time.Duration WriteTimeout time.Duration
ContextTimeoutEnabled bool
// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO). // PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
PoolFIFO bool PoolFIFO bool
@ -1259,14 +1260,14 @@ func (c *ClusterClient) _processPipelineNode(
) { ) {
_ = node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error { _ = node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds) return writeCmds(wr, cmds)
}); err != nil { }); err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
return err return err
} }
return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds) return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds)
}) })
}) })
@ -1421,14 +1422,14 @@ func (c *ClusterClient) _processTxPipelineNode(
_ = node.Client.hooks.processTxPipeline( _ = node.Client.hooks.processTxPipeline(
ctx, cmds, func(ctx context.Context, cmds []Cmder) error { ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds) return writeCmds(wr, cmds)
}); err != nil { }); err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
return err return err
} }
return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
statusCmd := cmds[0].(*StatusCmd) statusCmd := cmds[0].(*StatusCmd)
// Trim multi and exec. // Trim multi and exec.
trimmedCmds := cmds[1 : len(cmds)-1] trimmedCmds := cmds[1 : len(cmds)-1]
@ -1788,6 +1789,13 @@ func (c *ClusterClient) MasterForKey(ctx context.Context, key string) (*Client,
return node.Client, err return node.Client, err
} }
func (c *ClusterClient) context(ctx context.Context) context.Context {
if c.opt.ContextTimeoutEnabled {
return ctx
}
return context.Background()
}
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
for _, n := range nodes { for _, n := range nodes {
if n == node { if n == node {

View File

@ -63,7 +63,9 @@ func (cn *Conn) RemoteAddr() net.Addr {
return nil return nil
} }
func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error { func (cn *Conn) WithReader(
ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
) error {
if timeout >= 0 { if timeout >= 0 {
if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil { if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
return err return err

View File

@ -126,6 +126,7 @@ func redisOptions() *redis.Options {
DialTimeout: 10 * time.Second, DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second, ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second,
ContextTimeoutEnabled: true,
MaxRetries: -1, MaxRetries: -1,

View File

@ -83,10 +83,14 @@ type Options struct {
// - `-1` - no timeout (block indefinitely). // - `-1` - no timeout (block indefinitely).
// - `-2` - disables SetWriteDeadline calls completely. // - `-2` - disables SetWriteDeadline calls completely.
WriteTimeout time.Duration WriteTimeout time.Duration
// ContextTimeoutEnabled controls whether the client respects context timeouts and deadlines.
// See https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts
ContextTimeoutEnabled bool
// Type of connection pool. // Type of connection pool.
// true for FIFO pool, false for LIFO pool. // true for FIFO pool, false for LIFO pool.
// Note that fifo has higher overhead compared to lifo. // Note that FIFO has slightly higher overhead compared to LIFO,
// but it helps closing idle connections faster reducing the pool size.
PoolFIFO bool PoolFIFO bool
// Maximum number of socket connections. // Maximum number of socket connections.
// Default is 10 connections per every available CPU as reported by runtime.GOMAXPROCS. // Default is 10 connections per every available CPU as reported by runtime.GOMAXPROCS.
@ -100,22 +104,30 @@ type Options struct {
MinIdleConns int MinIdleConns int
// Maximum number of idle connections. // Maximum number of idle connections.
MaxIdleConns int MaxIdleConns int
// Amount of time after which client closes idle connections. // ConnMaxIdleTime is the maximum amount of time a connection may be idle.
// Should be less than server's timeout. // Should be less than server's timeout.
//
// Expired connections may be closed lazily before reuse.
// If d <= 0, connections are not closed due to a connection's idle time.
//
// Default is 5 minutes. -1 disables idle timeout check. // Default is 5 minutes. -1 disables idle timeout check.
ConnMaxIdleTime time.Duration ConnMaxIdleTime time.Duration
// Connection age at which client retires (closes) the connection. // ConnMaxLifetime is the maximum amount of time a connection may be reused.
// Default is to not close aged connections. //
// Expired connections may be closed lazily before reuse.
// If <= 0, connections are not closed due to a connection's age.
//
// Default is to not close idle connections.
ConnMaxLifetime time.Duration ConnMaxLifetime time.Duration
// Enables read only queries on slave nodes. // TLS Config to use. When set, TLS will be negotiated.
readOnly bool
// TLS Config to use. When set TLS will be negotiated.
TLSConfig *tls.Config TLSConfig *tls.Config
// Limiter interface used to implemented circuit breaker or rate limiter. // Limiter interface used to implement circuit breaker or rate limiter.
Limiter Limiter Limiter Limiter
// Enables read only queries on slave/follower nodes.
readOnly bool
} }
func (opt *Options) init() { func (opt *Options) init() {

View File

@ -84,7 +84,7 @@ func (c *PubSub) conn(ctx context.Context, newChannels []string) (*pool.Conn, er
} }
func (c *PubSub) writeCmd(ctx context.Context, cn *pool.Conn, cmd Cmder) error { func (c *PubSub) writeCmd(ctx context.Context, cn *pool.Conn, cmd Cmder) error {
return cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { return cn.WithWriter(context.Background(), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmd(wr, cmd) return writeCmd(wr, cmd)
}) })
} }
@ -408,7 +408,7 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int
return nil, err return nil, err
} }
err = cn.WithReader(ctx, timeout, func(rd *proto.Reader) error { err = cn.WithReader(context.Background(), timeout, func(rd *proto.Reader) error {
return c.cmd.readReply(rd) return c.cmd.readReply(rd)
}) })

View File

@ -316,17 +316,15 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
} }
retryTimeout := uint32(0) retryTimeout := uint32(0)
err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmd(wr, cmd) return writeCmd(wr, cmd)
}) }); err != nil {
if err != nil {
atomic.StoreUint32(&retryTimeout, 1) atomic.StoreUint32(&retryTimeout, 1)
return err return err
} }
err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply) if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), cmd.readReply); err != nil {
if err != nil {
if cmd.readTimeout() == nil { if cmd.readTimeout() == nil {
atomic.StoreUint32(&retryTimeout, 1) atomic.StoreUint32(&retryTimeout, 1)
} else { } else {
@ -336,13 +334,12 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
} }
return nil return nil
}) }); err != nil {
if err == nil {
return false, nil
}
retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1) retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1)
return retry, err return retry, err
}
return false, nil
} }
func (c *baseClient) retryBackoff(attempt int) time.Duration { func (c *baseClient) retryBackoff(attempt int) time.Duration {
@ -430,14 +427,14 @@ func (c *baseClient) _generalProcessPipeline(
func (c *baseClient) pipelineProcessCmds( func (c *baseClient) pipelineProcessCmds(
ctx context.Context, cn *pool.Conn, cmds []Cmder, ctx context.Context, cn *pool.Conn, cmds []Cmder,
) (bool, error) { ) (bool, error) {
if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds) return writeCmds(wr, cmds)
}); err != nil { }); err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
return true, err return true, err
} }
if err := cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
return pipelineReadCmds(rd, cmds) return pipelineReadCmds(rd, cmds)
}); err != nil { }); err != nil {
return true, err return true, err
@ -462,14 +459,14 @@ func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
func (c *baseClient) txPipelineProcessCmds( func (c *baseClient) txPipelineProcessCmds(
ctx context.Context, cn *pool.Conn, cmds []Cmder, ctx context.Context, cn *pool.Conn, cmds []Cmder,
) (bool, error) { ) (bool, error) {
if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds) return writeCmds(wr, cmds)
}); err != nil { }); err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
return true, err return true, err
} }
if err := cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
statusCmd := cmds[0].(*StatusCmd) statusCmd := cmds[0].(*StatusCmd)
// Trim multi and exec. // Trim multi and exec.
trimmedCmds := cmds[1 : len(cmds)-1] trimmedCmds := cmds[1 : len(cmds)-1]
@ -527,6 +524,13 @@ func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder)
return nil return nil
} }
func (c *baseClient) context(ctx context.Context) context.Context {
if c.opt.ContextTimeoutEnabled {
return ctx
}
return context.Background()
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Client is a Redis client representing a pool of zero or more underlying connections. // Client is a Redis client representing a pool of zero or more underlying connections.

View File

@ -62,6 +62,7 @@ type FailoverOptions struct {
DialTimeout time.Duration DialTimeout time.Duration
ReadTimeout time.Duration ReadTimeout time.Duration
WriteTimeout time.Duration WriteTimeout time.Duration
ContextTimeoutEnabled bool
PoolFIFO bool PoolFIFO bool
@ -93,6 +94,7 @@ func (opt *FailoverOptions) clientOptions() *Options {
DialTimeout: opt.DialTimeout, DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout, ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout, WriteTimeout: opt.WriteTimeout,
ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
PoolFIFO: opt.PoolFIFO, PoolFIFO: opt.PoolFIFO,
PoolSize: opt.PoolSize, PoolSize: opt.PoolSize,

View File

@ -35,6 +35,7 @@ type UniversalOptions struct {
DialTimeout time.Duration DialTimeout time.Duration
ReadTimeout time.Duration ReadTimeout time.Duration
WriteTimeout time.Duration WriteTimeout time.Duration
ContextTimeoutEnabled bool
// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO). // PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
PoolFIFO bool PoolFIFO bool
@ -87,6 +88,7 @@ func (o *UniversalOptions) Cluster() *ClusterOptions {
DialTimeout: o.DialTimeout, DialTimeout: o.DialTimeout,
ReadTimeout: o.ReadTimeout, ReadTimeout: o.ReadTimeout,
WriteTimeout: o.WriteTimeout, WriteTimeout: o.WriteTimeout,
ContextTimeoutEnabled: o.ContextTimeoutEnabled,
PoolFIFO: o.PoolFIFO, PoolFIFO: o.PoolFIFO,
@ -127,6 +129,7 @@ func (o *UniversalOptions) Failover() *FailoverOptions {
DialTimeout: o.DialTimeout, DialTimeout: o.DialTimeout,
ReadTimeout: o.ReadTimeout, ReadTimeout: o.ReadTimeout,
WriteTimeout: o.WriteTimeout, WriteTimeout: o.WriteTimeout,
ContextTimeoutEnabled: o.ContextTimeoutEnabled,
PoolFIFO: o.PoolFIFO, PoolFIFO: o.PoolFIFO,
PoolSize: o.PoolSize, PoolSize: o.PoolSize,
@ -163,6 +166,7 @@ func (o *UniversalOptions) Simple() *Options {
DialTimeout: o.DialTimeout, DialTimeout: o.DialTimeout,
ReadTimeout: o.ReadTimeout, ReadTimeout: o.ReadTimeout,
WriteTimeout: o.WriteTimeout, WriteTimeout: o.WriteTimeout,
ContextTimeoutEnabled: o.ContextTimeoutEnabled,
PoolFIFO: o.PoolFIFO, PoolFIFO: o.PoolFIFO,
PoolSize: o.PoolSize, PoolSize: o.PoolSize,