diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 39f4ed1..7a4eea9 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -19,6 +19,7 @@ type Conn struct { concurrentReadWrite bool Inited bool + pooled bool usedAt atomic.Value } diff --git a/internal/pool/pool.go b/internal/pool/pool.go index b7b383b..2125acc 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -53,6 +53,7 @@ type Options struct { OnClose func(*Conn) error PoolSize int + MinIdleConns int PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -63,16 +64,16 @@ type ConnPool struct { dialErrorsNum uint32 // atomic - lastDialError error lastDialErrorMu sync.RWMutex + lastDialError error queue chan struct{} - connsMu sync.Mutex - conns []*Conn - - idleConnsMu sync.RWMutex - idleConns []*Conn + connsMu sync.Mutex + conns []*Conn + idleConns []*Conn + poolSize int + idleConnsLen int stats Stats @@ -90,6 +91,10 @@ func NewConnPool(opt *Options) *ConnPool { idleConns: make([]*Conn, 0, opt.PoolSize), } + for i := 0; i < opt.MinIdleConns; i++ { + p.checkMinIdleConns() + } + if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { go p.reaper(opt.IdleCheckFrequency) } @@ -97,19 +102,53 @@ func NewConnPool(opt *Options) *ConnPool { return p } +func (p *ConnPool) checkMinIdleConns() { + if p.opt.MinIdleConns == 0 { + return + } + if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns { + p.poolSize++ + p.idleConnsLen++ + go p.addIdleConn() + } +} + +func (p *ConnPool) addIdleConn() { + cn, err := p.newConn(true) + if err != nil { + return + } + + p.connsMu.Lock() + p.conns = append(p.conns, cn) + p.idleConns = append(p.idleConns, cn) + p.connsMu.Unlock() +} + func (p *ConnPool) NewConn() (*Conn, error) { - cn, err := p.newConn() + return p._NewConn(false) +} + +func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) { + cn, err := p.newConn(pooled) if err != nil { return nil, err } p.connsMu.Lock() p.conns = append(p.conns, cn) + if pooled { + if p.poolSize < p.opt.PoolSize { + p.poolSize++ + } else { + cn.pooled = false + } + } p.connsMu.Unlock() return cn, nil } -func (p *ConnPool) newConn() (*Conn, error) { +func (p *ConnPool) newConn(pooled bool) (*Conn, error) { if p.closed() { return nil, ErrClosed } @@ -127,7 +166,9 @@ func (p *ConnPool) newConn() (*Conn, error) { return nil, err } - return NewConn(netConn), nil + cn := NewConn(netConn) + cn.pooled = pooled + return cn, nil } func (p *ConnPool) tryDial() { @@ -174,9 +215,9 @@ func (p *ConnPool) Get() (*Conn, error) { } for { - p.idleConnsMu.Lock() + p.connsMu.Lock() cn := p.popIdle() - p.idleConnsMu.Unlock() + p.connsMu.Unlock() if cn == nil { break @@ -193,7 +234,7 @@ func (p *ConnPool) Get() (*Conn, error) { atomic.AddUint32(&p.stats.Misses, 1) - newcn, err := p.NewConn() + newcn, err := p._NewConn(true) if err != nil { p.freeTurn() return nil, err @@ -241,7 +282,8 @@ func (p *ConnPool) popIdle() *Conn { idx := len(p.idleConns) - 1 cn := p.idleConns[idx] p.idleConns = p.idleConns[:idx] - + p.idleConnsLen-- + p.checkMinIdleConns() return cn } @@ -253,9 +295,15 @@ func (p *ConnPool) Put(cn *Conn) { return } - p.idleConnsMu.Lock() + if !cn.pooled { + p.Remove(cn) + return + } + + p.connsMu.Lock() p.idleConns = append(p.idleConns, cn) - p.idleConnsMu.Unlock() + p.idleConnsLen++ + p.connsMu.Unlock() p.freeTurn() } @@ -275,6 +323,10 @@ func (p *ConnPool) removeConn(cn *Conn) { for i, c := range p.conns { if c == cn { p.conns = append(p.conns[:i], p.conns[i+1:]...) + if cn.pooled { + p.poolSize-- + p.checkMinIdleConns() + } break } } @@ -291,17 +343,17 @@ func (p *ConnPool) closeConn(cn *Conn) error { // Len returns total number of connections. func (p *ConnPool) Len() int { p.connsMu.Lock() - l := len(p.conns) + n := p.poolSize p.connsMu.Unlock() - return l + return n } // FreeLen returns number of idle connections. func (p *ConnPool) IdleLen() int { - p.idleConnsMu.RLock() - l := len(p.idleConns) - p.idleConnsMu.RUnlock() - return l + p.connsMu.Lock() + n := p.idleConnsLen + p.connsMu.Unlock() + return n } func (p *ConnPool) Stats() *Stats { @@ -349,11 +401,10 @@ func (p *ConnPool) Close() error { } } p.conns = nil - p.connsMu.Unlock() - - p.idleConnsMu.Lock() + p.poolSize = 0 p.idleConns = nil - p.idleConnsMu.Unlock() + p.idleConnsLen = 0 + p.connsMu.Unlock() return firstErr } @@ -369,6 +420,7 @@ func (p *ConnPool) reapStaleConn() *Conn { } p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...) + p.idleConnsLen-- return cn } @@ -378,9 +430,9 @@ func (p *ConnPool) ReapStaleConns() (int, error) { for { p.getTurn() - p.idleConnsMu.Lock() + p.connsMu.Lock() cn := p.reapStaleConn() - p.idleConnsMu.Unlock() + p.connsMu.Unlock() if cn != nil { p.removeConn(cn) diff --git a/internal/pool/pool_test.go b/internal/pool/pool_test.go index 4960268..6f49afb 100644 --- a/internal/pool/pool_test.go +++ b/internal/pool/pool_test.go @@ -1,6 +1,7 @@ package pool_test import ( + "sync" "testing" "time" @@ -78,6 +79,173 @@ var _ = Describe("ConnPool", func() { }) }) +var _ = Describe("MinIdleConns", func() { + const poolSize = 100 + var minIdleConns int + var connPool *pool.ConnPool + + newConnPool := func() *pool.ConnPool { + connPool := pool.NewConnPool(&pool.Options{ + Dialer: dummyDialer, + PoolSize: poolSize, + MinIdleConns: minIdleConns, + PoolTimeout: 100 * time.Millisecond, + IdleTimeout: -1, + IdleCheckFrequency: -1, + }) + Eventually(func() int { + return connPool.Len() + }).Should(Equal(minIdleConns)) + return connPool + } + + assert := func() { + It("has idle connections when created", func() { + Expect(connPool.Len()).To(Equal(minIdleConns)) + Expect(connPool.IdleLen()).To(Equal(minIdleConns)) + }) + + Context("after Get", func() { + var cn *pool.Conn + + BeforeEach(func() { + var err error + cn, err = connPool.Get() + Expect(err).NotTo(HaveOccurred()) + + Eventually(func() int { + return connPool.Len() + }).Should(Equal(minIdleConns + 1)) + }) + + It("has idle connections", func() { + Expect(connPool.Len()).To(Equal(minIdleConns + 1)) + Expect(connPool.IdleLen()).To(Equal(minIdleConns)) + }) + + Context("after Remove", func() { + BeforeEach(func() { + connPool.Remove(cn) + }) + + It("has idle connections", func() { + Expect(connPool.Len()).To(Equal(minIdleConns)) + Expect(connPool.IdleLen()).To(Equal(minIdleConns)) + }) + }) + }) + + Describe("Get does not exceed pool size", func() { + var mu sync.RWMutex + var cns []*pool.Conn + + BeforeEach(func() { + cns = make([]*pool.Conn, 0) + + perform(poolSize, func(_ int) { + defer GinkgoRecover() + + cn, err := connPool.Get() + Expect(err).NotTo(HaveOccurred()) + mu.Lock() + cns = append(cns, cn) + mu.Unlock() + }) + + Eventually(func() int { + return connPool.Len() + }).Should(BeNumerically(">=", poolSize)) + }) + + It("Get is blocked", func() { + done := make(chan struct{}) + go func() { + connPool.Get() + close(done) + }() + + select { + case <-done: + Fail("Get is not blocked") + case <-time.After(time.Millisecond): + // ok + } + + select { + case <-done: + // ok + case <-time.After(time.Second): + Fail("Get is not unblocked") + } + }) + + Context("after Put", func() { + BeforeEach(func() { + perform(len(cns), func(i int) { + mu.RLock() + connPool.Put(cns[i]) + mu.RUnlock() + }) + + Eventually(func() int { + return connPool.Len() + }).Should(Equal(poolSize)) + }) + + It("pool.Len is back to normal", func() { + Expect(connPool.Len()).To(Equal(poolSize)) + Expect(connPool.IdleLen()).To(Equal(poolSize)) + }) + }) + + Context("after Remove", func() { + BeforeEach(func() { + perform(len(cns), func(i int) { + mu.RLock() + connPool.Remove(cns[i]) + mu.RUnlock() + }) + + Eventually(func() int { + return connPool.Len() + }).Should(Equal(minIdleConns)) + }) + + It("has idle connections", func() { + Expect(connPool.Len()).To(Equal(minIdleConns)) + Expect(connPool.IdleLen()).To(Equal(minIdleConns)) + }) + }) + }) + } + + Context("minIdleConns = 1", func() { + BeforeEach(func() { + minIdleConns = 1 + connPool = newConnPool() + }) + + AfterEach(func() { + connPool.Close() + }) + + assert() + }) + + Context("minIdleConns = 32", func() { + BeforeEach(func() { + minIdleConns = 32 + connPool = newConnPool() + }) + + AfterEach(func() { + connPool.Close() + }) + + assert() + }) +}) + var _ = Describe("conns reaper", func() { const idleTimeout = time.Minute diff --git a/options.go b/options.go index 8a82d59..8fd7454 100644 --- a/options.go +++ b/options.go @@ -59,6 +59,9 @@ type Options struct { // Maximum number of socket connections. // Default is 10 connections per every CPU as reported by runtime.NumCPU. PoolSize int + // Minimum number of idle connections which is useful when establishing + // new connection is slow. + MinIdleConns int // Amount of time client waits for connection if all connections // are busy before returning an error. // Default is ReadTimeout + 1 second. @@ -69,7 +72,8 @@ type Options struct { IdleTimeout time.Duration // Frequency of idle checks made by idle connections reaper. // Default is 1 minute. -1 disables idle connections reaper, - // but idle connections are still discarded by the client. + // but idle connections are still discarded by the client + // if IdleTimeout is set. IdleCheckFrequency time.Duration // Enables read only queries on slave nodes. @@ -196,6 +200,7 @@ func newConnPool(opt *Options) *pool.ConnPool { return pool.NewConnPool(&pool.Options{ Dialer: opt.Dialer, PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: opt.IdleCheckFrequency,