From 1bb55e3a9a686c082ec9c8e44e8298a6ceb011b2 Mon Sep 17 00:00:00 2001 From: Dimitrij Denissenko Date: Sat, 12 Mar 2016 13:39:50 +0200 Subject: [PATCH] Make free-connection stack a LIFO. --- internal/pool/conn.go | 4 +++ internal/pool/conn_stack.go | 72 +++++++++++++++++++++++++++++++++++++ internal/pool/pool.go | 64 ++++++++++++++------------------- 3 files changed, 103 insertions(+), 37 deletions(-) create mode 100644 internal/pool/conn_stack.go diff --git a/internal/pool/conn.go b/internal/pool/conn.go index c5a539b..1f1d4a9 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -35,6 +35,10 @@ func NewConn(netConn net.Conn) *Conn { return cn } +func (cn *Conn) IsStale(timeout time.Duration) bool { + return timeout > 0 && time.Since(cn.UsedAt) > timeout +} + func (cn *Conn) SetNetConn(netConn net.Conn) { cn.netConn = netConn cn.UsedAt = time.Now() diff --git a/internal/pool/conn_stack.go b/internal/pool/conn_stack.go new file mode 100644 index 0000000..047bc85 --- /dev/null +++ b/internal/pool/conn_stack.go @@ -0,0 +1,72 @@ +package pool + +import ( + "sync" + "time" +) + +// connStack is used as a LIFO to maintain free connections +type connStack struct { + cns []*Conn + free chan struct{} + mu sync.Mutex +} + +func newConnStack(max int) *connStack { + return &connStack{ + cns: make([]*Conn, 0, max), + free: make(chan struct{}, max), + } +} + +func (s *connStack) Len() int { return len(s.free) } + +func (s *connStack) Push(cn *Conn) { + s.mu.Lock() + s.cns = append(s.cns, cn) + s.mu.Unlock() + s.free <- struct{}{} +} + +func (s *connStack) ShiftStale(timeout time.Duration) *Conn { + select { + case <-s.free: + s.mu.Lock() + defer s.mu.Unlock() + + if cn := s.cns[0]; cn.IsStale(timeout) { + copy(s.cns, s.cns[1:]) + s.cns = s.cns[:len(s.cns)-1] + return cn + } + return nil + default: + return nil + } +} + +func (s *connStack) Pop() *Conn { + select { + case <-s.free: + return s.pop() + default: + return nil + } +} + +func (s *connStack) PopWithTimeout(d time.Duration) *Conn { + select { + case <-s.free: + return s.pop() + case <-time.After(d): + return nil + } +} + +func (s *connStack) pop() (cn *Conn) { + s.mu.Lock() + ci := len(s.cns) - 1 + cn, s.cns = s.cns[ci], s.cns[:ci] + s.mu.Unlock() + return +} diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 60b9c40..d5c91db 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -50,7 +50,7 @@ type ConnPool struct { idleTimeout time.Duration conns *connList - freeConns chan *Conn + freeConns *connStack stats PoolStats _closed int32 @@ -67,7 +67,7 @@ func NewConnPool(dial dialer, poolSize int, poolTimeout, idleTimeout time.Durati idleTimeout: idleTimeout, conns: newConnList(poolSize), - freeConns: make(chan *Conn, poolSize), + freeConns: newConnStack(poolSize), } if idleTimeout > 0 { go p.reaper() @@ -87,44 +87,33 @@ func (p *ConnPool) isIdle(cn *Conn) bool { // there are no connections. func (p *ConnPool) First() *Conn { for { - select { - case cn := <-p.freeConns: - if p.isIdle(cn) { - var err error - cn, err = p.replace(cn) - if err != nil { - Logger.Printf("pool.replace failed: %s", err) - continue - } + cn := p.freeConns.Pop() + if cn != nil && cn.IsStale(p.idleTimeout) { + var err error + cn, err = p.replace(cn) + if err != nil { + Logger.Printf("pool.replace failed: %s", err) + continue } - return cn - default: - return nil } + return cn } - panic("not reached") } // wait waits for free non-idle connection. It returns nil on timeout. func (p *ConnPool) wait() *Conn { - deadline := time.After(p.poolTimeout) for { - select { - case cn := <-p.freeConns: - if p.isIdle(cn) { - var err error - cn, err = p.replace(cn) - if err != nil { - Logger.Printf("pool.replace failed: %s", err) - continue - } + cn := p.freeConns.PopWithTimeout(p.poolTimeout) + if cn != nil && cn.IsStale(p.idleTimeout) { + var err error + cn, err = p.replace(cn) + if err != nil { + Logger.Printf("pool.replace failed: %s", err) + continue } - return cn - case <-deadline: - return nil } + return cn } - panic("not reached") } func (p *ConnPool) dial() (net.Conn, error) { @@ -198,7 +187,7 @@ func (p *ConnPool) Put(cn *Conn) error { Logger.Print(err) return p.Replace(cn, err) } - p.freeConns <- cn + p.freeConns.Push(cn) return nil } @@ -223,7 +212,7 @@ func (p *ConnPool) Replace(cn *Conn, reason error) error { if err != nil { return err } - p.freeConns <- newcn + p.freeConns.Push(newcn) return nil } @@ -234,7 +223,7 @@ func (p *ConnPool) Len() int { // FreeLen returns number of free connections. func (p *ConnPool) FreeLen() int { - return len(p.freeConns) + return p.freeConns.Len() } func (p *ConnPool) Stats() *PoolStats { @@ -273,11 +262,12 @@ func (p *ConnPool) reaper() { break } - // pool.First removes idle connections from the pool and - // returns first non-idle connection. So just put returned - // connection back. - if cn := p.First(); cn != nil { - p.Put(cn) + for { + cn := p.freeConns.ShiftStale(p.idleTimeout) + if cn == nil { + break + } + _ = p.conns.Remove(cn) } } }