Cleanup pool

This commit is contained in:
Vladimir Mihailenco 2018-05-28 17:27:24 +03:00
parent 0f9028adf0
commit faf5666fbd
12 changed files with 219 additions and 207 deletions

View File

@ -1172,7 +1172,7 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
failedCmds := make(map[*clusterNode][]Cmder) failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap { for node, cmds := range cmdsMap {
cn, _, err := node.Client.getConn() cn, err := node.Client.getConn()
if err != nil { if err != nil {
if err == pool.ErrClosed { if err == pool.ErrClosed {
c.remapCmds(cmds, failedCmds) c.remapCmds(cmds, failedCmds)
@ -1184,9 +1184,9 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
if err == nil || internal.IsRedisError(err) { if err == nil || internal.IsRedisError(err) {
_ = node.Client.connPool.Put(cn) node.Client.connPool.Put(cn)
} else { } else {
_ = node.Client.connPool.Remove(cn) node.Client.connPool.Remove(cn)
} }
} }
@ -1336,7 +1336,7 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
failedCmds := make(map[*clusterNode][]Cmder) failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap { for node, cmds := range cmdsMap {
cn, _, err := node.Client.getConn() cn, err := node.Client.getConn()
if err != nil { if err != nil {
if err == pool.ErrClosed { if err == pool.ErrClosed {
c.remapCmds(cmds, failedCmds) c.remapCmds(cmds, failedCmds)
@ -1348,9 +1348,9 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
if err == nil || internal.IsRedisError(err) { if err == nil || internal.IsRedisError(err) {
_ = node.Client.connPool.Put(cn) node.Client.connPool.Put(cn)
} else { } else {
_ = node.Client.connPool.Remove(cn) node.Client.connPool.Remove(cn)
} }
} }

View File

@ -20,13 +20,11 @@ func benchmarkPoolGetPut(b *testing.B, poolSize int) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
cn, _, err := connPool.Get() cn, err := connPool.Get()
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
if err = connPool.Put(cn); err != nil { connPool.Put(cn)
b.Fatal(err)
}
} }
}) })
} }
@ -56,13 +54,11 @@ func benchmarkPoolGetRemove(b *testing.B, poolSize int) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
cn, _, err := connPool.Get() cn, err := connPool.Get()
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
if err := connPool.Remove(cn); err != nil { connPool.Remove(cn)
b.Fatal(err)
}
} }
}) })
} }

View File

@ -28,7 +28,8 @@ type Stats struct {
Timeouts uint32 // number of times a wait timeout occurred Timeouts uint32 // number of times a wait timeout occurred
TotalConns uint32 // number of total connections in the pool TotalConns uint32 // number of total connections in the pool
FreeConns uint32 // number of free connections in the pool FreeConns uint32 // deprecated - use IdleConns
IdleConns uint32 // number of idle connections in the pool
StaleConns uint32 // number of stale connections removed from the pool StaleConns uint32 // number of stale connections removed from the pool
} }
@ -36,12 +37,12 @@ type Pooler interface {
NewConn() (*Conn, error) NewConn() (*Conn, error)
CloseConn(*Conn) error CloseConn(*Conn) error
Get() (*Conn, bool, error) Get() (*Conn, error)
Put(*Conn) error Put(*Conn)
Remove(*Conn) error Remove(*Conn)
Len() int Len() int
FreeLen() int IdleLen() int
Stats() *Stats Stats() *Stats
Close() error Close() error
@ -70,8 +71,8 @@ type ConnPool struct {
connsMu sync.Mutex connsMu sync.Mutex
conns []*Conn conns []*Conn
freeConnsMu sync.Mutex idleConnsMu sync.RWMutex
freeConns []*Conn idleConns []*Conn
stats Stats stats Stats
@ -86,15 +87,29 @@ func NewConnPool(opt *Options) *ConnPool {
queue: make(chan struct{}, opt.PoolSize), queue: make(chan struct{}, opt.PoolSize),
conns: make([]*Conn, 0, opt.PoolSize), conns: make([]*Conn, 0, opt.PoolSize),
freeConns: make([]*Conn, 0, opt.PoolSize), idleConns: make([]*Conn, 0, opt.PoolSize),
} }
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency) go p.reaper(opt.IdleCheckFrequency)
} }
return p return p
} }
func (p *ConnPool) NewConn() (*Conn, error) { func (p *ConnPool) NewConn() (*Conn, error) {
cn, err := p.newConn()
if err != nil {
return nil, err
}
p.connsMu.Lock()
p.conns = append(p.conns, cn)
p.connsMu.Unlock()
return cn, nil
}
func (p *ConnPool) newConn() (*Conn, error) {
if p.closed() { if p.closed() {
return nil, ErrClosed return nil, ErrClosed
} }
@ -112,12 +127,7 @@ func (p *ConnPool) NewConn() (*Conn, error) {
return nil, err return nil, err
} }
cn := NewConn(netConn) return NewConn(netConn), nil
p.connsMu.Lock()
p.conns = append(p.conns, cn)
p.connsMu.Unlock()
return cn, nil
} }
func (p *ConnPool) tryDial() { func (p *ConnPool) tryDial() {
@ -153,34 +163,20 @@ func (p *ConnPool) getLastDialError() error {
} }
// Get returns existed connection from the pool or creates a new one. // Get returns existed connection from the pool or creates a new one.
func (p *ConnPool) Get() (*Conn, bool, error) { func (p *ConnPool) Get() (*Conn, error) {
if p.closed() { if p.closed() {
return nil, false, ErrClosed return nil, ErrClosed
} }
select { err := p.waitTurn()
case p.queue <- struct{}{}: if err != nil {
default: return nil, err
timer := timers.Get().(*time.Timer)
timer.Reset(p.opt.PoolTimeout)
select {
case p.queue <- struct{}{}:
if !timer.Stop() {
<-timer.C
}
timers.Put(timer)
case <-timer.C:
timers.Put(timer)
atomic.AddUint32(&p.stats.Timeouts, 1)
return nil, false, ErrPoolTimeout
}
} }
for { for {
p.freeConnsMu.Lock() p.idleConnsMu.Lock()
cn := p.popFree() cn := p.popIdle()
p.freeConnsMu.Unlock() p.idleConnsMu.Unlock()
if cn == nil { if cn == nil {
break break
@ -192,50 +188,89 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
} }
atomic.AddUint32(&p.stats.Hits, 1) atomic.AddUint32(&p.stats.Hits, 1)
return cn, false, nil return cn, nil
} }
atomic.AddUint32(&p.stats.Misses, 1) atomic.AddUint32(&p.stats.Misses, 1)
newcn, err := p.NewConn() newcn, err := p.NewConn()
if err != nil { if err != nil {
p.freeTurn()
return nil, err
}
return newcn, nil
}
func (p *ConnPool) getTurn() {
p.queue <- struct{}{}
}
func (p *ConnPool) waitTurn() error {
select {
case p.queue <- struct{}{}:
return nil
default:
timer := timers.Get().(*time.Timer)
timer.Reset(p.opt.PoolTimeout)
select {
case p.queue <- struct{}{}:
if !timer.Stop() {
<-timer.C
}
timers.Put(timer)
return nil
case <-timer.C:
timers.Put(timer)
atomic.AddUint32(&p.stats.Timeouts, 1)
return ErrPoolTimeout
}
}
}
func (p *ConnPool) freeTurn() {
<-p.queue <-p.queue
return nil, false, err
} }
return newcn, true, nil func (p *ConnPool) popIdle() *Conn {
} if len(p.idleConns) == 0 {
func (p *ConnPool) popFree() *Conn {
if len(p.freeConns) == 0 {
return nil return nil
} }
idx := len(p.freeConns) - 1 idx := len(p.idleConns) - 1
cn := p.freeConns[idx] cn := p.idleConns[idx]
p.freeConns = p.freeConns[:idx] p.idleConns = p.idleConns[:idx]
return cn return cn
} }
func (p *ConnPool) Put(cn *Conn) error { func (p *ConnPool) Put(cn *Conn) {
if data := cn.Rd.PeekBuffered(); data != nil { buf := cn.Rd.PeekBuffered()
internal.Logf("connection has unread data: %q", data) if buf != nil {
return p.Remove(cn) internal.Logf("connection has unread data: %.100q", buf)
} p.Remove(cn)
p.freeConnsMu.Lock() return
p.freeConns = append(p.freeConns, cn)
p.freeConnsMu.Unlock()
<-p.queue
return nil
} }
func (p *ConnPool) Remove(cn *Conn) error { p.idleConnsMu.Lock()
_ = p.CloseConn(cn) p.idleConns = append(p.idleConns, cn)
<-p.queue p.idleConnsMu.Unlock()
return nil p.freeTurn()
}
func (p *ConnPool) Remove(cn *Conn) {
p.removeConn(cn)
p.freeTurn()
_ = p.closeConn(cn)
} }
func (p *ConnPool) CloseConn(cn *Conn) error { func (p *ConnPool) CloseConn(cn *Conn) error {
p.removeConn(cn)
return p.closeConn(cn)
}
func (p *ConnPool) removeConn(cn *Conn) {
p.connsMu.Lock() p.connsMu.Lock()
for i, c := range p.conns { for i, c := range p.conns {
if c == cn { if c == cn {
@ -244,8 +279,6 @@ func (p *ConnPool) CloseConn(cn *Conn) error {
} }
} }
p.connsMu.Unlock() p.connsMu.Unlock()
return p.closeConn(cn)
} }
func (p *ConnPool) closeConn(cn *Conn) error { func (p *ConnPool) closeConn(cn *Conn) error {
@ -263,22 +296,24 @@ func (p *ConnPool) Len() int {
return l return l
} }
// FreeLen returns number of free connections. // FreeLen returns number of idle connections.
func (p *ConnPool) FreeLen() int { func (p *ConnPool) IdleLen() int {
p.freeConnsMu.Lock() p.idleConnsMu.RLock()
l := len(p.freeConns) l := len(p.idleConns)
p.freeConnsMu.Unlock() p.idleConnsMu.RUnlock()
return l return l
} }
func (p *ConnPool) Stats() *Stats { func (p *ConnPool) Stats() *Stats {
idleLen := p.IdleLen()
return &Stats{ return &Stats{
Hits: atomic.LoadUint32(&p.stats.Hits), Hits: atomic.LoadUint32(&p.stats.Hits),
Misses: atomic.LoadUint32(&p.stats.Misses), Misses: atomic.LoadUint32(&p.stats.Misses),
Timeouts: atomic.LoadUint32(&p.stats.Timeouts), Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
TotalConns: uint32(p.Len()), TotalConns: uint32(p.Len()),
FreeConns: uint32(p.FreeLen()), FreeConns: uint32(idleLen),
IdleConns: uint32(idleLen),
StaleConns: atomic.LoadUint32(&p.stats.StaleConns), StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
} }
} }
@ -316,41 +351,45 @@ func (p *ConnPool) Close() error {
p.conns = nil p.conns = nil
p.connsMu.Unlock() p.connsMu.Unlock()
p.freeConnsMu.Lock() p.idleConnsMu.Lock()
p.freeConns = nil p.idleConns = nil
p.freeConnsMu.Unlock() p.idleConnsMu.Unlock()
return firstErr return firstErr
} }
func (p *ConnPool) reapStaleConn() bool { func (p *ConnPool) reapStaleConn() *Conn {
if len(p.freeConns) == 0 { if len(p.idleConns) == 0 {
return false return nil
} }
cn := p.freeConns[0] cn := p.idleConns[0]
if !cn.IsStale(p.opt.IdleTimeout) { if !cn.IsStale(p.opt.IdleTimeout) {
return false return nil
} }
p.CloseConn(cn) p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
p.freeConns = append(p.freeConns[:0], p.freeConns[1:]...)
return true return cn
} }
func (p *ConnPool) ReapStaleConns() (int, error) { func (p *ConnPool) ReapStaleConns() (int, error) {
var n int var n int
for { for {
p.queue <- struct{}{} p.getTurn()
p.freeConnsMu.Lock()
reaped := p.reapStaleConn() p.idleConnsMu.Lock()
cn := p.reapStaleConn()
p.idleConnsMu.Unlock()
p.freeConnsMu.Unlock() if cn != nil {
<-p.queue p.removeConn(cn)
}
if reaped { p.freeTurn()
if cn != nil {
p.closeConn(cn)
n++ n++
} else { } else {
break break

View File

@ -20,29 +20,27 @@ func (p *SingleConnPool) CloseConn(*Conn) error {
panic("not implemented") panic("not implemented")
} }
func (p *SingleConnPool) Get() (*Conn, bool, error) { func (p *SingleConnPool) Get() (*Conn, error) {
return p.cn, false, nil return p.cn, nil
} }
func (p *SingleConnPool) Put(cn *Conn) error { func (p *SingleConnPool) Put(cn *Conn) {
if p.cn != cn { if p.cn != cn {
panic("p.cn != cn") panic("p.cn != cn")
} }
return nil
} }
func (p *SingleConnPool) Remove(cn *Conn) error { func (p *SingleConnPool) Remove(cn *Conn) {
if p.cn != cn { if p.cn != cn {
panic("p.cn != cn") panic("p.cn != cn")
} }
return nil
} }
func (p *SingleConnPool) Len() int { func (p *SingleConnPool) Len() int {
return 1 return 1
} }
func (p *SingleConnPool) FreeLen() int { func (p *SingleConnPool) IdleLen() int {
return 0 return 0
} }

View File

@ -28,55 +28,40 @@ func (p *StickyConnPool) CloseConn(*Conn) error {
panic("not implemented") panic("not implemented")
} }
func (p *StickyConnPool) Get() (*Conn, bool, error) { func (p *StickyConnPool) Get() (*Conn, error) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
if p.closed { if p.closed {
return nil, false, ErrClosed return nil, ErrClosed
} }
if p.cn != nil { if p.cn != nil {
return p.cn, false, nil return p.cn, nil
} }
cn, _, err := p.pool.Get() cn, err := p.pool.Get()
if err != nil { if err != nil {
return nil, false, err return nil, err
} }
p.cn = cn p.cn = cn
return cn, true, nil return cn, nil
} }
func (p *StickyConnPool) putUpstream() (err error) { func (p *StickyConnPool) putUpstream() {
err = p.pool.Put(p.cn) p.pool.Put(p.cn)
p.cn = nil p.cn = nil
return err
} }
func (p *StickyConnPool) Put(cn *Conn) error { func (p *StickyConnPool) Put(cn *Conn) {}
p.mu.Lock()
defer p.mu.Unlock()
if p.closed { func (p *StickyConnPool) removeUpstream() {
return ErrClosed p.pool.Remove(p.cn)
}
return nil
}
func (p *StickyConnPool) removeUpstream() error {
err := p.pool.Remove(p.cn)
p.cn = nil p.cn = nil
return err
} }
func (p *StickyConnPool) Remove(cn *Conn) error { func (p *StickyConnPool) Remove(cn *Conn) {
p.mu.Lock() p.removeUpstream()
defer p.mu.Unlock()
if p.closed {
return nil
}
return p.removeUpstream()
} }
func (p *StickyConnPool) Len() int { func (p *StickyConnPool) Len() int {
@ -89,7 +74,7 @@ func (p *StickyConnPool) Len() int {
return 1 return 1
} }
func (p *StickyConnPool) FreeLen() int { func (p *StickyConnPool) IdleLen() int {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
@ -111,13 +96,14 @@ func (p *StickyConnPool) Close() error {
return ErrClosed return ErrClosed
} }
p.closed = true p.closed = true
var err error
if p.cn != nil { if p.cn != nil {
if p.reusable { if p.reusable {
err = p.putUpstream() p.putUpstream()
} else { } else {
err = p.removeUpstream() p.removeUpstream()
} }
} }
return err
return nil
} }

View File

@ -29,13 +29,13 @@ var _ = Describe("ConnPool", func() {
It("should unblock client when conn is removed", func() { It("should unblock client when conn is removed", func() {
// Reserve one connection. // Reserve one connection.
cn, _, err := connPool.Get() cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
// Reserve all other connections. // Reserve all other connections.
var cns []*pool.Conn var cns []*pool.Conn
for i := 0; i < 9; i++ { for i := 0; i < 9; i++ {
cn, _, err := connPool.Get() cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
cns = append(cns, cn) cns = append(cns, cn)
} }
@ -46,12 +46,11 @@ var _ = Describe("ConnPool", func() {
defer GinkgoRecover() defer GinkgoRecover()
started <- true started <- true
_, _, err := connPool.Get() _, err := connPool.Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
done <- true done <- true
err = connPool.Put(cn) connPool.Put(cn)
Expect(err).NotTo(HaveOccurred())
}() }()
<-started <-started
@ -59,14 +58,13 @@ var _ = Describe("ConnPool", func() {
select { select {
case <-done: case <-done:
Fail("Get is not blocked") Fail("Get is not blocked")
default: case <-time.After(time.Millisecond):
// ok // ok
} }
err = connPool.Remove(cn) connPool.Remove(cn)
Expect(err).NotTo(HaveOccurred())
// Check that Ping is unblocked. // Check that Get is unblocked.
select { select {
case <-done: case <-done:
// ok // ok
@ -75,8 +73,7 @@ var _ = Describe("ConnPool", func() {
} }
for _, cn := range cns { for _, cn := range cns {
err = connPool.Put(cn) connPool.Put(cn)
Expect(err).NotTo(HaveOccurred())
} }
}) })
}) })
@ -107,7 +104,7 @@ var _ = Describe("conns reaper", func() {
// add stale connections // add stale connections
idleConns = nil idleConns = nil
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
cn, _, err := connPool.Get() cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
cn.SetUsedAt(time.Now().Add(-2 * idleTimeout)) cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
conns = append(conns, cn) conns = append(conns, cn)
@ -116,17 +113,17 @@ var _ = Describe("conns reaper", func() {
// add fresh connections // add fresh connections
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
cn, _, err := connPool.Get() cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
conns = append(conns, cn) conns = append(conns, cn)
} }
for _, cn := range conns { for _, cn := range conns {
Expect(connPool.Put(cn)).NotTo(HaveOccurred()) connPool.Put(cn)
} }
Expect(connPool.Len()).To(Equal(6)) Expect(connPool.Len()).To(Equal(6))
Expect(connPool.FreeLen()).To(Equal(6)) Expect(connPool.IdleLen()).To(Equal(6))
n, err := connPool.ReapStaleConns() n, err := connPool.ReapStaleConns()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -136,14 +133,14 @@ var _ = Describe("conns reaper", func() {
AfterEach(func() { AfterEach(func() {
_ = connPool.Close() _ = connPool.Close()
Expect(connPool.Len()).To(Equal(0)) Expect(connPool.Len()).To(Equal(0))
Expect(connPool.FreeLen()).To(Equal(0)) Expect(connPool.IdleLen()).To(Equal(0))
Expect(len(closedConns)).To(Equal(len(conns))) Expect(len(closedConns)).To(Equal(len(conns)))
Expect(closedConns).To(ConsistOf(conns)) Expect(closedConns).To(ConsistOf(conns))
}) })
It("reaps stale connections", func() { It("reaps stale connections", func() {
Expect(connPool.Len()).To(Equal(3)) Expect(connPool.Len()).To(Equal(3))
Expect(connPool.FreeLen()).To(Equal(3)) Expect(connPool.IdleLen()).To(Equal(3))
}) })
It("does not reap fresh connections", func() { It("does not reap fresh connections", func() {
@ -161,36 +158,34 @@ var _ = Describe("conns reaper", func() {
for j := 0; j < 3; j++ { for j := 0; j < 3; j++ {
var freeCns []*pool.Conn var freeCns []*pool.Conn
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
cn, _, err := connPool.Get() cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(cn).NotTo(BeNil()) Expect(cn).NotTo(BeNil())
freeCns = append(freeCns, cn) freeCns = append(freeCns, cn)
} }
Expect(connPool.Len()).To(Equal(3)) Expect(connPool.Len()).To(Equal(3))
Expect(connPool.FreeLen()).To(Equal(0)) Expect(connPool.IdleLen()).To(Equal(0))
cn, _, err := connPool.Get() cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(cn).NotTo(BeNil()) Expect(cn).NotTo(BeNil())
conns = append(conns, cn) conns = append(conns, cn)
Expect(connPool.Len()).To(Equal(4)) Expect(connPool.Len()).To(Equal(4))
Expect(connPool.FreeLen()).To(Equal(0)) Expect(connPool.IdleLen()).To(Equal(0))
err = connPool.Remove(cn) connPool.Remove(cn)
Expect(err).NotTo(HaveOccurred())
Expect(connPool.Len()).To(Equal(3)) Expect(connPool.Len()).To(Equal(3))
Expect(connPool.FreeLen()).To(Equal(0)) Expect(connPool.IdleLen()).To(Equal(0))
for _, cn := range freeCns { for _, cn := range freeCns {
err := connPool.Put(cn) connPool.Put(cn)
Expect(err).NotTo(HaveOccurred())
} }
Expect(connPool.Len()).To(Equal(3)) Expect(connPool.Len()).To(Equal(3))
Expect(connPool.FreeLen()).To(Equal(3)) Expect(connPool.IdleLen()).To(Equal(3))
} }
}) })
}) })
@ -222,18 +217,18 @@ var _ = Describe("race", func() {
perform(C, func(id int) { perform(C, func(id int) {
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
cn, _, err := connPool.Get() cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if err == nil { if err == nil {
Expect(connPool.Put(cn)).NotTo(HaveOccurred()) connPool.Put(cn)
} }
} }
}, func(id int) { }, func(id int) {
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
cn, _, err := connPool.Get() cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if err == nil { if err == nil {
Expect(connPool.Remove(cn)).NotTo(HaveOccurred()) connPool.Remove(cn)
} }
} }
}) })

View File

@ -68,8 +68,7 @@ type Options struct {
// Default is 5 minutes. // Default is 5 minutes.
IdleTimeout time.Duration IdleTimeout time.Duration
// Frequency of idle checks. // Frequency of idle checks.
// Default is 1 minute. // Default is 1 minute. -1 disables idle check.
// When minus value is set, then idle check is disabled.
IdleCheckFrequency time.Duration IdleCheckFrequency time.Duration
// Enables read only queries on slave nodes. // Enables read only queries on slave nodes.

View File

@ -30,8 +30,8 @@ var _ = Describe("pool", func() {
pool := client.Pool() pool := client.Pool()
Expect(pool.Len()).To(BeNumerically("<=", 10)) Expect(pool.Len()).To(BeNumerically("<=", 10))
Expect(pool.FreeLen()).To(BeNumerically("<=", 10)) Expect(pool.IdleLen()).To(BeNumerically("<=", 10))
Expect(pool.Len()).To(Equal(pool.FreeLen())) Expect(pool.Len()).To(Equal(pool.IdleLen()))
}) })
It("respects max size on multi", func() { It("respects max size on multi", func() {
@ -55,8 +55,8 @@ var _ = Describe("pool", func() {
pool := client.Pool() pool := client.Pool()
Expect(pool.Len()).To(BeNumerically("<=", 10)) Expect(pool.Len()).To(BeNumerically("<=", 10))
Expect(pool.FreeLen()).To(BeNumerically("<=", 10)) Expect(pool.IdleLen()).To(BeNumerically("<=", 10))
Expect(pool.Len()).To(Equal(pool.FreeLen())) Expect(pool.Len()).To(Equal(pool.IdleLen()))
}) })
It("respects max size on pipelines", func() { It("respects max size on pipelines", func() {
@ -73,15 +73,15 @@ var _ = Describe("pool", func() {
pool := client.Pool() pool := client.Pool()
Expect(pool.Len()).To(BeNumerically("<=", 10)) Expect(pool.Len()).To(BeNumerically("<=", 10))
Expect(pool.FreeLen()).To(BeNumerically("<=", 10)) Expect(pool.IdleLen()).To(BeNumerically("<=", 10))
Expect(pool.Len()).To(Equal(pool.FreeLen())) Expect(pool.Len()).To(Equal(pool.IdleLen()))
}) })
It("removes broken connections", func() { It("removes broken connections", func() {
cn, _, err := client.Pool().Get() cn, err := client.Pool().Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
cn.SetNetConn(&badConn{}) cn.SetNetConn(&badConn{})
Expect(client.Pool().Put(cn)).NotTo(HaveOccurred()) client.Pool().Put(cn)
err = client.Ping().Err() err = client.Ping().Err()
Expect(err).To(MatchError("bad connection")) Expect(err).To(MatchError("bad connection"))
@ -92,7 +92,7 @@ var _ = Describe("pool", func() {
pool := client.Pool() pool := client.Pool()
Expect(pool.Len()).To(Equal(1)) Expect(pool.Len()).To(Equal(1))
Expect(pool.FreeLen()).To(Equal(1)) Expect(pool.IdleLen()).To(Equal(1))
stats := pool.Stats() stats := pool.Stats()
Expect(stats.Hits).To(Equal(uint32(2))) Expect(stats.Hits).To(Equal(uint32(2)))
@ -109,7 +109,7 @@ var _ = Describe("pool", func() {
pool := client.Pool() pool := client.Pool()
Expect(pool.Len()).To(Equal(1)) Expect(pool.Len()).To(Equal(1))
Expect(pool.FreeLen()).To(Equal(1)) Expect(pool.IdleLen()).To(Equal(1))
stats := pool.Stats() stats := pool.Stats()
Expect(stats.Hits).To(Equal(uint32(100))) Expect(stats.Hits).To(Equal(uint32(100)))
@ -125,6 +125,7 @@ var _ = Describe("pool", func() {
Timeouts: 0, Timeouts: 0,
TotalConns: 1, TotalConns: 1,
FreeConns: 1, FreeConns: 1,
IdleConns: 1,
StaleConns: 0, StaleConns: 0,
})) }))

View File

@ -60,29 +60,30 @@ func (c *baseClient) newConn() (*pool.Conn, error) {
return cn, nil return cn, nil
} }
func (c *baseClient) getConn() (*pool.Conn, bool, error) { func (c *baseClient) getConn() (*pool.Conn, error) {
cn, isNew, err := c.connPool.Get() cn, err := c.connPool.Get()
if err != nil { if err != nil {
return nil, false, err return nil, err
} }
if !cn.Inited { if !cn.Inited {
if err := c.initConn(cn); err != nil { err := c.initConn(cn)
_ = c.connPool.Remove(cn) if err != nil {
return nil, false, err c.connPool.Remove(cn)
return nil, err
} }
} }
return cn, isNew, nil return cn, nil
} }
func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool { func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool {
if internal.IsBadConn(err, false) { if internal.IsBadConn(err, false) {
_ = c.connPool.Remove(cn) c.connPool.Remove(cn)
return false return false
} }
_ = c.connPool.Put(cn) c.connPool.Put(cn)
return true return true
} }
@ -137,7 +138,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
time.Sleep(c.retryBackoff(attempt)) time.Sleep(c.retryBackoff(attempt))
} }
cn, _, err := c.getConn() cn, err := c.getConn()
if err != nil { if err != nil {
cmd.setErr(err) cmd.setErr(err)
if internal.IsRetryableError(err, true) { if internal.IsRetryableError(err, true) {
@ -225,7 +226,7 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e
time.Sleep(c.retryBackoff(attempt)) time.Sleep(c.retryBackoff(attempt))
} }
cn, _, err := c.getConn() cn, err := c.getConn()
if err != nil { if err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
return err return err
@ -234,10 +235,10 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e
canRetry, err := p(cn, cmds) canRetry, err := p(cn, cmds)
if err == nil || internal.IsRedisError(err) { if err == nil || internal.IsRedisError(err) {
_ = c.connPool.Put(cn) c.connPool.Put(cn)
break break
} }
_ = c.connPool.Remove(cn) c.connPool.Remove(cn)
if !canRetry || !internal.IsRetryableError(err, true) { if !canRetry || !internal.IsRetryableError(err, true) {
break break

View File

@ -145,12 +145,11 @@ var _ = Describe("Client", func() {
}) })
// Put bad connection in the pool. // Put bad connection in the pool.
cn, _, err := client.Pool().Get() cn, err := client.Pool().Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
cn.SetNetConn(&badConn{}) cn.SetNetConn(&badConn{})
err = client.Pool().Put(cn) client.Pool().Put(cn)
Expect(err).NotTo(HaveOccurred())
err = client.Ping().Err() err = client.Ping().Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -184,19 +183,18 @@ var _ = Describe("Client", func() {
}) })
It("should update conn.UsedAt on read/write", func() { It("should update conn.UsedAt on read/write", func() {
cn, _, err := client.Pool().Get() cn, err := client.Pool().Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(cn.UsedAt).NotTo(BeZero()) Expect(cn.UsedAt).NotTo(BeZero())
createdAt := cn.UsedAt() createdAt := cn.UsedAt()
err = client.Pool().Put(cn) client.Pool().Put(cn)
Expect(err).NotTo(HaveOccurred())
Expect(cn.UsedAt().Equal(createdAt)).To(BeTrue()) Expect(cn.UsedAt().Equal(createdAt)).To(BeTrue())
err = client.Ping().Err() err = client.Ping().Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
cn, _, err = client.Pool().Get() cn, err = client.Pool().Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(cn).NotTo(BeNil()) Expect(cn).NotTo(BeNil())
Expect(cn.UsedAt().After(createdAt)).To(BeTrue()) Expect(cn.UsedAt().After(createdAt)).To(BeTrue())

View File

@ -525,7 +525,7 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
continue continue
} }
cn, _, err := shard.Client.getConn() cn, err := shard.Client.getConn()
if err != nil { if err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
continue continue
@ -533,10 +533,10 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
if err == nil || internal.IsRedisError(err) { if err == nil || internal.IsRedisError(err) {
_ = shard.Client.connPool.Put(cn) shard.Client.connPool.Put(cn)
continue continue
} }
_ = shard.Client.connPool.Remove(cn) shard.Client.connPool.Remove(cn)
if canRetry && internal.IsRetryableError(err, true) { if canRetry && internal.IsRetryableError(err, true) {
if failedCmdsMap == nil { if failedCmdsMap == nil {

View File

@ -124,12 +124,11 @@ var _ = Describe("Tx", func() {
It("should recover from bad connection", func() { It("should recover from bad connection", func() {
// Put bad connection in the pool. // Put bad connection in the pool.
cn, _, err := client.Pool().Get() cn, err := client.Pool().Get()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
cn.SetNetConn(&badConn{}) cn.SetNetConn(&badConn{})
err = client.Pool().Put(cn) client.Pool().Put(cn)
Expect(err).NotTo(HaveOccurred())
do := func() error { do := func() error {
err := client.Watch(func(tx *redis.Tx) error { err := client.Watch(func(tx *redis.Tx) error {