forked from mirror/redis
Add reaper that closes idle connections to the cluster.
This commit is contained in:
parent
4fefa47d6d
commit
fe931fc851
15
cluster.go
15
cluster.go
|
@ -42,6 +42,7 @@ func NewClusterClient(opt *ClusterOptions) (*ClusterClient, error) {
|
|||
}
|
||||
client.commandable.process = client.process
|
||||
client.reloadIfDue()
|
||||
go client.reaper(time.NewTicker(5 * time.Minute))
|
||||
return client, nil
|
||||
}
|
||||
|
||||
|
@ -220,6 +221,20 @@ func (c *ClusterClient) update(infos []ClusterSlotInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
// reaper closes idle connections to the cluster.
|
||||
func (c *ClusterClient) reaper(ticker *time.Ticker) {
|
||||
for _ = range ticker.C {
|
||||
for _, client := range c.conns {
|
||||
pool := client.connPool
|
||||
// pool.First removes idle connections from the pool for us. So
|
||||
// just put returned connection back.
|
||||
if cn := pool.First(); cn != nil {
|
||||
pool.Put(cn)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
var errNoAddrs = errors.New("redis: no addresses")
|
||||
|
|
97
pool.go
97
pool.go
|
@ -23,13 +23,13 @@ var (
|
|||
)
|
||||
|
||||
type pool interface {
|
||||
First() *conn
|
||||
Get() (*conn, bool, error)
|
||||
Put(*conn) error
|
||||
Remove(*conn) error
|
||||
Len() int
|
||||
Size() int
|
||||
Close() error
|
||||
Filter(func(*conn) bool)
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
@ -106,7 +106,7 @@ type connPool struct {
|
|||
rl *ratelimit.RateLimiter
|
||||
|
||||
opt *options
|
||||
conns chan *conn
|
||||
freeConns chan *conn
|
||||
|
||||
size int32
|
||||
closed int32
|
||||
|
@ -120,21 +120,23 @@ func newConnPool(dial func() (*conn, error), opt *options) *connPool {
|
|||
rl: ratelimit.New(2*opt.PoolSize, time.Second),
|
||||
|
||||
opt: opt,
|
||||
conns: make(chan *conn, opt.PoolSize),
|
||||
freeConns: make(chan *conn, opt.PoolSize),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *connPool) isClosed() bool { return atomic.LoadInt32(&p.closed) > 0 }
|
||||
|
||||
// First available connection, non-blocking
|
||||
func (p *connPool) first() *conn {
|
||||
// First returns first non-idle connection from the pool or nil if
|
||||
// there are no connections.
|
||||
func (p *connPool) First() *conn {
|
||||
for {
|
||||
select {
|
||||
case cn := <-p.conns:
|
||||
if !cn.isIdle(p.opt.IdleTimeout) {
|
||||
return cn
|
||||
}
|
||||
case cn := <-p.freeConns:
|
||||
if cn.isIdle(p.opt.IdleTimeout) {
|
||||
p.remove(cn)
|
||||
continue
|
||||
}
|
||||
return cn
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
@ -142,18 +144,19 @@ func (p *connPool) first() *conn {
|
|||
panic("not reached")
|
||||
}
|
||||
|
||||
// Wait for available connection, blocking
|
||||
func (p *connPool) wait() (*conn, error) {
|
||||
deadline := time.After(p.opt.PoolTimeout)
|
||||
// wait waits for free non-idle connection. It returns nil on timeout.
|
||||
func (p *connPool) wait(timeout time.Duration) *conn {
|
||||
deadline := time.After(timeout)
|
||||
for {
|
||||
select {
|
||||
case cn := <-p.conns:
|
||||
if !cn.isIdle(p.opt.IdleTimeout) {
|
||||
return cn, nil
|
||||
}
|
||||
case cn := <-p.freeConns:
|
||||
if cn.isIdle(p.opt.IdleTimeout) {
|
||||
p.remove(cn)
|
||||
continue
|
||||
}
|
||||
return cn
|
||||
case <-deadline:
|
||||
return nil, errPoolTimeout
|
||||
return nil
|
||||
}
|
||||
}
|
||||
panic("not reached")
|
||||
|
@ -175,13 +178,15 @@ func (p *connPool) new() (*conn, error) {
|
|||
return cn, err
|
||||
}
|
||||
|
||||
// Get returns existed connection from the pool or creates a new one
|
||||
// if needed.
|
||||
func (p *connPool) Get() (*conn, bool, error) {
|
||||
if p.isClosed() {
|
||||
return nil, false, errClosed
|
||||
}
|
||||
|
||||
// Fetch first non-idle connection, if available
|
||||
if cn := p.first(); cn != nil {
|
||||
if cn := p.First(); cn != nil {
|
||||
return cn, false, nil
|
||||
}
|
||||
|
||||
|
@ -197,8 +202,11 @@ func (p *connPool) Get() (*conn, bool, error) {
|
|||
atomic.AddInt32(&p.size, -1)
|
||||
|
||||
// Otherwise, wait for the available connection
|
||||
cn, err := p.wait()
|
||||
return cn, false, err
|
||||
if cn := p.wait(p.opt.PoolTimeout); cn != nil {
|
||||
return cn, false, nil
|
||||
}
|
||||
|
||||
return nil, false, errPoolTimeout
|
||||
}
|
||||
|
||||
func (p *connPool) Put(cn *conn) error {
|
||||
|
@ -214,7 +222,7 @@ func (p *connPool) Put(cn *conn) error {
|
|||
if p.opt.IdleTimeout > 0 {
|
||||
cn.usedAt = time.Now()
|
||||
}
|
||||
p.conns <- cn
|
||||
p.freeConns <- cn
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -232,7 +240,7 @@ func (p *connPool) remove(cn *conn) error {
|
|||
|
||||
// Len returns number of idle connections.
|
||||
func (p *connPool) Len() int {
|
||||
return len(p.conns)
|
||||
return len(p.freeConns)
|
||||
}
|
||||
|
||||
// Size returns number of connections in the pool.
|
||||
|
@ -240,34 +248,23 @@ func (p *connPool) Size() int {
|
|||
return int(atomic.LoadInt32(&p.size))
|
||||
}
|
||||
|
||||
func (p *connPool) Filter(f func(*conn) bool) {
|
||||
for {
|
||||
select {
|
||||
case cn := <-p.conns:
|
||||
if !f(cn) {
|
||||
p.remove(cn)
|
||||
}
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
panic("not reached")
|
||||
}
|
||||
|
||||
func (p *connPool) Close() (err error) {
|
||||
func (p *connPool) Close() (retErr error) {
|
||||
if !atomic.CompareAndSwapInt32(&p.closed, 0, 1) {
|
||||
return nil
|
||||
}
|
||||
|
||||
for {
|
||||
if p.Size() < 1 {
|
||||
// Wait until pool has no connections
|
||||
for p.Size() > 0 {
|
||||
cn := p.wait(p.opt.PoolTimeout)
|
||||
if cn == nil {
|
||||
break
|
||||
}
|
||||
if e := p.remove(<-p.conns); e != nil {
|
||||
err = e
|
||||
if err := p.remove(cn); err != nil {
|
||||
retErr = err
|
||||
}
|
||||
}
|
||||
return
|
||||
|
||||
return retErr
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
@ -296,6 +293,12 @@ func (p *singleConnPool) SetConn(cn *conn) {
|
|||
p.cnMtx.Unlock()
|
||||
}
|
||||
|
||||
func (p *singleConnPool) First() *conn {
|
||||
defer p.cnMtx.Unlock()
|
||||
p.cnMtx.Lock()
|
||||
return p.cn
|
||||
}
|
||||
|
||||
func (p *singleConnPool) Get() (*conn, bool, error) {
|
||||
defer p.cnMtx.Unlock()
|
||||
p.cnMtx.Lock()
|
||||
|
@ -373,16 +376,6 @@ func (p *singleConnPool) Size() int {
|
|||
return 1
|
||||
}
|
||||
|
||||
func (p *singleConnPool) Filter(f func(*conn) bool) {
|
||||
p.cnMtx.Lock()
|
||||
if p.cn != nil {
|
||||
if !f(p.cn) {
|
||||
p.remove()
|
||||
}
|
||||
}
|
||||
p.cnMtx.Unlock()
|
||||
}
|
||||
|
||||
func (p *singleConnPool) Close() error {
|
||||
defer p.cnMtx.Unlock()
|
||||
p.cnMtx.Lock()
|
||||
|
|
40
sentinel.go
40
sentinel.go
|
@ -249,6 +249,34 @@ func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) {
|
|||
}
|
||||
}
|
||||
|
||||
// closeOldConns closes connections to the old master after failover switch.
|
||||
func (d *sentinelFailover) closeOldConns(newMaster string) {
|
||||
// Good connections that should be put back to the pool. They
|
||||
// can't be put immediately, because pool.First will return them
|
||||
// again on next iteration.
|
||||
cnsToPut := make([]*conn, 0)
|
||||
|
||||
for {
|
||||
cn := d.pool.First()
|
||||
if cn == nil {
|
||||
break
|
||||
}
|
||||
if cn.RemoteAddr().String() != newMaster {
|
||||
log.Printf(
|
||||
"redis-sentinel: closing connection to the old master %s",
|
||||
cn.RemoteAddr(),
|
||||
)
|
||||
d.pool.Remove(cn)
|
||||
} else {
|
||||
cnsToPut = append(cnsToPut, cn)
|
||||
}
|
||||
}
|
||||
|
||||
for _, cn := range cnsToPut {
|
||||
d.pool.Put(cn)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *sentinelFailover) listen() {
|
||||
var pubsub *PubSub
|
||||
for {
|
||||
|
@ -284,16 +312,8 @@ func (d *sentinelFailover) listen() {
|
|||
"redis-sentinel: new %q addr is %s",
|
||||
d.masterName, addr,
|
||||
)
|
||||
d.pool.Filter(func(cn *conn) bool {
|
||||
if cn.RemoteAddr().String() != addr {
|
||||
log.Printf(
|
||||
"redis-sentinel: closing connection to old master %s",
|
||||
cn.RemoteAddr(),
|
||||
)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
d.closeOldConns(addr)
|
||||
default:
|
||||
log.Printf("redis-sentinel: unsupported message: %s", msg)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue