diff --git a/cluster.go b/cluster.go index eb7127ab..bbb505b0 100644 --- a/cluster.go +++ b/cluster.go @@ -42,6 +42,7 @@ func NewClusterClient(opt *ClusterOptions) (*ClusterClient, error) { } client.commandable.process = client.process client.reloadIfDue() + go client.reaper(time.NewTicker(5 * time.Minute)) return client, nil } @@ -220,6 +221,20 @@ func (c *ClusterClient) update(infos []ClusterSlotInfo) { } } +// reaper closes idle connections to the cluster. +func (c *ClusterClient) reaper(ticker *time.Ticker) { + for _ = range ticker.C { + for _, client := range c.conns { + pool := client.connPool + // pool.First removes idle connections from the pool for us. So + // just put returned connection back. + if cn := pool.First(); cn != nil { + pool.Put(cn) + } + } + } +} + //------------------------------------------------------------------------------ var errNoAddrs = errors.New("redis: no addresses") diff --git a/pool.go b/pool.go index e26c3f2f..7681836b 100644 --- a/pool.go +++ b/pool.go @@ -23,13 +23,13 @@ var ( ) type pool interface { + First() *conn Get() (*conn, bool, error) Put(*conn) error Remove(*conn) error Len() int Size() int Close() error - Filter(func(*conn) bool) } //------------------------------------------------------------------------------ @@ -105,8 +105,8 @@ type connPool struct { dial func() (*conn, error) rl *ratelimit.RateLimiter - opt *options - conns chan *conn + opt *options + freeConns chan *conn size int32 closed int32 @@ -119,22 +119,24 @@ func newConnPool(dial func() (*conn, error), opt *options) *connPool { dial: dial, rl: ratelimit.New(2*opt.PoolSize, time.Second), - opt: opt, - conns: make(chan *conn, opt.PoolSize), + opt: opt, + freeConns: make(chan *conn, opt.PoolSize), } } func (p *connPool) isClosed() bool { return atomic.LoadInt32(&p.closed) > 0 } -// First available connection, non-blocking -func (p *connPool) first() *conn { +// First returns first non-idle connection from the pool or nil if +// there are no connections. +func (p *connPool) First() *conn { for { select { - case cn := <-p.conns: - if !cn.isIdle(p.opt.IdleTimeout) { - return cn + case cn := <-p.freeConns: + if cn.isIdle(p.opt.IdleTimeout) { + p.remove(cn) + continue } - p.remove(cn) + return cn default: return nil } @@ -142,18 +144,19 @@ func (p *connPool) first() *conn { panic("not reached") } -// Wait for available connection, blocking -func (p *connPool) wait() (*conn, error) { - deadline := time.After(p.opt.PoolTimeout) +// wait waits for free non-idle connection. It returns nil on timeout. +func (p *connPool) wait(timeout time.Duration) *conn { + deadline := time.After(timeout) for { select { - case cn := <-p.conns: - if !cn.isIdle(p.opt.IdleTimeout) { - return cn, nil + case cn := <-p.freeConns: + if cn.isIdle(p.opt.IdleTimeout) { + p.remove(cn) + continue } - p.remove(cn) + return cn case <-deadline: - return nil, errPoolTimeout + return nil } } panic("not reached") @@ -175,13 +178,15 @@ func (p *connPool) new() (*conn, error) { return cn, err } +// Get returns existed connection from the pool or creates a new one +// if needed. func (p *connPool) Get() (*conn, bool, error) { if p.isClosed() { return nil, false, errClosed } // Fetch first non-idle connection, if available - if cn := p.first(); cn != nil { + if cn := p.First(); cn != nil { return cn, false, nil } @@ -197,8 +202,11 @@ func (p *connPool) Get() (*conn, bool, error) { atomic.AddInt32(&p.size, -1) // Otherwise, wait for the available connection - cn, err := p.wait() - return cn, false, err + if cn := p.wait(p.opt.PoolTimeout); cn != nil { + return cn, false, nil + } + + return nil, false, errPoolTimeout } func (p *connPool) Put(cn *conn) error { @@ -214,7 +222,7 @@ func (p *connPool) Put(cn *conn) error { if p.opt.IdleTimeout > 0 { cn.usedAt = time.Now() } - p.conns <- cn + p.freeConns <- cn return nil } @@ -232,7 +240,7 @@ func (p *connPool) remove(cn *conn) error { // Len returns number of idle connections. func (p *connPool) Len() int { - return len(p.conns) + return len(p.freeConns) } // Size returns number of connections in the pool. @@ -240,34 +248,23 @@ func (p *connPool) Size() int { return int(atomic.LoadInt32(&p.size)) } -func (p *connPool) Filter(f func(*conn) bool) { - for { - select { - case cn := <-p.conns: - if !f(cn) { - p.remove(cn) - } - default: - return - } - } - panic("not reached") -} - -func (p *connPool) Close() (err error) { +func (p *connPool) Close() (retErr error) { if !atomic.CompareAndSwapInt32(&p.closed, 0, 1) { return nil } - for { - if p.Size() < 1 { + // Wait until pool has no connections + for p.Size() > 0 { + cn := p.wait(p.opt.PoolTimeout) + if cn == nil { break } - if e := p.remove(<-p.conns); e != nil { - err = e + if err := p.remove(cn); err != nil { + retErr = err } } - return + + return retErr } //------------------------------------------------------------------------------ @@ -296,6 +293,12 @@ func (p *singleConnPool) SetConn(cn *conn) { p.cnMtx.Unlock() } +func (p *singleConnPool) First() *conn { + defer p.cnMtx.Unlock() + p.cnMtx.Lock() + return p.cn +} + func (p *singleConnPool) Get() (*conn, bool, error) { defer p.cnMtx.Unlock() p.cnMtx.Lock() @@ -373,16 +376,6 @@ func (p *singleConnPool) Size() int { return 1 } -func (p *singleConnPool) Filter(f func(*conn) bool) { - p.cnMtx.Lock() - if p.cn != nil { - if !f(p.cn) { - p.remove() - } - } - p.cnMtx.Unlock() -} - func (p *singleConnPool) Close() error { defer p.cnMtx.Unlock() p.cnMtx.Lock() diff --git a/sentinel.go b/sentinel.go index cacf52cf..496e2cb4 100644 --- a/sentinel.go +++ b/sentinel.go @@ -249,6 +249,34 @@ func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) { } } +// closeOldConns closes connections to the old master after failover switch. +func (d *sentinelFailover) closeOldConns(newMaster string) { + // Good connections that should be put back to the pool. They + // can't be put immediately, because pool.First will return them + // again on next iteration. + cnsToPut := make([]*conn, 0) + + for { + cn := d.pool.First() + if cn == nil { + break + } + if cn.RemoteAddr().String() != newMaster { + log.Printf( + "redis-sentinel: closing connection to the old master %s", + cn.RemoteAddr(), + ) + d.pool.Remove(cn) + } else { + cnsToPut = append(cnsToPut, cn) + } + } + + for _, cn := range cnsToPut { + d.pool.Put(cn) + } +} + func (d *sentinelFailover) listen() { var pubsub *PubSub for { @@ -284,16 +312,8 @@ func (d *sentinelFailover) listen() { "redis-sentinel: new %q addr is %s", d.masterName, addr, ) - d.pool.Filter(func(cn *conn) bool { - if cn.RemoteAddr().String() != addr { - log.Printf( - "redis-sentinel: closing connection to old master %s", - cn.RemoteAddr(), - ) - return false - } - return true - }) + + d.closeOldConns(addr) default: log.Printf("redis-sentinel: unsupported message: %s", msg) }