diff --git a/cluster.go b/cluster.go index f962684..c81bad5 100644 --- a/cluster.go +++ b/cluster.go @@ -66,9 +66,10 @@ type ClusterOptions struct { MinRetryBackoff time.Duration MaxRetryBackoff time.Duration - DialTimeout time.Duration - ReadTimeout time.Duration - WriteTimeout time.Duration + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + ContextTimeoutEnabled bool // PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO). PoolFIFO bool @@ -1259,14 +1260,14 @@ func (c *ClusterClient) _processPipelineNode( ) { _ = node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error { return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { - if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { + if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { return writeCmds(wr, cmds) }); err != nil { setCmdsErr(cmds, err) return err } - return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { + return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error { return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds) }) }) @@ -1421,14 +1422,14 @@ func (c *ClusterClient) _processTxPipelineNode( _ = node.Client.hooks.processTxPipeline( ctx, cmds, func(ctx context.Context, cmds []Cmder) error { return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { - if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { + if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { return writeCmds(wr, cmds) }); err != nil { setCmdsErr(cmds, err) return err } - return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { + return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error { statusCmd := cmds[0].(*StatusCmd) // Trim multi and exec. trimmedCmds := cmds[1 : len(cmds)-1] @@ -1788,6 +1789,13 @@ func (c *ClusterClient) MasterForKey(ctx context.Context, key string) (*Client, return node.Client, err } +func (c *ClusterClient) context(ctx context.Context) context.Context { + if c.opt.ContextTimeoutEnabled { + return ctx + } + return context.Background() +} + func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { for _, n := range nodes { if n == node { diff --git a/extra/redisotel/go.mod b/extra/redisotel/go.mod index 885dac2..c3cbdc9 100644 --- a/extra/redisotel/go.mod +++ b/extra/redisotel/go.mod @@ -9,7 +9,7 @@ replace github.com/go-redis/redis/extra/rediscmd/v9 => ../rediscmd require ( github.com/go-redis/redis/extra/rediscmd/v9 v9.0.0-beta.2 github.com/go-redis/redis/v9 v9.0.0-beta.2 - go.opentelemetry.io/otel v1.10.0 - go.opentelemetry.io/otel/sdk v1.10.0 - go.opentelemetry.io/otel/trace v1.10.0 + go.opentelemetry.io/otel v1.9.0 + go.opentelemetry.io/otel/sdk v1.9.0 + go.opentelemetry.io/otel/trace v1.9.0 ) diff --git a/extra/redisotel/go.sum b/extra/redisotel/go.sum index efdc47f..6e7704c 100644 --- a/extra/redisotel/go.sum +++ b/extra/redisotel/go.sum @@ -54,23 +54,25 @@ github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAl github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro= github.com/onsi/gomega v1.20.0/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo= github.com/onsi/gomega v1.20.1/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo= -github.com/onsi/gomega v1.20.2 h1:8uQq0zMgLEfa0vRrrBgaJF2gyW9Da9BmfGV+OyUzfkY= -github.com/onsi/gomega v1.20.2/go.mod h1:iYAIXgPSaDHak0LCMA+AWBpIKBr8WZicMxnE8luStNc= +github.com/onsi/gomega v1.21.1 h1:OB/euWYIExnPBohllTicTHmGTrMaqJ67nIu80j0/uEM= +github.com/onsi/gomega v1.21.1/go.mod h1:iYAIXgPSaDHak0LCMA+AWBpIKBr8WZicMxnE8luStNc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.opentelemetry.io/otel v1.10.0 h1:Y7DTJMR6zs1xkS/upamJYk0SxxN4C9AqRd77jmZnyY4= -go.opentelemetry.io/otel v1.10.0/go.mod h1:NbvWjCthWHKBEUMpf0/v8ZRZlni86PpGFEMA9pnQSnQ= -go.opentelemetry.io/otel/sdk v1.10.0 h1:jZ6K7sVn04kk/3DNUdJ4mqRlGDiXAVuIG+MMENpTNdY= -go.opentelemetry.io/otel/sdk v1.10.0/go.mod h1:vO06iKzD5baltJz1zarxMCNHFpUlUiOy4s65ECtn6kE= -go.opentelemetry.io/otel/trace v1.10.0 h1:npQMbR8o7mum8uF95yFbOEJffhs1sbCOfDh8zAJiH5E= -go.opentelemetry.io/otel/trace v1.10.0/go.mod h1:Sij3YYczqAdz+EhmGhE6TpTxUO5/F/AzrK+kxfGqySM= +go.opentelemetry.io/otel v1.9.0 h1:8WZNQFIB2a71LnANS9JeyidJKKGOOremcUtb/OtHISw= +go.opentelemetry.io/otel v1.9.0/go.mod h1:np4EoPGzoPs3O67xUVNoPPcmSvsfOxNlNA4F4AC+0Eo= +go.opentelemetry.io/otel/sdk v1.9.0 h1:LNXp1vrr83fNXTHgU8eO89mhzxb/bbWAsHG6fNf3qWo= +go.opentelemetry.io/otel/sdk v1.9.0/go.mod h1:AEZc8nt5bd2F7BC24J5R0mrjYnpEgYHyTcM/vrSple4= +go.opentelemetry.io/otel/trace v1.9.0 h1:oZaCNJUjWcg60VXWee8lJKlqhPbXAPB51URuR47pQYc= +go.opentelemetry.io/otel/trace v1.9.0/go.mod h1:2737Q0MuG8q1uILYm2YYVkAyLtOofiTNGg6VODnOiPo= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/pool/conn.go b/internal/pool/conn.go index f160a9f..1682a23 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -63,7 +63,9 @@ func (cn *Conn) RemoteAddr() net.Addr { return nil } -func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error { +func (cn *Conn) WithReader( + ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error, +) error { if timeout >= 0 { if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil { return err diff --git a/main_test.go b/main_test.go index b63a09e..e35b17c 100644 --- a/main_test.go +++ b/main_test.go @@ -123,9 +123,10 @@ func redisOptions() *redis.Options { Addr: redisAddr, DB: 15, - DialTimeout: 10 * time.Second, - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, + DialTimeout: 10 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + ContextTimeoutEnabled: true, MaxRetries: -1, diff --git a/options.go b/options.go index 5679128..6c2f853 100644 --- a/options.go +++ b/options.go @@ -83,10 +83,14 @@ type Options struct { // - `-1` - no timeout (block indefinitely). // - `-2` - disables SetWriteDeadline calls completely. WriteTimeout time.Duration + // ContextTimeoutEnabled controls whether the client respects context timeouts and deadlines. + // See https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts + ContextTimeoutEnabled bool // Type of connection pool. // true for FIFO pool, false for LIFO pool. - // Note that fifo has higher overhead compared to lifo. + // Note that FIFO has slightly higher overhead compared to LIFO, + // but it helps closing idle connections faster reducing the pool size. PoolFIFO bool // Maximum number of socket connections. // Default is 10 connections per every available CPU as reported by runtime.GOMAXPROCS. @@ -100,22 +104,30 @@ type Options struct { MinIdleConns int // Maximum number of idle connections. MaxIdleConns int - // Amount of time after which client closes idle connections. + // ConnMaxIdleTime is the maximum amount of time a connection may be idle. // Should be less than server's timeout. + // + // Expired connections may be closed lazily before reuse. + // If d <= 0, connections are not closed due to a connection's idle time. + // // Default is 5 minutes. -1 disables idle timeout check. ConnMaxIdleTime time.Duration - // Connection age at which client retires (closes) the connection. - // Default is to not close aged connections. + // ConnMaxLifetime is the maximum amount of time a connection may be reused. + // + // Expired connections may be closed lazily before reuse. + // If <= 0, connections are not closed due to a connection's age. + // + // Default is to not close idle connections. ConnMaxLifetime time.Duration - // Enables read only queries on slave nodes. - readOnly bool - - // TLS Config to use. When set TLS will be negotiated. + // TLS Config to use. When set, TLS will be negotiated. TLSConfig *tls.Config - // Limiter interface used to implemented circuit breaker or rate limiter. + // Limiter interface used to implement circuit breaker or rate limiter. Limiter Limiter + + // Enables read only queries on slave/follower nodes. + readOnly bool } func (opt *Options) init() { diff --git a/pubsub.go b/pubsub.go index 6eede91..2bd156d 100644 --- a/pubsub.go +++ b/pubsub.go @@ -24,10 +24,10 @@ type PubSub struct { newConn func(ctx context.Context, channels []string) (*pool.Conn, error) closeConn func(*pool.Conn) error - mu sync.Mutex - cn *pool.Conn - channels map[string]struct{} - patterns map[string]struct{} + mu sync.Mutex + cn *pool.Conn + channels map[string]struct{} + patterns map[string]struct{} schannels map[string]struct{} closed bool @@ -84,7 +84,7 @@ func (c *PubSub) conn(ctx context.Context, newChannels []string) (*pool.Conn, er } func (c *PubSub) writeCmd(ctx context.Context, cn *pool.Conn, cmd Cmder) error { - return cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { + return cn.WithWriter(context.Background(), c.opt.WriteTimeout, func(wr *proto.Writer) error { return writeCmd(wr, cmd) }) } @@ -408,7 +408,7 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int return nil, err } - err = cn.WithReader(ctx, timeout, func(rd *proto.Reader) error { + err = cn.WithReader(context.Background(), timeout, func(rd *proto.Reader) error { return c.cmd.readReply(rd) }) diff --git a/redis.go b/redis.go index 54790a2..a800f45 100644 --- a/redis.go +++ b/redis.go @@ -316,17 +316,15 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool } retryTimeout := uint32(0) - err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { - err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { + if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { + if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { return writeCmd(wr, cmd) - }) - if err != nil { + }); err != nil { atomic.StoreUint32(&retryTimeout, 1) return err } - err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply) - if err != nil { + if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), cmd.readReply); err != nil { if cmd.readTimeout() == nil { atomic.StoreUint32(&retryTimeout, 1) } else { @@ -336,13 +334,12 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool } return nil - }) - if err == nil { - return false, nil + }); err != nil { + retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1) + return retry, err } - retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1) - return retry, err + return false, nil } func (c *baseClient) retryBackoff(attempt int) time.Duration { @@ -430,14 +427,14 @@ func (c *baseClient) _generalProcessPipeline( func (c *baseClient) pipelineProcessCmds( ctx context.Context, cn *pool.Conn, cmds []Cmder, ) (bool, error) { - if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { + if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { return writeCmds(wr, cmds) }); err != nil { setCmdsErr(cmds, err) return true, err } - if err := cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { + if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error { return pipelineReadCmds(rd, cmds) }); err != nil { return true, err @@ -462,14 +459,14 @@ func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error { func (c *baseClient) txPipelineProcessCmds( ctx context.Context, cn *pool.Conn, cmds []Cmder, ) (bool, error) { - if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { + if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { return writeCmds(wr, cmds) }); err != nil { setCmdsErr(cmds, err) return true, err } - if err := cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error { + if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error { statusCmd := cmds[0].(*StatusCmd) // Trim multi and exec. trimmedCmds := cmds[1 : len(cmds)-1] @@ -527,6 +524,13 @@ func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) return nil } +func (c *baseClient) context(ctx context.Context) context.Context { + if c.opt.ContextTimeoutEnabled { + return ctx + } + return context.Background() +} + //------------------------------------------------------------------------------ // Client is a Redis client representing a pool of zero or more underlying connections. diff --git a/sentinel.go b/sentinel.go index bdf1c39..8b5fe88 100644 --- a/sentinel.go +++ b/sentinel.go @@ -59,9 +59,10 @@ type FailoverOptions struct { MinRetryBackoff time.Duration MaxRetryBackoff time.Duration - DialTimeout time.Duration - ReadTimeout time.Duration - WriteTimeout time.Duration + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + ContextTimeoutEnabled bool PoolFIFO bool @@ -90,9 +91,10 @@ func (opt *FailoverOptions) clientOptions() *Options { MinRetryBackoff: opt.MinRetryBackoff, MaxRetryBackoff: opt.MaxRetryBackoff, - DialTimeout: opt.DialTimeout, - ReadTimeout: opt.ReadTimeout, - WriteTimeout: opt.WriteTimeout, + DialTimeout: opt.DialTimeout, + ReadTimeout: opt.ReadTimeout, + WriteTimeout: opt.WriteTimeout, + ContextTimeoutEnabled: opt.ContextTimeoutEnabled, PoolFIFO: opt.PoolFIFO, PoolSize: opt.PoolSize, diff --git a/universal.go b/universal.go index f800935..73bbf17 100644 --- a/universal.go +++ b/universal.go @@ -32,9 +32,10 @@ type UniversalOptions struct { MinRetryBackoff time.Duration MaxRetryBackoff time.Duration - DialTimeout time.Duration - ReadTimeout time.Duration - WriteTimeout time.Duration + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + ContextTimeoutEnabled bool // PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO). PoolFIFO bool @@ -84,9 +85,10 @@ func (o *UniversalOptions) Cluster() *ClusterOptions { MinRetryBackoff: o.MinRetryBackoff, MaxRetryBackoff: o.MaxRetryBackoff, - DialTimeout: o.DialTimeout, - ReadTimeout: o.ReadTimeout, - WriteTimeout: o.WriteTimeout, + DialTimeout: o.DialTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, + ContextTimeoutEnabled: o.ContextTimeoutEnabled, PoolFIFO: o.PoolFIFO, @@ -124,9 +126,10 @@ func (o *UniversalOptions) Failover() *FailoverOptions { MinRetryBackoff: o.MinRetryBackoff, MaxRetryBackoff: o.MaxRetryBackoff, - DialTimeout: o.DialTimeout, - ReadTimeout: o.ReadTimeout, - WriteTimeout: o.WriteTimeout, + DialTimeout: o.DialTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, + ContextTimeoutEnabled: o.ContextTimeoutEnabled, PoolFIFO: o.PoolFIFO, PoolSize: o.PoolSize, @@ -160,9 +163,10 @@ func (o *UniversalOptions) Simple() *Options { MinRetryBackoff: o.MinRetryBackoff, MaxRetryBackoff: o.MaxRetryBackoff, - DialTimeout: o.DialTimeout, - ReadTimeout: o.ReadTimeout, - WriteTimeout: o.WriteTimeout, + DialTimeout: o.DialTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, + ContextTimeoutEnabled: o.ContextTimeoutEnabled, PoolFIFO: o.PoolFIFO, PoolSize: o.PoolSize,