diff --git a/internal/pool/bench_test.go b/internal/pool/bench_test.go index 5c02169..e0bb524 100644 --- a/internal/pool/bench_test.go +++ b/internal/pool/bench_test.go @@ -8,7 +8,13 @@ import ( ) func benchmarkPoolGetPut(b *testing.B, poolSize int) { - connPool := pool.NewConnPool(dummyDialer, poolSize, time.Second, time.Hour, time.Hour) + connPool := pool.NewConnPool(&pool.Options{ + Dialer: dummyDialer, + PoolSize: poolSize, + PoolTimeout: time.Second, + IdleTimeout: time.Hour, + IdleCheckFrequency: time.Hour, + }) b.ResetTimer() @@ -38,7 +44,13 @@ func BenchmarkPoolGetPut1000Conns(b *testing.B) { } func benchmarkPoolGetRemove(b *testing.B, poolSize int) { - connPool := pool.NewConnPool(dummyDialer, poolSize, time.Second, time.Hour, time.Hour) + connPool := pool.NewConnPool(&pool.Options{ + Dialer: dummyDialer, + PoolSize: poolSize, + PoolTimeout: time.Second, + IdleTimeout: time.Hour, + IdleCheckFrequency: time.Hour, + }) b.ResetTimer() diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 88a252a..bef000e 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -46,14 +46,21 @@ type Pooler interface { Close() error } -type dialer func() (net.Conn, error) - -type ConnPool struct { - dial dialer +type Options struct { + Dialer func() (net.Conn, error) OnClose func(*Conn) error - poolTimeout time.Duration - idleTimeout time.Duration + PoolSize int + PoolTimeout time.Duration + IdleTimeout time.Duration + IdleCheckFrequency time.Duration +} + +type ConnPool struct { + opt *Options + + dialErrorsNum uint32 // atomic + _lastDialError atomic.Value queue chan struct{} @@ -65,24 +72,21 @@ type ConnPool struct { stats Stats - _closed int32 // atomic + _closed uint32 // atomic } var _ Pooler = (*ConnPool)(nil) -func NewConnPool(dial dialer, poolSize int, poolTimeout, idleTimeout, idleCheckFrequency time.Duration) *ConnPool { +func NewConnPool(opt *Options) *ConnPool { p := &ConnPool{ - dial: dial, + opt: opt, - poolTimeout: poolTimeout, - idleTimeout: idleTimeout, - - queue: make(chan struct{}, poolSize), - conns: make([]*Conn, 0, poolSize), - freeConns: make([]*Conn, 0, poolSize), + queue: make(chan struct{}, opt.PoolSize), + conns: make([]*Conn, 0, opt.PoolSize), + freeConns: make([]*Conn, 0, opt.PoolSize), } - if idleTimeout > 0 && idleCheckFrequency > 0 { - go p.reaper(idleCheckFrequency) + if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { + go p.reaper(opt.IdleCheckFrequency) } return p } @@ -92,8 +96,16 @@ func (p *ConnPool) NewConn() (*Conn, error) { return nil, ErrClosed } - netConn, err := p.dial() + if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) { + return nil, p.lastDialError() + } + + netConn, err := p.opt.Dialer() if err != nil { + p.setLastDialError(err) + if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) { + go p.tryDial() + } return nil, err } @@ -105,12 +117,35 @@ func (p *ConnPool) NewConn() (*Conn, error) { return cn, nil } +func (p *ConnPool) tryDial() { + for { + conn, err := p.opt.Dialer() + if err != nil { + p.setLastDialError(err) + time.Sleep(time.Second) + continue + } + + atomic.StoreUint32(&p.dialErrorsNum, 0) + _ = conn.Close() + return + } +} + +func (p *ConnPool) setLastDialError(err error) { + p._lastDialError.Store(err) +} + +func (p *ConnPool) lastDialError() error { + return p._lastDialError.Load().(error) +} + func (p *ConnPool) PopFree() *Conn { select { case p.queue <- struct{}{}: default: timer := timers.Get().(*time.Timer) - timer.Reset(p.poolTimeout) + timer.Reset(p.opt.PoolTimeout) select { case p.queue <- struct{}{}: @@ -158,7 +193,7 @@ func (p *ConnPool) Get() (*Conn, bool, error) { case p.queue <- struct{}{}: default: timer := timers.Get().(*time.Timer) - timer.Reset(p.poolTimeout) + timer.Reset(p.opt.PoolTimeout) select { case p.queue <- struct{}{}: @@ -182,7 +217,7 @@ func (p *ConnPool) Get() (*Conn, bool, error) { break } - if cn.IsStale(p.idleTimeout) { + if cn.IsStale(p.opt.IdleTimeout) { p.CloseConn(cn) continue } @@ -232,8 +267,8 @@ func (p *ConnPool) CloseConn(cn *Conn) error { } func (p *ConnPool) closeConn(cn *Conn) error { - if p.OnClose != nil { - _ = p.OnClose(cn) + if p.opt.OnClose != nil { + _ = p.opt.OnClose(cn) } return cn.Close() } @@ -265,11 +300,11 @@ func (p *ConnPool) Stats() *Stats { } func (p *ConnPool) closed() bool { - return atomic.LoadInt32(&p._closed) == 1 + return atomic.LoadUint32(&p._closed) == 1 } func (p *ConnPool) Close() error { - if !atomic.CompareAndSwapInt32(&p._closed, 0, 1) { + if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) { return ErrClosed } @@ -299,7 +334,7 @@ func (p *ConnPool) reapStaleConn() bool { } cn := p.freeConns[0] - if !cn.IsStale(p.idleTimeout) { + if !cn.IsStale(p.opt.IdleTimeout) { return false } diff --git a/internal/pool/pool_test.go b/internal/pool/pool_test.go index c8fbeb9..f86327a 100644 --- a/internal/pool/pool_test.go +++ b/internal/pool/pool_test.go @@ -14,8 +14,13 @@ var _ = Describe("ConnPool", func() { var connPool *pool.ConnPool BeforeEach(func() { - connPool = pool.NewConnPool( - dummyDialer, 10, time.Hour, time.Millisecond, time.Millisecond) + connPool = pool.NewConnPool(&pool.Options{ + Dialer: dummyDialer, + PoolSize: 10, + PoolTimeout: time.Hour, + IdleTimeout: time.Millisecond, + IdleCheckFrequency: time.Millisecond, + }) }) AfterEach(func() { @@ -83,16 +88,21 @@ var _ = Describe("conns reaper", func() { var conns, idleConns, closedConns []*pool.Conn BeforeEach(func() { - connPool = pool.NewConnPool( - dummyDialer, 10, time.Second, idleTimeout, time.Hour) - - closedConns = nil - connPool.OnClose = func(cn *pool.Conn) error { - closedConns = append(closedConns, cn) - return nil - } - conns = nil + closedConns = nil + + connPool = pool.NewConnPool(&pool.Options{ + Dialer: dummyDialer, + PoolSize: 10, + PoolTimeout: time.Second, + IdleTimeout: idleTimeout, + IdleCheckFrequency: time.Hour, + + OnClose: func(cn *pool.Conn) error { + closedConns = append(closedConns, cn) + return nil + }, + }) // add stale connections idleConns = nil @@ -202,8 +212,13 @@ var _ = Describe("race", func() { }) It("does not happen on Get, Put, and Remove", func() { - connPool = pool.NewConnPool( - dummyDialer, 10, time.Minute, time.Millisecond, time.Millisecond) + connPool = pool.NewConnPool(&pool.Options{ + Dialer: dummyDialer, + PoolSize: 10, + PoolTimeout: time.Minute, + IdleTimeout: time.Millisecond, + IdleCheckFrequency: time.Millisecond, + }) perform(C, func(id int) { for i := 0; i < N; i++ { @@ -226,7 +241,13 @@ var _ = Describe("race", func() { It("does not happen on Get and PopFree", func() { connPool = pool.NewConnPool( - dummyDialer, 10, time.Minute, time.Second, time.Millisecond) + &pool.Options{ + Dialer: dummyDialer, + PoolSize: 10, + PoolTimeout: time.Minute, + IdleTimeout: time.Second, + IdleCheckFrequency: time.Millisecond, + }) perform(C, func(id int) { for i := 0; i < N; i++ { diff --git a/options.go b/options.go index d3ea254..cd6fa98 100644 --- a/options.go +++ b/options.go @@ -181,13 +181,13 @@ func ParseURL(redisURL string) (*Options, error) { } func newConnPool(opt *Options) *pool.ConnPool { - return pool.NewConnPool( - opt.Dialer, - opt.PoolSize, - opt.PoolTimeout, - opt.IdleTimeout, - opt.IdleCheckFrequency, - ) + return pool.NewConnPool(&pool.Options{ + Dialer: opt.Dialer, + PoolSize: opt.PoolSize, + PoolTimeout: opt.PoolTimeout, + IdleTimeout: opt.IdleTimeout, + IdleCheckFrequency: opt.IdleCheckFrequency, + }) } // PoolStats contains pool state information and accumulated stats.