forked from mirror/redis
Merge pull request #2243 from go-redis/feat/context-timeout-enabled
feat: add ContextTimeoutEnabled to respect context timeouts and deadl…
This commit is contained in:
commit
d07457b6cf
16
cluster.go
16
cluster.go
|
@ -69,6 +69,7 @@ type ClusterOptions struct {
|
|||
DialTimeout time.Duration
|
||||
ReadTimeout time.Duration
|
||||
WriteTimeout time.Duration
|
||||
ContextTimeoutEnabled bool
|
||||
|
||||
// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
|
||||
PoolFIFO bool
|
||||
|
@ -1259,14 +1260,14 @@ func (c *ClusterClient) _processPipelineNode(
|
|||
) {
|
||||
_ = node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
|
||||
return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
|
||||
if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
return writeCmds(wr, cmds)
|
||||
}); err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
|
||||
return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
|
||||
return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds)
|
||||
})
|
||||
})
|
||||
|
@ -1421,14 +1422,14 @@ func (c *ClusterClient) _processTxPipelineNode(
|
|||
_ = node.Client.hooks.processTxPipeline(
|
||||
ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
|
||||
return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
|
||||
if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
return writeCmds(wr, cmds)
|
||||
}); err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
|
||||
return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
|
||||
statusCmd := cmds[0].(*StatusCmd)
|
||||
// Trim multi and exec.
|
||||
trimmedCmds := cmds[1 : len(cmds)-1]
|
||||
|
@ -1788,6 +1789,13 @@ func (c *ClusterClient) MasterForKey(ctx context.Context, key string) (*Client,
|
|||
return node.Client, err
|
||||
}
|
||||
|
||||
func (c *ClusterClient) context(ctx context.Context) context.Context {
|
||||
if c.opt.ContextTimeoutEnabled {
|
||||
return ctx
|
||||
}
|
||||
return context.Background()
|
||||
}
|
||||
|
||||
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
|
||||
for _, n := range nodes {
|
||||
if n == node {
|
||||
|
|
|
@ -9,7 +9,7 @@ replace github.com/go-redis/redis/extra/rediscmd/v9 => ../rediscmd
|
|||
require (
|
||||
github.com/go-redis/redis/extra/rediscmd/v9 v9.0.0-beta.2
|
||||
github.com/go-redis/redis/v9 v9.0.0-beta.2
|
||||
go.opentelemetry.io/otel v1.10.0
|
||||
go.opentelemetry.io/otel/sdk v1.10.0
|
||||
go.opentelemetry.io/otel/trace v1.10.0
|
||||
go.opentelemetry.io/otel v1.9.0
|
||||
go.opentelemetry.io/otel/sdk v1.9.0
|
||||
go.opentelemetry.io/otel/trace v1.9.0
|
||||
)
|
||||
|
|
|
@ -54,23 +54,25 @@ github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAl
|
|||
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
|
||||
github.com/onsi/gomega v1.20.0/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo=
|
||||
github.com/onsi/gomega v1.20.1/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo=
|
||||
github.com/onsi/gomega v1.20.2 h1:8uQq0zMgLEfa0vRrrBgaJF2gyW9Da9BmfGV+OyUzfkY=
|
||||
github.com/onsi/gomega v1.20.2/go.mod h1:iYAIXgPSaDHak0LCMA+AWBpIKBr8WZicMxnE8luStNc=
|
||||
github.com/onsi/gomega v1.21.1 h1:OB/euWYIExnPBohllTicTHmGTrMaqJ67nIu80j0/uEM=
|
||||
github.com/onsi/gomega v1.21.1/go.mod h1:iYAIXgPSaDHak0LCMA+AWBpIKBr8WZicMxnE8luStNc=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
||||
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
|
||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
go.opentelemetry.io/otel v1.10.0 h1:Y7DTJMR6zs1xkS/upamJYk0SxxN4C9AqRd77jmZnyY4=
|
||||
go.opentelemetry.io/otel v1.10.0/go.mod h1:NbvWjCthWHKBEUMpf0/v8ZRZlni86PpGFEMA9pnQSnQ=
|
||||
go.opentelemetry.io/otel/sdk v1.10.0 h1:jZ6K7sVn04kk/3DNUdJ4mqRlGDiXAVuIG+MMENpTNdY=
|
||||
go.opentelemetry.io/otel/sdk v1.10.0/go.mod h1:vO06iKzD5baltJz1zarxMCNHFpUlUiOy4s65ECtn6kE=
|
||||
go.opentelemetry.io/otel/trace v1.10.0 h1:npQMbR8o7mum8uF95yFbOEJffhs1sbCOfDh8zAJiH5E=
|
||||
go.opentelemetry.io/otel/trace v1.10.0/go.mod h1:Sij3YYczqAdz+EhmGhE6TpTxUO5/F/AzrK+kxfGqySM=
|
||||
go.opentelemetry.io/otel v1.9.0 h1:8WZNQFIB2a71LnANS9JeyidJKKGOOremcUtb/OtHISw=
|
||||
go.opentelemetry.io/otel v1.9.0/go.mod h1:np4EoPGzoPs3O67xUVNoPPcmSvsfOxNlNA4F4AC+0Eo=
|
||||
go.opentelemetry.io/otel/sdk v1.9.0 h1:LNXp1vrr83fNXTHgU8eO89mhzxb/bbWAsHG6fNf3qWo=
|
||||
go.opentelemetry.io/otel/sdk v1.9.0/go.mod h1:AEZc8nt5bd2F7BC24J5R0mrjYnpEgYHyTcM/vrSple4=
|
||||
go.opentelemetry.io/otel/trace v1.9.0 h1:oZaCNJUjWcg60VXWee8lJKlqhPbXAPB51URuR47pQYc=
|
||||
go.opentelemetry.io/otel/trace v1.9.0/go.mod h1:2737Q0MuG8q1uILYm2YYVkAyLtOofiTNGg6VODnOiPo=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
|
|
|
@ -63,7 +63,9 @@ func (cn *Conn) RemoteAddr() net.Addr {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
|
||||
func (cn *Conn) WithReader(
|
||||
ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
|
||||
) error {
|
||||
if timeout >= 0 {
|
||||
if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
|
||||
return err
|
||||
|
|
|
@ -126,6 +126,7 @@ func redisOptions() *redis.Options {
|
|||
DialTimeout: 10 * time.Second,
|
||||
ReadTimeout: 30 * time.Second,
|
||||
WriteTimeout: 30 * time.Second,
|
||||
ContextTimeoutEnabled: true,
|
||||
|
||||
MaxRetries: -1,
|
||||
|
||||
|
|
30
options.go
30
options.go
|
@ -83,10 +83,14 @@ type Options struct {
|
|||
// - `-1` - no timeout (block indefinitely).
|
||||
// - `-2` - disables SetWriteDeadline calls completely.
|
||||
WriteTimeout time.Duration
|
||||
// ContextTimeoutEnabled controls whether the client respects context timeouts and deadlines.
|
||||
// See https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts
|
||||
ContextTimeoutEnabled bool
|
||||
|
||||
// Type of connection pool.
|
||||
// true for FIFO pool, false for LIFO pool.
|
||||
// Note that fifo has higher overhead compared to lifo.
|
||||
// Note that FIFO has slightly higher overhead compared to LIFO,
|
||||
// but it helps closing idle connections faster reducing the pool size.
|
||||
PoolFIFO bool
|
||||
// Maximum number of socket connections.
|
||||
// Default is 10 connections per every available CPU as reported by runtime.GOMAXPROCS.
|
||||
|
@ -100,22 +104,30 @@ type Options struct {
|
|||
MinIdleConns int
|
||||
// Maximum number of idle connections.
|
||||
MaxIdleConns int
|
||||
// Amount of time after which client closes idle connections.
|
||||
// ConnMaxIdleTime is the maximum amount of time a connection may be idle.
|
||||
// Should be less than server's timeout.
|
||||
//
|
||||
// Expired connections may be closed lazily before reuse.
|
||||
// If d <= 0, connections are not closed due to a connection's idle time.
|
||||
//
|
||||
// Default is 5 minutes. -1 disables idle timeout check.
|
||||
ConnMaxIdleTime time.Duration
|
||||
// Connection age at which client retires (closes) the connection.
|
||||
// Default is to not close aged connections.
|
||||
// ConnMaxLifetime is the maximum amount of time a connection may be reused.
|
||||
//
|
||||
// Expired connections may be closed lazily before reuse.
|
||||
// If <= 0, connections are not closed due to a connection's age.
|
||||
//
|
||||
// Default is to not close idle connections.
|
||||
ConnMaxLifetime time.Duration
|
||||
|
||||
// Enables read only queries on slave nodes.
|
||||
readOnly bool
|
||||
|
||||
// TLS Config to use. When set TLS will be negotiated.
|
||||
// TLS Config to use. When set, TLS will be negotiated.
|
||||
TLSConfig *tls.Config
|
||||
|
||||
// Limiter interface used to implemented circuit breaker or rate limiter.
|
||||
// Limiter interface used to implement circuit breaker or rate limiter.
|
||||
Limiter Limiter
|
||||
|
||||
// Enables read only queries on slave/follower nodes.
|
||||
readOnly bool
|
||||
}
|
||||
|
||||
func (opt *Options) init() {
|
||||
|
|
|
@ -84,7 +84,7 @@ func (c *PubSub) conn(ctx context.Context, newChannels []string) (*pool.Conn, er
|
|||
}
|
||||
|
||||
func (c *PubSub) writeCmd(ctx context.Context, cn *pool.Conn, cmd Cmder) error {
|
||||
return cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
return cn.WithWriter(context.Background(), c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
return writeCmd(wr, cmd)
|
||||
})
|
||||
}
|
||||
|
@ -408,7 +408,7 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int
|
|||
return nil, err
|
||||
}
|
||||
|
||||
err = cn.WithReader(ctx, timeout, func(rd *proto.Reader) error {
|
||||
err = cn.WithReader(context.Background(), timeout, func(rd *proto.Reader) error {
|
||||
return c.cmd.readReply(rd)
|
||||
})
|
||||
|
||||
|
|
34
redis.go
34
redis.go
|
@ -316,17 +316,15 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
|
|||
}
|
||||
|
||||
retryTimeout := uint32(0)
|
||||
err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
|
||||
err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
|
||||
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
return writeCmd(wr, cmd)
|
||||
})
|
||||
if err != nil {
|
||||
}); err != nil {
|
||||
atomic.StoreUint32(&retryTimeout, 1)
|
||||
return err
|
||||
}
|
||||
|
||||
err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply)
|
||||
if err != nil {
|
||||
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), cmd.readReply); err != nil {
|
||||
if cmd.readTimeout() == nil {
|
||||
atomic.StoreUint32(&retryTimeout, 1)
|
||||
} else {
|
||||
|
@ -336,15 +334,14 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
|
|||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
}); err != nil {
|
||||
retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1)
|
||||
return retry, err
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (c *baseClient) retryBackoff(attempt int) time.Duration {
|
||||
return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
|
||||
}
|
||||
|
@ -430,14 +427,14 @@ func (c *baseClient) _generalProcessPipeline(
|
|||
func (c *baseClient) pipelineProcessCmds(
|
||||
ctx context.Context, cn *pool.Conn, cmds []Cmder,
|
||||
) (bool, error) {
|
||||
if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
return writeCmds(wr, cmds)
|
||||
}); err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
return true, err
|
||||
}
|
||||
|
||||
if err := cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
|
||||
if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
|
||||
return pipelineReadCmds(rd, cmds)
|
||||
}); err != nil {
|
||||
return true, err
|
||||
|
@ -462,14 +459,14 @@ func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
|
|||
func (c *baseClient) txPipelineProcessCmds(
|
||||
ctx context.Context, cn *pool.Conn, cmds []Cmder,
|
||||
) (bool, error) {
|
||||
if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
return writeCmds(wr, cmds)
|
||||
}); err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
return true, err
|
||||
}
|
||||
|
||||
if err := cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
|
||||
if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
|
||||
statusCmd := cmds[0].(*StatusCmd)
|
||||
// Trim multi and exec.
|
||||
trimmedCmds := cmds[1 : len(cmds)-1]
|
||||
|
@ -527,6 +524,13 @@ func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder)
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *baseClient) context(ctx context.Context) context.Context {
|
||||
if c.opt.ContextTimeoutEnabled {
|
||||
return ctx
|
||||
}
|
||||
return context.Background()
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
// Client is a Redis client representing a pool of zero or more underlying connections.
|
||||
|
|
|
@ -62,6 +62,7 @@ type FailoverOptions struct {
|
|||
DialTimeout time.Duration
|
||||
ReadTimeout time.Duration
|
||||
WriteTimeout time.Duration
|
||||
ContextTimeoutEnabled bool
|
||||
|
||||
PoolFIFO bool
|
||||
|
||||
|
@ -93,6 +94,7 @@ func (opt *FailoverOptions) clientOptions() *Options {
|
|||
DialTimeout: opt.DialTimeout,
|
||||
ReadTimeout: opt.ReadTimeout,
|
||||
WriteTimeout: opt.WriteTimeout,
|
||||
ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
|
||||
|
||||
PoolFIFO: opt.PoolFIFO,
|
||||
PoolSize: opt.PoolSize,
|
||||
|
|
|
@ -35,6 +35,7 @@ type UniversalOptions struct {
|
|||
DialTimeout time.Duration
|
||||
ReadTimeout time.Duration
|
||||
WriteTimeout time.Duration
|
||||
ContextTimeoutEnabled bool
|
||||
|
||||
// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
|
||||
PoolFIFO bool
|
||||
|
@ -87,6 +88,7 @@ func (o *UniversalOptions) Cluster() *ClusterOptions {
|
|||
DialTimeout: o.DialTimeout,
|
||||
ReadTimeout: o.ReadTimeout,
|
||||
WriteTimeout: o.WriteTimeout,
|
||||
ContextTimeoutEnabled: o.ContextTimeoutEnabled,
|
||||
|
||||
PoolFIFO: o.PoolFIFO,
|
||||
|
||||
|
@ -127,6 +129,7 @@ func (o *UniversalOptions) Failover() *FailoverOptions {
|
|||
DialTimeout: o.DialTimeout,
|
||||
ReadTimeout: o.ReadTimeout,
|
||||
WriteTimeout: o.WriteTimeout,
|
||||
ContextTimeoutEnabled: o.ContextTimeoutEnabled,
|
||||
|
||||
PoolFIFO: o.PoolFIFO,
|
||||
PoolSize: o.PoolSize,
|
||||
|
@ -163,6 +166,7 @@ func (o *UniversalOptions) Simple() *Options {
|
|||
DialTimeout: o.DialTimeout,
|
||||
ReadTimeout: o.ReadTimeout,
|
||||
WriteTimeout: o.WriteTimeout,
|
||||
ContextTimeoutEnabled: o.ContextTimeoutEnabled,
|
||||
|
||||
PoolFIFO: o.PoolFIFO,
|
||||
PoolSize: o.PoolSize,
|
||||
|
|
Loading…
Reference in New Issue