Merge pull request #277 from go-redis/feature/conn-stack

Make free-connection stack a LIFO.
This commit is contained in:
Vladimir Mihailenco 2016-03-12 15:21:05 +02:00
commit 4665ad860f
5 changed files with 227 additions and 39 deletions

View File

@ -254,7 +254,7 @@ func ExamplePubSub_Receive() {
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
// ReceiveTimeout is a low level API. Use ReceiveMessage instead. // ReceiveTimeout is a low level API. Use ReceiveMessage instead.
msgi, err := pubsub.ReceiveTimeout(500 * time.Millisecond) msgi, err := pubsub.ReceiveTimeout(time.Second)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -35,6 +35,10 @@ func NewConn(netConn net.Conn) *Conn {
return cn return cn
} }
func (cn *Conn) IsStale(timeout time.Duration) bool {
return timeout > 0 && time.Since(cn.UsedAt) > timeout
}
func (cn *Conn) SetNetConn(netConn net.Conn) { func (cn *Conn) SetNetConn(netConn net.Conn) {
cn.netConn = netConn cn.netConn = netConn
cn.UsedAt = time.Now() cn.UsedAt = time.Now()

View File

@ -0,0 +1,74 @@
package pool
import (
"sync"
"time"
)
// connStack is used as a LIFO to maintain free connections
type connStack struct {
cns []*Conn
free chan struct{}
mu sync.Mutex
}
func newConnStack(max int) *connStack {
return &connStack{
cns: make([]*Conn, 0, max),
free: make(chan struct{}, max),
}
}
func (s *connStack) Len() int { return len(s.free) }
func (s *connStack) Push(cn *Conn) {
s.mu.Lock()
s.cns = append(s.cns, cn)
s.mu.Unlock()
s.free <- struct{}{}
}
func (s *connStack) ShiftStale(idleTimeout time.Duration) *Conn {
select {
case <-s.free:
s.mu.Lock()
defer s.mu.Unlock()
if cn := s.cns[0]; cn.IsStale(idleTimeout) {
copy(s.cns, s.cns[1:])
s.cns = s.cns[:len(s.cns)-1]
return cn
}
s.free <- struct{}{}
return nil
default:
return nil
}
}
func (s *connStack) Pop() *Conn {
select {
case <-s.free:
return s.pop()
default:
return nil
}
}
func (s *connStack) PopWithTimeout(d time.Duration) *Conn {
select {
case <-s.free:
return s.pop()
case <-time.After(d):
return nil
}
}
func (s *connStack) pop() (cn *Conn) {
s.mu.Lock()
ci := len(s.cns) - 1
cn, s.cns = s.cns[ci], s.cns[:ci]
s.mu.Unlock()
return
}

View File

@ -50,7 +50,7 @@ type ConnPool struct {
idleTimeout time.Duration idleTimeout time.Duration
conns *connList conns *connList
freeConns chan *Conn freeConns *connStack
stats PoolStats stats PoolStats
_closed int32 _closed int32
@ -67,7 +67,7 @@ func NewConnPool(dial dialer, poolSize int, poolTimeout, idleTimeout time.Durati
idleTimeout: idleTimeout, idleTimeout: idleTimeout,
conns: newConnList(poolSize), conns: newConnList(poolSize),
freeConns: make(chan *Conn, poolSize), freeConns: newConnStack(poolSize),
} }
if idleTimeout > 0 { if idleTimeout > 0 {
go p.reaper() go p.reaper()
@ -83,48 +83,46 @@ func (p *ConnPool) isIdle(cn *Conn) bool {
return p.idleTimeout > 0 && time.Since(cn.UsedAt) > p.idleTimeout return p.idleTimeout > 0 && time.Since(cn.UsedAt) > p.idleTimeout
} }
func (p *ConnPool) Add(cn *Conn) bool {
if !p.conns.Reserve() {
return false
}
p.conns.Add(cn)
p.Put(cn)
return true
}
// First returns first non-idle connection from the pool or nil if // First returns first non-idle connection from the pool or nil if
// there are no connections. // there are no connections.
func (p *ConnPool) First() *Conn { func (p *ConnPool) First() *Conn {
for { for {
select { cn := p.freeConns.Pop()
case cn := <-p.freeConns: if cn != nil && cn.IsStale(p.idleTimeout) {
if p.isIdle(cn) { var err error
var err error cn, err = p.replace(cn)
cn, err = p.replace(cn) if err != nil {
if err != nil { Logger.Printf("pool.replace failed: %s", err)
Logger.Printf("pool.replace failed: %s", err) continue
continue
}
} }
return cn
default:
return nil
} }
return cn
} }
panic("not reached")
} }
// wait waits for free non-idle connection. It returns nil on timeout. // wait waits for free non-idle connection. It returns nil on timeout.
func (p *ConnPool) wait() *Conn { func (p *ConnPool) wait() *Conn {
deadline := time.After(p.poolTimeout)
for { for {
select { cn := p.freeConns.PopWithTimeout(p.poolTimeout)
case cn := <-p.freeConns: if cn != nil && cn.IsStale(p.idleTimeout) {
if p.isIdle(cn) { var err error
var err error cn, err = p.replace(cn)
cn, err = p.replace(cn) if err != nil {
if err != nil { Logger.Printf("pool.replace failed: %s", err)
Logger.Printf("pool.replace failed: %s", err) continue
continue
}
} }
return cn
case <-deadline:
return nil
} }
return cn
} }
panic("not reached")
} }
func (p *ConnPool) dial() (net.Conn, error) { func (p *ConnPool) dial() (net.Conn, error) {
@ -198,7 +196,7 @@ func (p *ConnPool) Put(cn *Conn) error {
Logger.Print(err) Logger.Print(err)
return p.Replace(cn, err) return p.Replace(cn, err)
} }
p.freeConns <- cn p.freeConns.Push(cn)
return nil return nil
} }
@ -223,10 +221,16 @@ func (p *ConnPool) Replace(cn *Conn, reason error) error {
if err != nil { if err != nil {
return err return err
} }
p.freeConns <- newcn p.freeConns.Push(newcn)
return nil return nil
} }
func (p *ConnPool) Remove(cn *Conn, reason error) error {
p.storeLastErr(reason.Error())
_ = cn.Close()
return p.conns.Remove(cn)
}
// Len returns total number of connections. // Len returns total number of connections.
func (p *ConnPool) Len() int { func (p *ConnPool) Len() int {
return p.conns.Len() return p.conns.Len()
@ -234,7 +238,7 @@ func (p *ConnPool) Len() int {
// FreeLen returns number of free connections. // FreeLen returns number of free connections.
func (p *ConnPool) FreeLen() int { func (p *ConnPool) FreeLen() int {
return len(p.freeConns) return p.freeConns.Len()
} }
func (p *ConnPool) Stats() *PoolStats { func (p *ConnPool) Stats() *PoolStats {
@ -264,6 +268,20 @@ func (p *ConnPool) Close() (retErr error) {
return retErr return retErr
} }
func (p *ConnPool) ReapStaleConns() (n int, err error) {
for {
cn := p.freeConns.ShiftStale(p.idleTimeout)
if cn == nil {
break
}
if err = p.Remove(cn, errors.New("connection is stale")); err != nil {
return
}
n++
}
return
}
func (p *ConnPool) reaper() { func (p *ConnPool) reaper() {
ticker := time.NewTicker(time.Minute) ticker := time.NewTicker(time.Minute)
defer ticker.Stop() defer ticker.Stop()
@ -272,12 +290,11 @@ func (p *ConnPool) reaper() {
if p.closed() { if p.closed() {
break break
} }
n, err := p.ReapStaleConns()
// pool.First removes idle connections from the pool and if err != nil {
// returns first non-idle connection. So just put returned Logger.Printf("ReapStaleConns failed: %s", err)
// connection back. } else if n > 0 {
if cn := p.First(); cn != nil { Logger.Printf("removed %d stale connections", n)
p.Put(cn)
} }
} }
} }

View File

@ -0,0 +1,93 @@
package pool_test
import (
"errors"
"net"
"testing"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"gopkg.in/redis.v3/internal/pool"
)
func TestGinkgoSuite(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "pool")
}
var _ = Describe("conns reapser", func() {
var connPool *pool.ConnPool
BeforeEach(func() {
dial := func() (net.Conn, error) {
return &net.TCPConn{}, nil
}
connPool = pool.NewConnPool(dial, 10, 0, time.Minute)
// add stale connections
for i := 0; i < 3; i++ {
cn := pool.NewConn(&net.TCPConn{})
cn.UsedAt = time.Now().Add(-2 * time.Minute)
Expect(connPool.Add(cn)).To(BeTrue())
}
// add fresh connections
for i := 0; i < 3; i++ {
cn := pool.NewConn(&net.TCPConn{})
Expect(connPool.Add(cn)).To(BeTrue())
}
Expect(connPool.Len()).To(Equal(6))
Expect(connPool.FreeLen()).To(Equal(6))
n, err := connPool.ReapStaleConns()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(3))
})
It("reaps stale connections", func() {
Expect(connPool.Len()).To(Equal(3))
Expect(connPool.FreeLen()).To(Equal(3))
})
It("pool is functional", func() {
for j := 0; j < 3; j++ {
var freeCns []*pool.Conn
for i := 0; i < 3; i++ {
cn := connPool.First()
Expect(cn).NotTo(BeNil())
freeCns = append(freeCns, cn)
}
Expect(connPool.Len()).To(Equal(3))
Expect(connPool.FreeLen()).To(Equal(0))
cn := connPool.First()
Expect(cn).To(BeNil())
cn, isNew, err := connPool.Get()
Expect(err).NotTo(HaveOccurred())
Expect(isNew).To(BeTrue())
Expect(cn).NotTo(BeNil())
Expect(connPool.Len()).To(Equal(4))
Expect(connPool.FreeLen()).To(Equal(0))
err = connPool.Remove(cn, errors.New("test"))
Expect(err).NotTo(HaveOccurred())
Expect(connPool.Len()).To(Equal(3))
Expect(connPool.FreeLen()).To(Equal(0))
for _, cn := range freeCns {
err := connPool.Put(cn)
Expect(err).NotTo(HaveOccurred())
}
Expect(connPool.Len()).To(Equal(3))
Expect(connPool.FreeLen()).To(Equal(3))
}
})
})