Merge pull request #83 from go-redis/fix/add-idle-connections-reaper

Add reaper that closes idle connections to the cluster.
This commit is contained in:
Vladimir Mihailenco 2015-04-06 11:43:44 +03:00
commit 6bf048a36c
3 changed files with 92 additions and 64 deletions

View File

@ -42,6 +42,7 @@ func NewClusterClient(opt *ClusterOptions) (*ClusterClient, error) {
} }
client.commandable.process = client.process client.commandable.process = client.process
client.reloadIfDue() client.reloadIfDue()
go client.reaper(time.NewTicker(5 * time.Minute))
return client, nil return client, nil
} }
@ -220,6 +221,20 @@ func (c *ClusterClient) update(infos []ClusterSlotInfo) {
} }
} }
// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(ticker *time.Ticker) {
for _ = range ticker.C {
for _, client := range c.conns {
pool := client.connPool
// pool.First removes idle connections from the pool for us. So
// just put returned connection back.
if cn := pool.First(); cn != nil {
pool.Put(cn)
}
}
}
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
var errNoAddrs = errors.New("redis: no addresses") var errNoAddrs = errors.New("redis: no addresses")

97
pool.go
View File

@ -23,13 +23,13 @@ var (
) )
type pool interface { type pool interface {
First() *conn
Get() (*conn, bool, error) Get() (*conn, bool, error)
Put(*conn) error Put(*conn) error
Remove(*conn) error Remove(*conn) error
Len() int Len() int
Size() int Size() int
Close() error Close() error
Filter(func(*conn) bool)
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -106,7 +106,7 @@ type connPool struct {
rl *ratelimit.RateLimiter rl *ratelimit.RateLimiter
opt *options opt *options
conns chan *conn freeConns chan *conn
size int32 size int32
closed int32 closed int32
@ -120,21 +120,23 @@ func newConnPool(dial func() (*conn, error), opt *options) *connPool {
rl: ratelimit.New(2*opt.PoolSize, time.Second), rl: ratelimit.New(2*opt.PoolSize, time.Second),
opt: opt, opt: opt,
conns: make(chan *conn, opt.PoolSize), freeConns: make(chan *conn, opt.PoolSize),
} }
} }
func (p *connPool) isClosed() bool { return atomic.LoadInt32(&p.closed) > 0 } func (p *connPool) isClosed() bool { return atomic.LoadInt32(&p.closed) > 0 }
// First available connection, non-blocking // First returns first non-idle connection from the pool or nil if
func (p *connPool) first() *conn { // there are no connections.
func (p *connPool) First() *conn {
for { for {
select { select {
case cn := <-p.conns: case cn := <-p.freeConns:
if !cn.isIdle(p.opt.IdleTimeout) { if cn.isIdle(p.opt.IdleTimeout) {
return cn
}
p.remove(cn) p.remove(cn)
continue
}
return cn
default: default:
return nil return nil
} }
@ -142,18 +144,19 @@ func (p *connPool) first() *conn {
panic("not reached") panic("not reached")
} }
// Wait for available connection, blocking // wait waits for free non-idle connection. It returns nil on timeout.
func (p *connPool) wait() (*conn, error) { func (p *connPool) wait(timeout time.Duration) *conn {
deadline := time.After(p.opt.PoolTimeout) deadline := time.After(timeout)
for { for {
select { select {
case cn := <-p.conns: case cn := <-p.freeConns:
if !cn.isIdle(p.opt.IdleTimeout) { if cn.isIdle(p.opt.IdleTimeout) {
return cn, nil
}
p.remove(cn) p.remove(cn)
continue
}
return cn
case <-deadline: case <-deadline:
return nil, errPoolTimeout return nil
} }
} }
panic("not reached") panic("not reached")
@ -175,13 +178,15 @@ func (p *connPool) new() (*conn, error) {
return cn, err return cn, err
} }
// Get returns existed connection from the pool or creates a new one
// if needed.
func (p *connPool) Get() (*conn, bool, error) { func (p *connPool) Get() (*conn, bool, error) {
if p.isClosed() { if p.isClosed() {
return nil, false, errClosed return nil, false, errClosed
} }
// Fetch first non-idle connection, if available // Fetch first non-idle connection, if available
if cn := p.first(); cn != nil { if cn := p.First(); cn != nil {
return cn, false, nil return cn, false, nil
} }
@ -197,8 +202,11 @@ func (p *connPool) Get() (*conn, bool, error) {
atomic.AddInt32(&p.size, -1) atomic.AddInt32(&p.size, -1)
// Otherwise, wait for the available connection // Otherwise, wait for the available connection
cn, err := p.wait() if cn := p.wait(p.opt.PoolTimeout); cn != nil {
return cn, false, err return cn, false, nil
}
return nil, false, errPoolTimeout
} }
func (p *connPool) Put(cn *conn) error { func (p *connPool) Put(cn *conn) error {
@ -214,7 +222,7 @@ func (p *connPool) Put(cn *conn) error {
if p.opt.IdleTimeout > 0 { if p.opt.IdleTimeout > 0 {
cn.usedAt = time.Now() cn.usedAt = time.Now()
} }
p.conns <- cn p.freeConns <- cn
return nil return nil
} }
@ -232,7 +240,7 @@ func (p *connPool) remove(cn *conn) error {
// Len returns number of idle connections. // Len returns number of idle connections.
func (p *connPool) Len() int { func (p *connPool) Len() int {
return len(p.conns) return len(p.freeConns)
} }
// Size returns number of connections in the pool. // Size returns number of connections in the pool.
@ -240,34 +248,23 @@ func (p *connPool) Size() int {
return int(atomic.LoadInt32(&p.size)) return int(atomic.LoadInt32(&p.size))
} }
func (p *connPool) Filter(f func(*conn) bool) { func (p *connPool) Close() (retErr error) {
for {
select {
case cn := <-p.conns:
if !f(cn) {
p.remove(cn)
}
default:
return
}
}
panic("not reached")
}
func (p *connPool) Close() (err error) {
if !atomic.CompareAndSwapInt32(&p.closed, 0, 1) { if !atomic.CompareAndSwapInt32(&p.closed, 0, 1) {
return nil return nil
} }
for { // Wait until pool has no connections
if p.Size() < 1 { for p.Size() > 0 {
cn := p.wait(p.opt.PoolTimeout)
if cn == nil {
break break
} }
if e := p.remove(<-p.conns); e != nil { if err := p.remove(cn); err != nil {
err = e retErr = err
} }
} }
return
return retErr
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -296,6 +293,12 @@ func (p *singleConnPool) SetConn(cn *conn) {
p.cnMtx.Unlock() p.cnMtx.Unlock()
} }
func (p *singleConnPool) First() *conn {
defer p.cnMtx.Unlock()
p.cnMtx.Lock()
return p.cn
}
func (p *singleConnPool) Get() (*conn, bool, error) { func (p *singleConnPool) Get() (*conn, bool, error) {
defer p.cnMtx.Unlock() defer p.cnMtx.Unlock()
p.cnMtx.Lock() p.cnMtx.Lock()
@ -373,16 +376,6 @@ func (p *singleConnPool) Size() int {
return 1 return 1
} }
func (p *singleConnPool) Filter(f func(*conn) bool) {
p.cnMtx.Lock()
if p.cn != nil {
if !f(p.cn) {
p.remove()
}
}
p.cnMtx.Unlock()
}
func (p *singleConnPool) Close() error { func (p *singleConnPool) Close() error {
defer p.cnMtx.Unlock() defer p.cnMtx.Unlock()
p.cnMtx.Lock() p.cnMtx.Lock()

View File

@ -249,6 +249,34 @@ func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) {
} }
} }
// closeOldConns closes connections to the old master after failover switch.
func (d *sentinelFailover) closeOldConns(newMaster string) {
// Good connections that should be put back to the pool. They
// can't be put immediately, because pool.First will return them
// again on next iteration.
cnsToPut := make([]*conn, 0)
for {
cn := d.pool.First()
if cn == nil {
break
}
if cn.RemoteAddr().String() != newMaster {
log.Printf(
"redis-sentinel: closing connection to the old master %s",
cn.RemoteAddr(),
)
d.pool.Remove(cn)
} else {
cnsToPut = append(cnsToPut, cn)
}
}
for _, cn := range cnsToPut {
d.pool.Put(cn)
}
}
func (d *sentinelFailover) listen() { func (d *sentinelFailover) listen() {
var pubsub *PubSub var pubsub *PubSub
for { for {
@ -284,16 +312,8 @@ func (d *sentinelFailover) listen() {
"redis-sentinel: new %q addr is %s", "redis-sentinel: new %q addr is %s",
d.masterName, addr, d.masterName, addr,
) )
d.pool.Filter(func(cn *conn) bool {
if cn.RemoteAddr().String() != addr { d.closeOldConns(addr)
log.Printf(
"redis-sentinel: closing connection to old master %s",
cn.RemoteAddr(),
)
return false
}
return true
})
default: default:
log.Printf("redis-sentinel: unsupported message: %s", msg) log.Printf("redis-sentinel: unsupported message: %s", msg)
} }