Merge pull request #836 from go-redis/fix/min-idle-conns2

Add MinIdleConns
This commit is contained in:
Vladimir Mihailenco 2018-08-12 08:50:12 +03:00 committed by GitHub
commit 52d9bc3a1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 254 additions and 28 deletions

View File

@ -19,6 +19,7 @@ type Conn struct {
concurrentReadWrite bool concurrentReadWrite bool
Inited bool Inited bool
pooled bool
usedAt atomic.Value usedAt atomic.Value
} }

View File

@ -53,6 +53,7 @@ type Options struct {
OnClose func(*Conn) error OnClose func(*Conn) error
PoolSize int PoolSize int
MinIdleConns int
PoolTimeout time.Duration PoolTimeout time.Duration
IdleTimeout time.Duration IdleTimeout time.Duration
IdleCheckFrequency time.Duration IdleCheckFrequency time.Duration
@ -63,16 +64,16 @@ type ConnPool struct {
dialErrorsNum uint32 // atomic dialErrorsNum uint32 // atomic
lastDialError error
lastDialErrorMu sync.RWMutex lastDialErrorMu sync.RWMutex
lastDialError error
queue chan struct{} queue chan struct{}
connsMu sync.Mutex connsMu sync.Mutex
conns []*Conn conns []*Conn
idleConns []*Conn
idleConnsMu sync.RWMutex poolSize int
idleConns []*Conn idleConnsLen int
stats Stats stats Stats
@ -90,6 +91,10 @@ func NewConnPool(opt *Options) *ConnPool {
idleConns: make([]*Conn, 0, opt.PoolSize), idleConns: make([]*Conn, 0, opt.PoolSize),
} }
for i := 0; i < opt.MinIdleConns; i++ {
p.checkMinIdleConns()
}
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency) go p.reaper(opt.IdleCheckFrequency)
} }
@ -97,19 +102,53 @@ func NewConnPool(opt *Options) *ConnPool {
return p return p
} }
func (p *ConnPool) checkMinIdleConns() {
if p.opt.MinIdleConns == 0 {
return
}
if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
p.poolSize++
p.idleConnsLen++
go p.addIdleConn()
}
}
func (p *ConnPool) addIdleConn() {
cn, err := p.newConn(true)
if err != nil {
return
}
p.connsMu.Lock()
p.conns = append(p.conns, cn)
p.idleConns = append(p.idleConns, cn)
p.connsMu.Unlock()
}
func (p *ConnPool) NewConn() (*Conn, error) { func (p *ConnPool) NewConn() (*Conn, error) {
cn, err := p.newConn() return p._NewConn(false)
}
func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) {
cn, err := p.newConn(pooled)
if err != nil { if err != nil {
return nil, err return nil, err
} }
p.connsMu.Lock() p.connsMu.Lock()
p.conns = append(p.conns, cn) p.conns = append(p.conns, cn)
if pooled {
if p.poolSize < p.opt.PoolSize {
p.poolSize++
} else {
cn.pooled = false
}
}
p.connsMu.Unlock() p.connsMu.Unlock()
return cn, nil return cn, nil
} }
func (p *ConnPool) newConn() (*Conn, error) { func (p *ConnPool) newConn(pooled bool) (*Conn, error) {
if p.closed() { if p.closed() {
return nil, ErrClosed return nil, ErrClosed
} }
@ -127,7 +166,9 @@ func (p *ConnPool) newConn() (*Conn, error) {
return nil, err return nil, err
} }
return NewConn(netConn), nil cn := NewConn(netConn)
cn.pooled = pooled
return cn, nil
} }
func (p *ConnPool) tryDial() { func (p *ConnPool) tryDial() {
@ -174,9 +215,9 @@ func (p *ConnPool) Get() (*Conn, error) {
} }
for { for {
p.idleConnsMu.Lock() p.connsMu.Lock()
cn := p.popIdle() cn := p.popIdle()
p.idleConnsMu.Unlock() p.connsMu.Unlock()
if cn == nil { if cn == nil {
break break
@ -193,7 +234,7 @@ func (p *ConnPool) Get() (*Conn, error) {
atomic.AddUint32(&p.stats.Misses, 1) atomic.AddUint32(&p.stats.Misses, 1)
newcn, err := p.NewConn() newcn, err := p._NewConn(true)
if err != nil { if err != nil {
p.freeTurn() p.freeTurn()
return nil, err return nil, err
@ -241,7 +282,8 @@ func (p *ConnPool) popIdle() *Conn {
idx := len(p.idleConns) - 1 idx := len(p.idleConns) - 1
cn := p.idleConns[idx] cn := p.idleConns[idx]
p.idleConns = p.idleConns[:idx] p.idleConns = p.idleConns[:idx]
p.idleConnsLen--
p.checkMinIdleConns()
return cn return cn
} }
@ -253,9 +295,15 @@ func (p *ConnPool) Put(cn *Conn) {
return return
} }
p.idleConnsMu.Lock() if !cn.pooled {
p.Remove(cn)
return
}
p.connsMu.Lock()
p.idleConns = append(p.idleConns, cn) p.idleConns = append(p.idleConns, cn)
p.idleConnsMu.Unlock() p.idleConnsLen++
p.connsMu.Unlock()
p.freeTurn() p.freeTurn()
} }
@ -275,6 +323,10 @@ func (p *ConnPool) removeConn(cn *Conn) {
for i, c := range p.conns { for i, c := range p.conns {
if c == cn { if c == cn {
p.conns = append(p.conns[:i], p.conns[i+1:]...) p.conns = append(p.conns[:i], p.conns[i+1:]...)
if cn.pooled {
p.poolSize--
p.checkMinIdleConns()
}
break break
} }
} }
@ -291,17 +343,17 @@ func (p *ConnPool) closeConn(cn *Conn) error {
// Len returns total number of connections. // Len returns total number of connections.
func (p *ConnPool) Len() int { func (p *ConnPool) Len() int {
p.connsMu.Lock() p.connsMu.Lock()
l := len(p.conns) n := p.poolSize
p.connsMu.Unlock() p.connsMu.Unlock()
return l return n
} }
// FreeLen returns number of idle connections. // FreeLen returns number of idle connections.
func (p *ConnPool) IdleLen() int { func (p *ConnPool) IdleLen() int {
p.idleConnsMu.RLock() p.connsMu.Lock()
l := len(p.idleConns) n := p.idleConnsLen
p.idleConnsMu.RUnlock() p.connsMu.Unlock()
return l return n
} }
func (p *ConnPool) Stats() *Stats { func (p *ConnPool) Stats() *Stats {
@ -349,11 +401,10 @@ func (p *ConnPool) Close() error {
} }
} }
p.conns = nil p.conns = nil
p.connsMu.Unlock() p.poolSize = 0
p.idleConnsMu.Lock()
p.idleConns = nil p.idleConns = nil
p.idleConnsMu.Unlock() p.idleConnsLen = 0
p.connsMu.Unlock()
return firstErr return firstErr
} }
@ -369,6 +420,7 @@ func (p *ConnPool) reapStaleConn() *Conn {
} }
p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...) p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
p.idleConnsLen--
return cn return cn
} }
@ -378,9 +430,9 @@ func (p *ConnPool) ReapStaleConns() (int, error) {
for { for {
p.getTurn() p.getTurn()
p.idleConnsMu.Lock() p.connsMu.Lock()
cn := p.reapStaleConn() cn := p.reapStaleConn()
p.idleConnsMu.Unlock() p.connsMu.Unlock()
if cn != nil { if cn != nil {
p.removeConn(cn) p.removeConn(cn)

View File

@ -1,6 +1,7 @@
package pool_test package pool_test
import ( import (
"sync"
"testing" "testing"
"time" "time"
@ -78,6 +79,173 @@ var _ = Describe("ConnPool", func() {
}) })
}) })
var _ = Describe("MinIdleConns", func() {
const poolSize = 100
var minIdleConns int
var connPool *pool.ConnPool
newConnPool := func() *pool.ConnPool {
connPool := pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: poolSize,
MinIdleConns: minIdleConns,
PoolTimeout: 100 * time.Millisecond,
IdleTimeout: -1,
IdleCheckFrequency: -1,
})
Eventually(func() int {
return connPool.Len()
}).Should(Equal(minIdleConns))
return connPool
}
assert := func() {
It("has idle connections when created", func() {
Expect(connPool.Len()).To(Equal(minIdleConns))
Expect(connPool.IdleLen()).To(Equal(minIdleConns))
})
Context("after Get", func() {
var cn *pool.Conn
BeforeEach(func() {
var err error
cn, err = connPool.Get()
Expect(err).NotTo(HaveOccurred())
Eventually(func() int {
return connPool.Len()
}).Should(Equal(minIdleConns + 1))
})
It("has idle connections", func() {
Expect(connPool.Len()).To(Equal(minIdleConns + 1))
Expect(connPool.IdleLen()).To(Equal(minIdleConns))
})
Context("after Remove", func() {
BeforeEach(func() {
connPool.Remove(cn)
})
It("has idle connections", func() {
Expect(connPool.Len()).To(Equal(minIdleConns))
Expect(connPool.IdleLen()).To(Equal(minIdleConns))
})
})
})
Describe("Get does not exceed pool size", func() {
var mu sync.RWMutex
var cns []*pool.Conn
BeforeEach(func() {
cns = make([]*pool.Conn, 0)
perform(poolSize, func(_ int) {
defer GinkgoRecover()
cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred())
mu.Lock()
cns = append(cns, cn)
mu.Unlock()
})
Eventually(func() int {
return connPool.Len()
}).Should(BeNumerically(">=", poolSize))
})
It("Get is blocked", func() {
done := make(chan struct{})
go func() {
connPool.Get()
close(done)
}()
select {
case <-done:
Fail("Get is not blocked")
case <-time.After(time.Millisecond):
// ok
}
select {
case <-done:
// ok
case <-time.After(time.Second):
Fail("Get is not unblocked")
}
})
Context("after Put", func() {
BeforeEach(func() {
perform(len(cns), func(i int) {
mu.RLock()
connPool.Put(cns[i])
mu.RUnlock()
})
Eventually(func() int {
return connPool.Len()
}).Should(Equal(poolSize))
})
It("pool.Len is back to normal", func() {
Expect(connPool.Len()).To(Equal(poolSize))
Expect(connPool.IdleLen()).To(Equal(poolSize))
})
})
Context("after Remove", func() {
BeforeEach(func() {
perform(len(cns), func(i int) {
mu.RLock()
connPool.Remove(cns[i])
mu.RUnlock()
})
Eventually(func() int {
return connPool.Len()
}).Should(Equal(minIdleConns))
})
It("has idle connections", func() {
Expect(connPool.Len()).To(Equal(minIdleConns))
Expect(connPool.IdleLen()).To(Equal(minIdleConns))
})
})
})
}
Context("minIdleConns = 1", func() {
BeforeEach(func() {
minIdleConns = 1
connPool = newConnPool()
})
AfterEach(func() {
connPool.Close()
})
assert()
})
Context("minIdleConns = 32", func() {
BeforeEach(func() {
minIdleConns = 32
connPool = newConnPool()
})
AfterEach(func() {
connPool.Close()
})
assert()
})
})
var _ = Describe("conns reaper", func() { var _ = Describe("conns reaper", func() {
const idleTimeout = time.Minute const idleTimeout = time.Minute

View File

@ -59,6 +59,9 @@ type Options struct {
// Maximum number of socket connections. // Maximum number of socket connections.
// Default is 10 connections per every CPU as reported by runtime.NumCPU. // Default is 10 connections per every CPU as reported by runtime.NumCPU.
PoolSize int PoolSize int
// Minimum number of idle connections which is useful when establishing
// new connection is slow.
MinIdleConns int
// Amount of time client waits for connection if all connections // Amount of time client waits for connection if all connections
// are busy before returning an error. // are busy before returning an error.
// Default is ReadTimeout + 1 second. // Default is ReadTimeout + 1 second.
@ -69,7 +72,8 @@ type Options struct {
IdleTimeout time.Duration IdleTimeout time.Duration
// Frequency of idle checks made by idle connections reaper. // Frequency of idle checks made by idle connections reaper.
// Default is 1 minute. -1 disables idle connections reaper, // Default is 1 minute. -1 disables idle connections reaper,
// but idle connections are still discarded by the client. // but idle connections are still discarded by the client
// if IdleTimeout is set.
IdleCheckFrequency time.Duration IdleCheckFrequency time.Duration
// Enables read only queries on slave nodes. // Enables read only queries on slave nodes.
@ -196,6 +200,7 @@ func newConnPool(opt *Options) *pool.ConnPool {
return pool.NewConnPool(&pool.Options{ return pool.NewConnPool(&pool.Options{
Dialer: opt.Dialer, Dialer: opt.Dialer,
PoolSize: opt.PoolSize, PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
PoolTimeout: opt.PoolTimeout, PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout, IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency, IdleCheckFrequency: opt.IdleCheckFrequency,