Merge pull request #100 from go-redis/fix/pool-close-gracef

pool: gracefully close pool by giving users time to free connection.
This commit is contained in:
Vladimir Mihailenco 2015-05-11 13:04:43 +03:00
commit ab2f697b53
1 changed files with 23 additions and 9 deletions

32
pool.go
View File

@ -66,6 +66,7 @@ func (l *connList) Add(cn *conn) {
l.mx.Unlock() l.mx.Unlock()
} }
// Remove closes connection and removes it from the list.
func (l *connList) Remove(cn *conn) error { func (l *connList) Remove(cn *conn) error {
defer l.mx.Unlock() defer l.mx.Unlock()
l.mx.Lock() l.mx.Lock()
@ -172,8 +173,8 @@ func (p *connPool) First() *conn {
} }
// wait waits for free non-idle connection. It returns nil on timeout. // wait waits for free non-idle connection. It returns nil on timeout.
func (p *connPool) wait(timeout time.Duration) *conn { func (p *connPool) wait() *conn {
deadline := time.After(timeout) deadline := time.After(p.opt.PoolTimeout)
for { for {
select { select {
case cn := <-p.freeConns: case cn := <-p.freeConns:
@ -208,14 +209,13 @@ func (p *connPool) new() (*conn, error) {
return cn, nil return cn, nil
} }
// Get returns existed connection from the pool or creates a new one // Get returns existed connection from the pool or creates a new one.
// if needed.
func (p *connPool) Get() (*conn, error) { func (p *connPool) Get() (*conn, error) {
if p.closed() { if p.closed() {
return nil, errClosed return nil, errClosed
} }
// Fetch first non-idle connection, if available // Fetch first non-idle connection, if available.
if cn := p.First(); cn != nil { if cn := p.First(); cn != nil {
return cn, nil return cn, nil
} }
@ -231,8 +231,8 @@ func (p *connPool) Get() (*conn, error) {
return cn, nil return cn, nil
} }
// Otherwise, wait for the available connection // Otherwise, wait for the available connection.
if cn := p.wait(p.opt.PoolTimeout); cn != nil { if cn := p.wait(); cn != nil {
return cn, nil return cn, nil
} }
@ -277,11 +277,25 @@ func (p *connPool) FreeLen() int {
return len(p.freeConns) return len(p.freeConns)
} }
func (p *connPool) Close() error { func (p *connPool) Close() (retErr error) {
if !atomic.CompareAndSwapInt32(&p._closed, 0, 1) { if !atomic.CompareAndSwapInt32(&p._closed, 0, 1) {
return errClosed return errClosed
} }
return p.conns.Close() // First close free connections.
for p.Len() > 0 {
cn := p.wait()
if cn == nil {
break
}
if err := p.conns.Remove(cn); err != nil {
retErr = err
}
}
// Then close the rest.
if err := p.conns.Close(); err != nil {
retErr = err
}
return retErr
} }
func (p *connPool) reaper() { func (p *connPool) reaper() {