From 0e7099cf69db0d14d37ed60d0622aaf7ff8dbb91 Mon Sep 17 00:00:00 2001
From: Vladimir Mihailenco <vladimir.webdev@gmail.com>
Date: Fri, 7 Sep 2018 11:45:56 +0300
Subject: [PATCH] ring: retry commands

---
 ring.go | 35 ++++++++++++++++++++++++++---------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git a/ring.go b/ring.go
index 3ded2806..61050cab 100644
--- a/ring.go
+++ b/ring.go
@@ -342,6 +342,7 @@ type Ring struct {
 	shards        *ringShards
 	cmdsInfoCache *cmdsInfoCache
 
+	process         func(Cmder) error
 	processPipeline func([]Cmder) error
 }
 
@@ -354,6 +355,7 @@ func NewRing(opt *RingOptions) *Ring {
 	}
 	ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
 
+	ring.process = ring.defaultProcess
 	ring.processPipeline = ring.defaultProcessPipeline
 	ring.cmdable.setProcessor(ring.Process)
 
@@ -526,19 +528,34 @@ func (c *Ring) Do(args ...interface{}) *Cmd {
 func (c *Ring) WrapProcess(
 	fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
 ) {
-	c.ForEachShard(func(c *Client) error {
-		c.WrapProcess(fn)
-		return nil
-	})
+	c.process = fn(c.process)
 }
 
 func (c *Ring) Process(cmd Cmder) error {
-	shard, err := c.cmdShard(cmd)
-	if err != nil {
-		cmd.setErr(err)
-		return err
+	return c.process(cmd)
+}
+
+func (c *Ring) defaultProcess(cmd Cmder) error {
+	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
+		if attempt > 0 {
+			time.Sleep(c.retryBackoff(attempt))
+		}
+
+		shard, err := c.cmdShard(cmd)
+		if err != nil {
+			cmd.setErr(err)
+			return err
+		}
+
+		err = shard.Client.Process(cmd)
+		if err == nil {
+			return nil
+		}
+		if !internal.IsRetryableError(err, cmd.readTimeout() == nil) {
+			return err
+		}
 	}
-	return shard.Client.Process(cmd)
+	return cmd.Err()
 }
 
 func (c *Ring) Pipeline() Pipeliner {