diff --git a/cluster.go b/cluster.go index b693fbe3..6d311f05 100644 --- a/cluster.go +++ b/cluster.go @@ -754,7 +754,9 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error { var err error if attempt > 0 { - time.Sleep(c.retryBackoff(attempt)) + if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { + return err + } } if node == nil { @@ -1049,7 +1051,9 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) erro for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { if attempt > 0 { - time.Sleep(c.retryBackoff(attempt)) + if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { + return err + } } failedCmds := newCmdsMap() @@ -1254,7 +1258,9 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { if attempt > 0 { - time.Sleep(c.retryBackoff(attempt)) + if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { + return err + } } failedCmds := newCmdsMap() @@ -1376,6 +1382,10 @@ func (c *ClusterClient) txPipelineReadQueued( } func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { + return c.WatchContext(c.ctx, fn, keys...) +} + +func (c *ClusterClient) WatchContext(ctx context.Context, fn func(*Tx) error, keys ...string) error { if len(keys) == 0 { return fmt.Errorf("redis: Watch requires at least one key") } @@ -1395,10 +1405,12 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { if attempt > 0 { - time.Sleep(c.retryBackoff(attempt)) + if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { + return err + } } - err = node.Client.Watch(fn, keys...) + err = node.Client.WatchContext(ctx, fn, keys...) if err == nil { break } diff --git a/internal/error.go b/internal/error.go index f9c9ebb6..cd90d0d8 100644 --- a/internal/error.go +++ b/internal/error.go @@ -1,6 +1,7 @@ package internal import ( + "context" "io" "net" "strings" @@ -9,10 +10,10 @@ import ( ) func IsRetryableError(err error, retryTimeout bool) bool { - if err == nil { + switch err { + case nil, context.Canceled, context.DeadlineExceeded: return false - } - if err == io.EOF { + case io.EOF: return true } if netErr, ok := err.(net.Error); ok { diff --git a/internal/util.go b/internal/util.go index ffd2353e..6e471400 100644 --- a/internal/util.go +++ b/internal/util.go @@ -1,6 +1,23 @@ package internal -import "github.com/go-redis/redis/internal/util" +import ( + "context" + "time" + + "github.com/go-redis/redis/internal/util" +) + +func Sleep(ctx context.Context, dur time.Duration) error { + t := time.NewTimer(dur) + defer t.Stop() + + select { + case <-t.C: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} func ToLower(s string) string { if isLower(s) { diff --git a/redis.go b/redis.go index abdda7e9..e2bf5e91 100644 --- a/redis.go +++ b/redis.go @@ -244,7 +244,9 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error { func (c *baseClient) process(ctx context.Context, cmd Cmder) error { for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { if attempt > 0 { - time.Sleep(c.retryBackoff(attempt)) + if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { + return err + } } cn, err := c.getConn(ctx) @@ -331,7 +333,9 @@ func (c *baseClient) generalProcessPipeline( ) error { for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { if attempt > 0 { - time.Sleep(c.retryBackoff(attempt)) + if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { + return err + } } cn, err := c.getConn(ctx) diff --git a/ring.go b/ring.go index 2cdb01a7..94ea77b6 100644 --- a/ring.go +++ b/ring.go @@ -553,7 +553,9 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { func (c *Ring) process(ctx context.Context, cmd Cmder) error { for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { if attempt > 0 { - time.Sleep(c.retryBackoff(attempt)) + if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { + return err + } } shard, err := c.cmdShard(cmd) @@ -626,7 +628,9 @@ func (c *Ring) generalProcessPipeline( for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { if attempt > 0 { - time.Sleep(c.retryBackoff(attempt)) + if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { + return err + } } var mu sync.Mutex diff --git a/tx.go b/tx.go index 38a88555..327f1ab8 100644 --- a/tx.go +++ b/tx.go @@ -22,13 +22,13 @@ type Tx struct { ctx context.Context } -func (c *Client) newTx() *Tx { +func (c *Client) newTx(ctx context.Context) *Tx { tx := Tx{ baseClient: baseClient{ opt: c.opt, connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), true), }, - ctx: c.ctx, + ctx: ctx, } tx.init() return &tx @@ -65,7 +65,11 @@ func (c *Tx) ProcessContext(ctx context.Context, cmd Cmder) error { // // The transaction is automatically closed when fn exits. func (c *Client) Watch(fn func(*Tx) error, keys ...string) error { - tx := c.newTx() + return c.WatchContext(c.ctx, fn, keys...) +} + +func (c *Client) WatchContext(ctx context.Context, fn func(*Tx) error, keys ...string) error { + tx := c.newTx(ctx) if len(keys) > 0 { if err := tx.Watch(keys...).Err(); err != nil { _ = tx.Close()