diff --git a/ring.go b/ring.go index 3ded2806..61050cab 100644 --- a/ring.go +++ b/ring.go @@ -342,6 +342,7 @@ type Ring struct { shards *ringShards cmdsInfoCache *cmdsInfoCache + process func(Cmder) error processPipeline func([]Cmder) error } @@ -354,6 +355,7 @@ func NewRing(opt *RingOptions) *Ring { } ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) + ring.process = ring.defaultProcess ring.processPipeline = ring.defaultProcessPipeline ring.cmdable.setProcessor(ring.Process) @@ -526,19 +528,34 @@ func (c *Ring) Do(args ...interface{}) *Cmd { func (c *Ring) WrapProcess( fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, ) { - c.ForEachShard(func(c *Client) error { - c.WrapProcess(fn) - return nil - }) + c.process = fn(c.process) } func (c *Ring) Process(cmd Cmder) error { - shard, err := c.cmdShard(cmd) - if err != nil { - cmd.setErr(err) - return err + return c.process(cmd) +} + +func (c *Ring) defaultProcess(cmd Cmder) error { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + shard, err := c.cmdShard(cmd) + if err != nil { + cmd.setErr(err) + return err + } + + err = shard.Client.Process(cmd) + if err == nil { + return nil + } + if !internal.IsRetryableError(err, cmd.readTimeout() == nil) { + return err + } } - return shard.Client.Process(cmd) + return cmd.Err() } func (c *Ring) Pipeline() Pipeliner {