ring: retry commands

This commit is contained in:
Vladimir Mihailenco 2018-09-07 11:45:56 +03:00
parent f3bba01df2
commit 0e7099cf69
1 changed files with 26 additions and 9 deletions

35
ring.go
View File

@ -342,6 +342,7 @@ type Ring struct {
shards *ringShards shards *ringShards
cmdsInfoCache *cmdsInfoCache cmdsInfoCache *cmdsInfoCache
process func(Cmder) error
processPipeline func([]Cmder) error processPipeline func([]Cmder) error
} }
@ -354,6 +355,7 @@ func NewRing(opt *RingOptions) *Ring {
} }
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
ring.process = ring.defaultProcess
ring.processPipeline = ring.defaultProcessPipeline ring.processPipeline = ring.defaultProcessPipeline
ring.cmdable.setProcessor(ring.Process) ring.cmdable.setProcessor(ring.Process)
@ -526,19 +528,34 @@ func (c *Ring) Do(args ...interface{}) *Cmd {
func (c *Ring) WrapProcess( func (c *Ring) WrapProcess(
fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
) { ) {
c.ForEachShard(func(c *Client) error { c.process = fn(c.process)
c.WrapProcess(fn)
return nil
})
} }
func (c *Ring) Process(cmd Cmder) error { func (c *Ring) Process(cmd Cmder) error {
shard, err := c.cmdShard(cmd) return c.process(cmd)
if err != nil { }
cmd.setErr(err)
return err func (c *Ring) defaultProcess(cmd Cmder) error {
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
if attempt > 0 {
time.Sleep(c.retryBackoff(attempt))
}
shard, err := c.cmdShard(cmd)
if err != nil {
cmd.setErr(err)
return err
}
err = shard.Client.Process(cmd)
if err == nil {
return nil
}
if !internal.IsRetryableError(err, cmd.readTimeout() == nil) {
return err
}
} }
return shard.Client.Process(cmd) return cmd.Err()
} }
func (c *Ring) Pipeline() Pipeliner { func (c *Ring) Pipeline() Pipeliner {