Merge pull request #587 from go-redis/fix/pool-dial-errors

Gracefully handle situation when Redis Server is down
This commit is contained in:
Vladimir Mihailenco 2017-06-29 14:52:51 +03:00 committed by GitHub
commit 75ceb983b7
4 changed files with 117 additions and 49 deletions

View File

@ -8,7 +8,13 @@ import (
) )
func benchmarkPoolGetPut(b *testing.B, poolSize int) { func benchmarkPoolGetPut(b *testing.B, poolSize int) {
connPool := pool.NewConnPool(dummyDialer, poolSize, time.Second, time.Hour, time.Hour) connPool := pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: poolSize,
PoolTimeout: time.Second,
IdleTimeout: time.Hour,
IdleCheckFrequency: time.Hour,
})
b.ResetTimer() b.ResetTimer()
@ -38,7 +44,13 @@ func BenchmarkPoolGetPut1000Conns(b *testing.B) {
} }
func benchmarkPoolGetRemove(b *testing.B, poolSize int) { func benchmarkPoolGetRemove(b *testing.B, poolSize int) {
connPool := pool.NewConnPool(dummyDialer, poolSize, time.Second, time.Hour, time.Hour) connPool := pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: poolSize,
PoolTimeout: time.Second,
IdleTimeout: time.Hour,
IdleCheckFrequency: time.Hour,
})
b.ResetTimer() b.ResetTimer()

View File

@ -46,14 +46,21 @@ type Pooler interface {
Close() error Close() error
} }
type dialer func() (net.Conn, error) type Options struct {
Dialer func() (net.Conn, error)
type ConnPool struct {
dial dialer
OnClose func(*Conn) error OnClose func(*Conn) error
poolTimeout time.Duration PoolSize int
idleTimeout time.Duration PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
}
type ConnPool struct {
opt *Options
dialErrorsNum uint32 // atomic
_lastDialError atomic.Value
queue chan struct{} queue chan struct{}
@ -65,24 +72,21 @@ type ConnPool struct {
stats Stats stats Stats
_closed int32 // atomic _closed uint32 // atomic
} }
var _ Pooler = (*ConnPool)(nil) var _ Pooler = (*ConnPool)(nil)
func NewConnPool(dial dialer, poolSize int, poolTimeout, idleTimeout, idleCheckFrequency time.Duration) *ConnPool { func NewConnPool(opt *Options) *ConnPool {
p := &ConnPool{ p := &ConnPool{
dial: dial, opt: opt,
poolTimeout: poolTimeout, queue: make(chan struct{}, opt.PoolSize),
idleTimeout: idleTimeout, conns: make([]*Conn, 0, opt.PoolSize),
freeConns: make([]*Conn, 0, opt.PoolSize),
queue: make(chan struct{}, poolSize),
conns: make([]*Conn, 0, poolSize),
freeConns: make([]*Conn, 0, poolSize),
} }
if idleTimeout > 0 && idleCheckFrequency > 0 { if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(idleCheckFrequency) go p.reaper(opt.IdleCheckFrequency)
} }
return p return p
} }
@ -92,8 +96,16 @@ func (p *ConnPool) NewConn() (*Conn, error) {
return nil, ErrClosed return nil, ErrClosed
} }
netConn, err := p.dial() if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
return nil, p.lastDialError()
}
netConn, err := p.opt.Dialer()
if err != nil { if err != nil {
p.setLastDialError(err)
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
go p.tryDial()
}
return nil, err return nil, err
} }
@ -105,12 +117,35 @@ func (p *ConnPool) NewConn() (*Conn, error) {
return cn, nil return cn, nil
} }
func (p *ConnPool) tryDial() {
for {
conn, err := p.opt.Dialer()
if err != nil {
p.setLastDialError(err)
time.Sleep(time.Second)
continue
}
atomic.StoreUint32(&p.dialErrorsNum, 0)
_ = conn.Close()
return
}
}
func (p *ConnPool) setLastDialError(err error) {
p._lastDialError.Store(err)
}
func (p *ConnPool) lastDialError() error {
return p._lastDialError.Load().(error)
}
func (p *ConnPool) PopFree() *Conn { func (p *ConnPool) PopFree() *Conn {
select { select {
case p.queue <- struct{}{}: case p.queue <- struct{}{}:
default: default:
timer := timers.Get().(*time.Timer) timer := timers.Get().(*time.Timer)
timer.Reset(p.poolTimeout) timer.Reset(p.opt.PoolTimeout)
select { select {
case p.queue <- struct{}{}: case p.queue <- struct{}{}:
@ -158,7 +193,7 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
case p.queue <- struct{}{}: case p.queue <- struct{}{}:
default: default:
timer := timers.Get().(*time.Timer) timer := timers.Get().(*time.Timer)
timer.Reset(p.poolTimeout) timer.Reset(p.opt.PoolTimeout)
select { select {
case p.queue <- struct{}{}: case p.queue <- struct{}{}:
@ -182,7 +217,7 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
break break
} }
if cn.IsStale(p.idleTimeout) { if cn.IsStale(p.opt.IdleTimeout) {
p.CloseConn(cn) p.CloseConn(cn)
continue continue
} }
@ -232,8 +267,8 @@ func (p *ConnPool) CloseConn(cn *Conn) error {
} }
func (p *ConnPool) closeConn(cn *Conn) error { func (p *ConnPool) closeConn(cn *Conn) error {
if p.OnClose != nil { if p.opt.OnClose != nil {
_ = p.OnClose(cn) _ = p.opt.OnClose(cn)
} }
return cn.Close() return cn.Close()
} }
@ -265,11 +300,11 @@ func (p *ConnPool) Stats() *Stats {
} }
func (p *ConnPool) closed() bool { func (p *ConnPool) closed() bool {
return atomic.LoadInt32(&p._closed) == 1 return atomic.LoadUint32(&p._closed) == 1
} }
func (p *ConnPool) Close() error { func (p *ConnPool) Close() error {
if !atomic.CompareAndSwapInt32(&p._closed, 0, 1) { if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
return ErrClosed return ErrClosed
} }
@ -299,7 +334,7 @@ func (p *ConnPool) reapStaleConn() bool {
} }
cn := p.freeConns[0] cn := p.freeConns[0]
if !cn.IsStale(p.idleTimeout) { if !cn.IsStale(p.opt.IdleTimeout) {
return false return false
} }

View File

@ -14,8 +14,13 @@ var _ = Describe("ConnPool", func() {
var connPool *pool.ConnPool var connPool *pool.ConnPool
BeforeEach(func() { BeforeEach(func() {
connPool = pool.NewConnPool( connPool = pool.NewConnPool(&pool.Options{
dummyDialer, 10, time.Hour, time.Millisecond, time.Millisecond) Dialer: dummyDialer,
PoolSize: 10,
PoolTimeout: time.Hour,
IdleTimeout: time.Millisecond,
IdleCheckFrequency: time.Millisecond,
})
}) })
AfterEach(func() { AfterEach(func() {
@ -83,16 +88,21 @@ var _ = Describe("conns reaper", func() {
var conns, idleConns, closedConns []*pool.Conn var conns, idleConns, closedConns []*pool.Conn
BeforeEach(func() { BeforeEach(func() {
connPool = pool.NewConnPool(
dummyDialer, 10, time.Second, idleTimeout, time.Hour)
closedConns = nil
connPool.OnClose = func(cn *pool.Conn) error {
closedConns = append(closedConns, cn)
return nil
}
conns = nil conns = nil
closedConns = nil
connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: 10,
PoolTimeout: time.Second,
IdleTimeout: idleTimeout,
IdleCheckFrequency: time.Hour,
OnClose: func(cn *pool.Conn) error {
closedConns = append(closedConns, cn)
return nil
},
})
// add stale connections // add stale connections
idleConns = nil idleConns = nil
@ -202,8 +212,13 @@ var _ = Describe("race", func() {
}) })
It("does not happen on Get, Put, and Remove", func() { It("does not happen on Get, Put, and Remove", func() {
connPool = pool.NewConnPool( connPool = pool.NewConnPool(&pool.Options{
dummyDialer, 10, time.Minute, time.Millisecond, time.Millisecond) Dialer: dummyDialer,
PoolSize: 10,
PoolTimeout: time.Minute,
IdleTimeout: time.Millisecond,
IdleCheckFrequency: time.Millisecond,
})
perform(C, func(id int) { perform(C, func(id int) {
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
@ -226,7 +241,13 @@ var _ = Describe("race", func() {
It("does not happen on Get and PopFree", func() { It("does not happen on Get and PopFree", func() {
connPool = pool.NewConnPool( connPool = pool.NewConnPool(
dummyDialer, 10, time.Minute, time.Second, time.Millisecond) &pool.Options{
Dialer: dummyDialer,
PoolSize: 10,
PoolTimeout: time.Minute,
IdleTimeout: time.Second,
IdleCheckFrequency: time.Millisecond,
})
perform(C, func(id int) { perform(C, func(id int) {
for i := 0; i < N; i++ { for i := 0; i < N; i++ {

View File

@ -181,13 +181,13 @@ func ParseURL(redisURL string) (*Options, error) {
} }
func newConnPool(opt *Options) *pool.ConnPool { func newConnPool(opt *Options) *pool.ConnPool {
return pool.NewConnPool( return pool.NewConnPool(&pool.Options{
opt.Dialer, Dialer: opt.Dialer,
opt.PoolSize, PoolSize: opt.PoolSize,
opt.PoolTimeout, PoolTimeout: opt.PoolTimeout,
opt.IdleTimeout, IdleTimeout: opt.IdleTimeout,
opt.IdleCheckFrequency, IdleCheckFrequency: opt.IdleCheckFrequency,
) })
} }
// PoolStats contains pool state information and accumulated stats. // PoolStats contains pool state information and accumulated stats.