mirror of https://github.com/go-redis/redis.git
Merge pull request #101 from go-redis/feature/auto-retry-and-max-retries
Add auto-retry and MaxRetries option. Fixes #84.
This commit is contained in:
commit
b8b073f3bf
|
@ -47,6 +47,12 @@ func setCmdsErr(cmds []Cmder, e error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func resetCmds(cmds []Cmder) {
|
||||||
|
for _, cmd := range cmds {
|
||||||
|
cmd.reset()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func cmdString(cmd Cmder, val interface{}) string {
|
func cmdString(cmd Cmder, val interface{}) string {
|
||||||
s := strings.Join(cmd.args(), " ")
|
s := strings.Join(cmd.args(), " ")
|
||||||
if err := cmd.Err(); err != nil {
|
if err := cmd.Err(); err != nil {
|
||||||
|
|
16
conn.go
16
conn.go
|
@ -7,14 +7,18 @@ import (
|
||||||
"gopkg.in/bufio.v1"
|
"gopkg.in/bufio.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
zeroTime = time.Time{}
|
||||||
|
)
|
||||||
|
|
||||||
type conn struct {
|
type conn struct {
|
||||||
netcn net.Conn
|
netcn net.Conn
|
||||||
rd *bufio.Reader
|
rd *bufio.Reader
|
||||||
buf []byte
|
buf []byte
|
||||||
|
|
||||||
usedAt time.Time
|
usedAt time.Time
|
||||||
readTimeout time.Duration
|
ReadTimeout time.Duration
|
||||||
writeTimeout time.Duration
|
WriteTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func newConnDialer(opt *options) func() (*conn, error) {
|
func newConnDialer(opt *options) func() (*conn, error) {
|
||||||
|
@ -70,8 +74,8 @@ func (cn *conn) writeCmds(cmds ...Cmder) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cn *conn) Read(b []byte) (int, error) {
|
func (cn *conn) Read(b []byte) (int, error) {
|
||||||
if cn.readTimeout != 0 {
|
if cn.ReadTimeout != 0 {
|
||||||
cn.netcn.SetReadDeadline(time.Now().Add(cn.readTimeout))
|
cn.netcn.SetReadDeadline(time.Now().Add(cn.ReadTimeout))
|
||||||
} else {
|
} else {
|
||||||
cn.netcn.SetReadDeadline(zeroTime)
|
cn.netcn.SetReadDeadline(zeroTime)
|
||||||
}
|
}
|
||||||
|
@ -79,8 +83,8 @@ func (cn *conn) Read(b []byte) (int, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cn *conn) Write(b []byte) (int, error) {
|
func (cn *conn) Write(b []byte) (int, error) {
|
||||||
if cn.writeTimeout != 0 {
|
if cn.WriteTimeout != 0 {
|
||||||
cn.netcn.SetWriteDeadline(time.Now().Add(cn.writeTimeout))
|
cn.netcn.SetWriteDeadline(time.Now().Add(cn.WriteTimeout))
|
||||||
} else {
|
} else {
|
||||||
cn.netcn.SetWriteDeadline(zeroTime)
|
cn.netcn.SetWriteDeadline(zeroTime)
|
||||||
}
|
}
|
||||||
|
|
10
error.go
10
error.go
|
@ -26,7 +26,7 @@ func (err redisError) Error() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func isNetworkError(err error) bool {
|
func isNetworkError(err error) bool {
|
||||||
if _, ok := err.(*net.OpError); ok || err == io.EOF {
|
if _, ok := err.(net.Error); ok || err == io.EOF {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
|
@ -53,3 +53,11 @@ func isMovedError(err error) (moved bool, ask bool, addr string) {
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// shouldRetry reports whether failed command should be retried.
|
||||||
|
func shouldRetry(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return isNetworkError(err)
|
||||||
|
}
|
||||||
|
|
|
@ -1,9 +1,15 @@
|
||||||
package redis
|
package redis
|
||||||
|
|
||||||
|
import "net"
|
||||||
|
|
||||||
func (c *baseClient) Pool() pool {
|
func (c *baseClient) Pool() pool {
|
||||||
return c.connPool
|
return c.connPool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cn *conn) SetNetConn(netcn net.Conn) {
|
||||||
|
cn.netcn = netcn
|
||||||
|
}
|
||||||
|
|
||||||
func HashSlot(key string) int {
|
func HashSlot(key string) int {
|
||||||
return hashSlot(key)
|
return hashSlot(key)
|
||||||
}
|
}
|
||||||
|
|
32
pipeline.go
32
pipeline.go
|
@ -50,26 +50,38 @@ func (c *Pipeline) Discard() error {
|
||||||
|
|
||||||
// Exec always returns list of commands and error of the first failed
|
// Exec always returns list of commands and error of the first failed
|
||||||
// command if any.
|
// command if any.
|
||||||
func (c *Pipeline) Exec() ([]Cmder, error) {
|
func (c *Pipeline) Exec() (cmds []Cmder, retErr error) {
|
||||||
if c.closed {
|
if c.closed {
|
||||||
return nil, errClosed
|
return nil, errClosed
|
||||||
}
|
}
|
||||||
if len(c.cmds) == 0 {
|
if len(c.cmds) == 0 {
|
||||||
return []Cmder{}, nil
|
return c.cmds, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
cmds := c.cmds
|
cmds = c.cmds
|
||||||
c.cmds = make([]Cmder, 0, 0)
|
c.cmds = make([]Cmder, 0, 0)
|
||||||
|
|
||||||
|
for i := 0; i <= c.client.opt.MaxRetries; i++ {
|
||||||
|
if i > 0 {
|
||||||
|
resetCmds(cmds)
|
||||||
|
}
|
||||||
|
|
||||||
cn, err := c.client.conn()
|
cn, err := c.client.conn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
setCmdsErr(cmds, err)
|
setCmdsErr(cmds, err)
|
||||||
return cmds, err
|
return cmds, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.execCmds(cn, cmds)
|
retErr = c.execCmds(cn, cmds)
|
||||||
c.client.putConn(cn, err)
|
c.client.putConn(cn, err)
|
||||||
return cmds, err
|
if shouldRetry(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
return cmds, retErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Pipeline) execCmds(cn *conn, cmds []Cmder) error {
|
func (c *Pipeline) execCmds(cn *conn, cmds []Cmder) error {
|
||||||
|
@ -79,17 +91,11 @@ func (c *Pipeline) execCmds(cn *conn, cmds []Cmder) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
var firstCmdErr error
|
var firstCmdErr error
|
||||||
for i, cmd := range cmds {
|
for _, cmd := range cmds {
|
||||||
err := cmd.parseReply(cn.rd)
|
err := cmd.parseReply(cn.rd)
|
||||||
if err == nil {
|
if err != nil && firstCmdErr == nil {
|
||||||
continue
|
|
||||||
}
|
|
||||||
if firstCmdErr == nil {
|
|
||||||
firstCmdErr = err
|
firstCmdErr = err
|
||||||
}
|
}
|
||||||
if isNetworkError(err) {
|
|
||||||
setCmdsErr(cmds[i:], err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return firstCmdErr
|
return firstCmdErr
|
||||||
|
|
4
pool.go
4
pool.go
|
@ -16,10 +16,6 @@ var (
|
||||||
errPoolTimeout = errors.New("redis: connection pool timeout")
|
errPoolTimeout = errors.New("redis: connection pool timeout")
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
zeroTime = time.Time{}
|
|
||||||
)
|
|
||||||
|
|
||||||
type pool interface {
|
type pool interface {
|
||||||
First() *conn
|
First() *conn
|
||||||
Get() (*conn, error)
|
Get() (*conn, error)
|
||||||
|
|
|
@ -63,7 +63,7 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
cn.readTimeout = timeout
|
cn.ReadTimeout = timeout
|
||||||
|
|
||||||
cmd := NewSliceCmd()
|
cmd := NewSliceCmd()
|
||||||
if err := cmd.parseReply(cn.rd); err != nil {
|
if err := cmd.parseReply(cn.rd); err != nil {
|
||||||
|
@ -92,6 +92,7 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
|
||||||
Payload: reply[3].(string),
|
Payload: reply[3].(string),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("redis: unsupported message name: %q", msgName)
|
return nil, fmt.Errorf("redis: unsupported message name: %q", msgName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
30
redis.go
30
redis.go
|
@ -32,6 +32,11 @@ func (c *baseClient) putConn(cn *conn, ei error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *baseClient) process(cmd Cmder) {
|
func (c *baseClient) process(cmd Cmder) {
|
||||||
|
for i := 0; i <= c.opt.MaxRetries; i++ {
|
||||||
|
if i > 0 {
|
||||||
|
cmd.reset()
|
||||||
|
}
|
||||||
|
|
||||||
cn, err := c.conn()
|
cn, err := c.conn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.setErr(err)
|
cmd.setErr(err)
|
||||||
|
@ -39,25 +44,34 @@ func (c *baseClient) process(cmd Cmder) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if timeout := cmd.writeTimeout(); timeout != nil {
|
if timeout := cmd.writeTimeout(); timeout != nil {
|
||||||
cn.writeTimeout = *timeout
|
cn.WriteTimeout = *timeout
|
||||||
} else {
|
} else {
|
||||||
cn.writeTimeout = c.opt.WriteTimeout
|
cn.WriteTimeout = c.opt.WriteTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
if timeout := cmd.readTimeout(); timeout != nil {
|
if timeout := cmd.readTimeout(); timeout != nil {
|
||||||
cn.readTimeout = *timeout
|
cn.ReadTimeout = *timeout
|
||||||
} else {
|
} else {
|
||||||
cn.readTimeout = c.opt.ReadTimeout
|
cn.ReadTimeout = c.opt.ReadTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cn.writeCmds(cmd); err != nil {
|
if err := cn.writeCmds(cmd); err != nil {
|
||||||
c.putConn(cn, err)
|
c.putConn(cn, err)
|
||||||
cmd.setErr(err)
|
cmd.setErr(err)
|
||||||
|
if shouldRetry(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = cmd.parseReply(cn.rd)
|
err = cmd.parseReply(cn.rd)
|
||||||
c.putConn(cn, err)
|
c.putConn(cn, err)
|
||||||
|
if shouldRetry(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the client, releasing any open resources.
|
// Close closes the client, releasing any open resources.
|
||||||
|
@ -105,6 +119,10 @@ type Options struct {
|
||||||
// than specified in this option.
|
// than specified in this option.
|
||||||
// Default: 0 = no eviction
|
// Default: 0 = no eviction
|
||||||
IdleTimeout time.Duration
|
IdleTimeout time.Duration
|
||||||
|
|
||||||
|
// MaxRetries specifies maximum number of times client will retry
|
||||||
|
// failed command. Default is to not retry failed command.
|
||||||
|
MaxRetries int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opt *Options) getDialer() func() (net.Conn, error) {
|
func (opt *Options) getDialer() func() (net.Conn, error) {
|
||||||
|
@ -157,6 +175,8 @@ func (opt *Options) options() *options {
|
||||||
DialTimeout: opt.getDialTimeout(),
|
DialTimeout: opt.getDialTimeout(),
|
||||||
ReadTimeout: opt.ReadTimeout,
|
ReadTimeout: opt.ReadTimeout,
|
||||||
WriteTimeout: opt.WriteTimeout,
|
WriteTimeout: opt.WriteTimeout,
|
||||||
|
|
||||||
|
MaxRetries: opt.MaxRetries,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,6 +192,8 @@ type options struct {
|
||||||
DialTimeout time.Duration
|
DialTimeout time.Duration
|
||||||
ReadTimeout time.Duration
|
ReadTimeout time.Duration
|
||||||
WriteTimeout time.Duration
|
WriteTimeout time.Duration
|
||||||
|
|
||||||
|
MaxRetries int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opt *options) connPoolOptions() *connPoolOptions {
|
func (opt *options) connPoolOptions() *connPoolOptions {
|
||||||
|
|
|
@ -124,6 +124,23 @@ var _ = Describe("Client", func() {
|
||||||
Expect(db1.FlushDb().Err()).NotTo(HaveOccurred())
|
Expect(db1.FlushDb().Err()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should retry command on network error", func() {
|
||||||
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
client = redis.NewClient(&redis.Options{
|
||||||
|
Addr: redisAddr,
|
||||||
|
MaxRetries: 1,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Put bad connection in the pool.
|
||||||
|
cn, err := client.Pool().Get()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
cn.SetNetConn(newBadNetConn())
|
||||||
|
Expect(client.Pool().Put(cn)).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
err = client.Ping().Err()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -266,6 +283,24 @@ func BenchmarkPipeline(b *testing.B) {
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type badNetConn struct {
|
||||||
|
net.TCPConn
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ net.Conn = &badNetConn{}
|
||||||
|
|
||||||
|
func newBadNetConn() net.Conn {
|
||||||
|
return &badNetConn{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (badNetConn) Read([]byte) (int, error) {
|
||||||
|
return 0, net.UnknownNetworkError("badNetConn")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (badNetConn) Write([]byte) (int, error) {
|
||||||
|
return 0, net.UnknownNetworkError("badNetConn")
|
||||||
|
}
|
||||||
|
|
||||||
// Replaces ginkgo's Eventually.
|
// Replaces ginkgo's Eventually.
|
||||||
func waitForSubstring(fn func() string, substr string, timeout time.Duration) error {
|
func waitForSubstring(fn func() string, substr string, timeout time.Duration) error {
|
||||||
var s string
|
var s string
|
||||||
|
|
Loading…
Reference in New Issue