Merge pull request #1116 from go-redis/feature/retry-init-conn

Feature/retry init conn
This commit is contained in:
Vladimir Mihailenco 2019-08-08 13:46:23 +03:00 committed by GitHub
commit 3ad2955728
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 68 additions and 55 deletions

View File

@ -809,7 +809,7 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
continue continue
} }
if isRetryableError(err, true) { if isRetryableError(err, cmd.readTimeout() == nil) {
// First retry the same node. // First retry the same node.
if attempt == 0 { if attempt == 0 {
continue continue
@ -1075,7 +1075,7 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) erro
} }
err = c.pipelineProcessCmds(ctx, node, cn, cmds, failedCmds) err = c.pipelineProcessCmds(ctx, node, cn, cmds, failedCmds)
node.Client.releaseConnStrict(cn, err) node.Client.releaseConn(cn, err)
}(node, cmds) }(node, cmds)
} }
@ -1282,7 +1282,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
} }
err = c.txPipelineProcessCmds(ctx, node, cn, cmds, failedCmds) err = c.txPipelineProcessCmds(ctx, node, cn, cmds, failedCmds)
node.Client.releaseConnStrict(cn, err) node.Client.releaseConn(cn, err)
}(node, cmds) }(node, cmds)
} }

View File

@ -6,13 +6,12 @@ import (
"net" "net"
"strings" "strings"
"github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto" "github.com/go-redis/redis/internal/proto"
) )
func isRetryableError(err error, retryTimeout bool) bool { func isRetryableError(err error, retryTimeout bool) bool {
switch err { switch err {
case nil, context.Canceled, context.DeadlineExceeded, pool.ErrBadConn: case nil, context.Canceled, context.DeadlineExceeded:
return false return false
case io.EOF: case io.EOF:
return true return true
@ -46,14 +45,13 @@ func isRedisError(err error) bool {
} }
func isBadConn(err error, allowTimeout bool) bool { func isBadConn(err error, allowTimeout bool) bool {
switch err { if err == nil {
case nil:
return false return false
case pool.ErrBadConn:
return true
} }
if isRedisError(err) { if isRedisError(err) {
return isReadOnlyError(err) // #790 // Close connections in read only state in case domain addr is used
// and domain resolves to a different Redis Server. See #790.
return isReadOnlyError(err)
} }
if allowTimeout { if allowTimeout {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() { if netErr, ok := err.(net.Error); ok && netErr.Timeout() {

View File

@ -86,7 +86,7 @@ func BenchmarkPoolGetRemove(b *testing.B) {
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
connPool.Remove(cn) connPool.Remove(cn, nil)
} }
}) })
}) })

View File

@ -39,7 +39,7 @@ type Pooler interface {
Get(context.Context) (*Conn, error) Get(context.Context) (*Conn, error)
Put(*Conn) Put(*Conn)
Remove(*Conn) Remove(*Conn, error)
Len() int Len() int
IdleLen() int IdleLen() int
@ -311,7 +311,7 @@ func (p *ConnPool) popIdle() *Conn {
func (p *ConnPool) Put(cn *Conn) { func (p *ConnPool) Put(cn *Conn) {
if !cn.pooled { if !cn.pooled {
p.Remove(cn) p.Remove(cn, nil)
return return
} }
@ -322,7 +322,7 @@ func (p *ConnPool) Put(cn *Conn) {
p.freeTurn() p.freeTurn()
} }
func (p *ConnPool) Remove(cn *Conn) { func (p *ConnPool) Remove(cn *Conn, reason error) {
p.removeConnWithLock(cn) p.removeConnWithLock(cn)
p.freeTurn() p.freeTurn()
_ = p.closeConn(cn) _ = p.closeConn(cn)

View File

@ -12,7 +12,17 @@ const (
stateClosed = 2 stateClosed = 2
) )
var ErrBadConn = fmt.Errorf("pg: Conn is in a bad state") type BadConnError struct {
wrapped error
}
func (e BadConnError) Error() string {
return "pg: Conn is in a bad state"
}
func (e BadConnError) Unwrap() error {
return e.wrapped
}
type SingleConnPool struct { type SingleConnPool struct {
pool Pooler pool Pooler
@ -21,7 +31,7 @@ type SingleConnPool struct {
ch chan *Conn ch chan *Conn
level int32 // atomic level int32 // atomic
_hasBadConn uint32 // atomic _badConnError atomic.Value
} }
var _ Pooler = (*SingleConnPool)(nil) var _ Pooler = (*SingleConnPool)(nil)
@ -66,10 +76,10 @@ func (p *SingleConnPool) Get(c context.Context) (*Conn, error) {
if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) { if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
return cn, nil return cn, nil
} }
p.pool.Remove(cn) p.pool.Remove(cn, ErrClosed)
case stateInited: case stateInited:
if p.hasBadConn() { if err := p.badConnError(); err != nil {
return nil, ErrBadConn return nil, err
} }
cn, ok := <-p.ch cn, ok := <-p.ch
if !ok { if !ok {
@ -95,20 +105,20 @@ func (p *SingleConnPool) Put(cn *Conn) {
} }
func (p *SingleConnPool) freeConn(cn *Conn) { func (p *SingleConnPool) freeConn(cn *Conn) {
if p.hasBadConn() { if err := p.badConnError(); err != nil {
p.pool.Remove(cn) p.pool.Remove(cn, err)
} else { } else {
p.pool.Put(cn) p.pool.Put(cn)
} }
} }
func (p *SingleConnPool) Remove(cn *Conn) { func (p *SingleConnPool) Remove(cn *Conn, reason error) {
defer func() { defer func() {
if recover() != nil { if recover() != nil {
p.pool.Remove(cn) p.pool.Remove(cn, ErrClosed)
} }
}() }()
atomic.StoreUint32(&p._hasBadConn, 1) p._badConnError.Store(BadConnError{wrapped: reason})
p.ch <- cn p.ch <- cn
} }
@ -158,7 +168,7 @@ func (p *SingleConnPool) Close() error {
} }
func (p *SingleConnPool) Reset() error { func (p *SingleConnPool) Reset() error {
if !atomic.CompareAndSwapUint32(&p._hasBadConn, 1, 0) { if p.badConnError() == nil {
return nil return nil
} }
@ -167,7 +177,8 @@ func (p *SingleConnPool) Reset() error {
if !ok { if !ok {
return ErrClosed return ErrClosed
} }
p.pool.Remove(cn) p.pool.Remove(cn, ErrClosed)
p._badConnError.Store(nil)
default: default:
return fmt.Errorf("pg: SingleConnPool does not have a Conn") return fmt.Errorf("pg: SingleConnPool does not have a Conn")
} }
@ -180,6 +191,9 @@ func (p *SingleConnPool) Reset() error {
return nil return nil
} }
func (p *SingleConnPool) hasBadConn() bool { func (p *SingleConnPool) badConnError() error {
return atomic.LoadUint32(&p._hasBadConn) == 1 if v := p._badConnError.Load(); v != nil {
return v.(BadConnError)
}
return nil
} }

View File

@ -58,13 +58,13 @@ func (p *StickyConnPool) putUpstream() {
func (p *StickyConnPool) Put(cn *Conn) {} func (p *StickyConnPool) Put(cn *Conn) {}
func (p *StickyConnPool) removeUpstream() { func (p *StickyConnPool) removeUpstream(reason error) {
p.pool.Remove(p.cn) p.pool.Remove(p.cn, reason)
p.cn = nil p.cn = nil
} }
func (p *StickyConnPool) Remove(cn *Conn) { func (p *StickyConnPool) Remove(cn *Conn, reason error) {
p.removeUpstream() p.removeUpstream(reason)
} }
func (p *StickyConnPool) Len() int { func (p *StickyConnPool) Len() int {
@ -104,7 +104,7 @@ func (p *StickyConnPool) Close() error {
if p.reusable { if p.reusable {
p.putUpstream() p.putUpstream()
} else { } else {
p.removeUpstream() p.removeUpstream(ErrClosed)
} }
} }

View File

@ -65,7 +65,7 @@ var _ = Describe("ConnPool", func() {
// ok // ok
} }
connPool.Remove(cn) connPool.Remove(cn, nil)
// Check that Get is unblocked. // Check that Get is unblocked.
select { select {
@ -128,7 +128,7 @@ var _ = Describe("MinIdleConns", func() {
Context("after Remove", func() { Context("after Remove", func() {
BeforeEach(func() { BeforeEach(func() {
connPool.Remove(cn) connPool.Remove(cn, nil)
}) })
It("has idle connections", func() { It("has idle connections", func() {
@ -205,7 +205,7 @@ var _ = Describe("MinIdleConns", func() {
BeforeEach(func() { BeforeEach(func() {
perform(len(cns), func(i int) { perform(len(cns), func(i int) {
mu.RLock() mu.RLock()
connPool.Remove(cns[i]) connPool.Remove(cns[i], nil)
mu.RUnlock() mu.RUnlock()
}) })
@ -355,7 +355,7 @@ var _ = Describe("conns reaper", func() {
Expect(connPool.Len()).To(Equal(4)) Expect(connPool.Len()).To(Equal(4))
Expect(connPool.IdleLen()).To(Equal(0)) Expect(connPool.IdleLen()).To(Equal(0))
connPool.Remove(cn) connPool.Remove(cn, nil)
Expect(connPool.Len()).To(Equal(3)) Expect(connPool.Len()).To(Equal(3))
Expect(connPool.IdleLen()).To(Equal(0)) Expect(connPool.IdleLen()).To(Equal(0))
@ -413,7 +413,7 @@ var _ = Describe("race", func() {
cn, err := connPool.Get(c) cn, err := connPool.Get(c)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if err == nil { if err == nil {
connPool.Remove(cn) connPool.Remove(cn, nil)
} }
} }
}) })

View File

@ -44,3 +44,13 @@ func isLower(s string) bool {
} }
return true return true
} }
func Unwrap(err error) error {
u, ok := err.(interface {
Unwrap() error
})
if !ok {
return nil
}
return u.Unwrap()
}

View File

@ -171,7 +171,10 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
err = c.initConn(ctx, cn) err = c.initConn(ctx, cn)
if err != nil { if err != nil {
c.connPool.Remove(cn) c.connPool.Remove(cn, err)
if err := internal.Unwrap(err); err != nil {
return nil, err
}
return nil, err return nil, err
} }
@ -226,24 +229,12 @@ func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
} }
if isBadConn(err, false) { if isBadConn(err, false) {
c.connPool.Remove(cn) c.connPool.Remove(cn, err)
} else { } else {
c.connPool.Put(cn) c.connPool.Put(cn)
} }
} }
func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
if c.limiter != nil {
c.limiter.ReportResult(err)
}
if err == nil || isRedisError(err) {
c.connPool.Put(cn)
} else {
c.connPool.Remove(cn)
}
}
func (c *baseClient) process(ctx context.Context, cmd Cmder) error { func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
if attempt > 0 { if attempt > 0 {
@ -348,7 +339,7 @@ func (c *baseClient) generalProcessPipeline(
} }
canRetry, err := p(ctx, cn, cmds) canRetry, err := p(ctx, cn, cmds)
c.releaseConnStrict(cn, err) c.releaseConn(cn, err)
if !canRetry || !isRetryableError(err, true) { if !canRetry || !isRetryableError(err, true) {
break break

View File

@ -660,7 +660,7 @@ func (c *Ring) generalProcessPipeline(
} else { } else {
canRetry, err = shard.Client.pipelineProcessCmds(ctx, cn, cmds) canRetry, err = shard.Client.pipelineProcessCmds(ctx, cn, cmds)
} }
shard.Client.releaseConnStrict(cn, err) shard.Client.releaseConn(cn, err)
if canRetry && isRetryableError(err, true) { if canRetry && isRetryableError(err, true) {
mu.Lock() mu.Lock()