forked from mirror/redis
Added backoff retry
This commit is contained in:
parent
368f0ea0ba
commit
406e882c43
|
@ -13,7 +13,7 @@ type RedisError string
|
|||
func (e RedisError) Error() string { return string(e) }
|
||||
|
||||
func IsRetryableError(err error) bool {
|
||||
return IsNetworkError(err)
|
||||
return IsNetworkError(err) || err.Error() == "ERR max number of clients reached"
|
||||
}
|
||||
|
||||
func IsInternalError(err error) bool {
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
package internal
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"time"
|
||||
)
|
||||
|
||||
const retryBackoff = 8 * time.Millisecond
|
||||
|
||||
// Retry backoff with jitter sleep to prevent overloaded conditions during intervals
|
||||
// https://www.awsarchitectureblog.com/2015/03/backoff.html
|
||||
func RetryBackoff(retry int, maxRetryBackoff time.Duration) time.Duration {
|
||||
if retry < 0 {
|
||||
retry = 0
|
||||
}
|
||||
|
||||
backoff := retryBackoff << uint(retry)
|
||||
if backoff > maxRetryBackoff {
|
||||
backoff = maxRetryBackoff
|
||||
}
|
||||
|
||||
return time.Duration(rand.Int63n(int64(backoff)))
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
package internal
|
||||
|
||||
import (
|
||||
"testing"
|
||||
. "github.com/onsi/gomega"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestRetryBackoff(t *testing.T) {
|
||||
RegisterTestingT(t)
|
||||
|
||||
for i := -1; i<= 8; i++ {
|
||||
backoff := RetryBackoff(i, 512*time.Millisecond)
|
||||
Expect(backoff >= 0).To(BeTrue())
|
||||
Expect(backoff <= 512*time.Millisecond).To(BeTrue())
|
||||
}
|
||||
}
|
24
options.go
24
options.go
|
@ -34,6 +34,10 @@ type Options struct {
|
|||
// Default is to not retry failed commands.
|
||||
MaxRetries int
|
||||
|
||||
// Retry using exponential backoff wait algorithm between each retry
|
||||
// Default is 512 milliseconds; set to -1 to disable any backoff sleep
|
||||
MaxRetryBackoff time.Duration
|
||||
|
||||
// Dial timeout for establishing new connections.
|
||||
// Default is 5 seconds.
|
||||
DialTimeout time.Duration
|
||||
|
@ -89,15 +93,17 @@ func (opt *Options) init() {
|
|||
if opt.DialTimeout == 0 {
|
||||
opt.DialTimeout = 5 * time.Second
|
||||
}
|
||||
if opt.ReadTimeout == 0 {
|
||||
opt.ReadTimeout = 3 * time.Second
|
||||
} else if opt.ReadTimeout == -1 {
|
||||
switch opt.ReadTimeout {
|
||||
case -1:
|
||||
opt.ReadTimeout = 0
|
||||
case 0:
|
||||
opt.ReadTimeout = 3 * time.Second
|
||||
}
|
||||
if opt.WriteTimeout == 0 {
|
||||
opt.WriteTimeout = opt.ReadTimeout
|
||||
} else if opt.WriteTimeout == -1 {
|
||||
switch opt.WriteTimeout {
|
||||
case -1:
|
||||
opt.WriteTimeout = 0
|
||||
case 0:
|
||||
opt.WriteTimeout = opt.ReadTimeout
|
||||
}
|
||||
if opt.PoolTimeout == 0 {
|
||||
opt.PoolTimeout = opt.ReadTimeout + time.Second
|
||||
|
@ -108,6 +114,12 @@ func (opt *Options) init() {
|
|||
if opt.IdleCheckFrequency == 0 {
|
||||
opt.IdleCheckFrequency = time.Minute
|
||||
}
|
||||
switch opt.MaxRetryBackoff {
|
||||
case -1:
|
||||
opt.MaxRetryBackoff = 0
|
||||
case 0:
|
||||
opt.MaxRetryBackoff = 512 * time.Millisecond
|
||||
}
|
||||
}
|
||||
|
||||
// ParseURL parses a redis URL into options that can be used to connect to redis
|
||||
|
|
9
redis.go
9
redis.go
|
@ -96,9 +96,16 @@ func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(
|
|||
|
||||
func (c *baseClient) defaultProcess(cmd Cmder) error {
|
||||
for i := 0; i <= c.opt.MaxRetries; i++ {
|
||||
if i > 0 {
|
||||
time.Sleep(internal.RetryBackoff(i, c.opt.MaxRetryBackoff))
|
||||
}
|
||||
|
||||
cn, _, err := c.conn()
|
||||
if err != nil {
|
||||
cmd.setErr(err)
|
||||
if internal.IsRetryableError(err) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -106,7 +113,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
|
|||
if err := writeCmd(cn, cmd); err != nil {
|
||||
c.putConn(cn, err)
|
||||
cmd.setErr(err)
|
||||
if err != nil && internal.IsRetryableError(err) {
|
||||
if internal.IsRetryableError(err) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
|
|
|
@ -156,6 +156,48 @@ var _ = Describe("Client", func() {
|
|||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should retry with backoff", func() {
|
||||
Expect(client.Close()).NotTo(HaveOccurred())
|
||||
|
||||
// use up all the available connections to force a fail
|
||||
connectionHogClient := redis.NewClient(&redis.Options{
|
||||
Addr: redisAddr,
|
||||
MaxRetries: 1,
|
||||
})
|
||||
defer connectionHogClient.Close()
|
||||
|
||||
for i := 0; i <= 1002; i++ {
|
||||
connectionHogClient.Pool().NewConn()
|
||||
}
|
||||
|
||||
clientNoRetry := redis.NewClient(&redis.Options{
|
||||
Addr: redisAddr,
|
||||
PoolSize: 1,
|
||||
MaxRetryBackoff: -1,
|
||||
})
|
||||
defer clientNoRetry.Close()
|
||||
|
||||
clientRetry := redis.NewClient(&redis.Options{
|
||||
Addr: redisAddr,
|
||||
MaxRetries: 5,
|
||||
PoolSize: 1,
|
||||
MaxRetryBackoff: 128 * time.Millisecond,
|
||||
})
|
||||
defer clientRetry.Close()
|
||||
|
||||
startNoRetry := time.Now()
|
||||
err := clientNoRetry.Ping().Err()
|
||||
Expect(err).To(HaveOccurred())
|
||||
elapseNoRetry := time.Since(startNoRetry)
|
||||
|
||||
startRetry := time.Now()
|
||||
err = clientRetry.Ping().Err()
|
||||
Expect(err).To(HaveOccurred())
|
||||
elapseRetry := time.Since(startRetry)
|
||||
|
||||
Expect(elapseRetry > elapseNoRetry).To(BeTrue())
|
||||
})
|
||||
|
||||
It("should update conn.UsedAt on read/write", func() {
|
||||
cn, _, err := client.Pool().Get()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
|
|
@ -7,3 +7,4 @@ save ""
|
|||
appendonly yes
|
||||
cluster-config-file nodes.conf
|
||||
cluster-node-timeout 30000
|
||||
maxclients 1001
|
Loading…
Reference in New Issue