From 1bb55e3a9a686c082ec9c8e44e8298a6ceb011b2 Mon Sep 17 00:00:00 2001 From: Dimitrij Denissenko Date: Sat, 12 Mar 2016 13:39:50 +0200 Subject: [PATCH 1/2] Make free-connection stack a LIFO. --- internal/pool/conn.go | 4 +++ internal/pool/conn_stack.go | 72 +++++++++++++++++++++++++++++++++++++ internal/pool/pool.go | 64 ++++++++++++++------------------- 3 files changed, 103 insertions(+), 37 deletions(-) create mode 100644 internal/pool/conn_stack.go diff --git a/internal/pool/conn.go b/internal/pool/conn.go index c5a539b..1f1d4a9 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -35,6 +35,10 @@ func NewConn(netConn net.Conn) *Conn { return cn } +func (cn *Conn) IsStale(timeout time.Duration) bool { + return timeout > 0 && time.Since(cn.UsedAt) > timeout +} + func (cn *Conn) SetNetConn(netConn net.Conn) { cn.netConn = netConn cn.UsedAt = time.Now() diff --git a/internal/pool/conn_stack.go b/internal/pool/conn_stack.go new file mode 100644 index 0000000..047bc85 --- /dev/null +++ b/internal/pool/conn_stack.go @@ -0,0 +1,72 @@ +package pool + +import ( + "sync" + "time" +) + +// connStack is used as a LIFO to maintain free connections +type connStack struct { + cns []*Conn + free chan struct{} + mu sync.Mutex +} + +func newConnStack(max int) *connStack { + return &connStack{ + cns: make([]*Conn, 0, max), + free: make(chan struct{}, max), + } +} + +func (s *connStack) Len() int { return len(s.free) } + +func (s *connStack) Push(cn *Conn) { + s.mu.Lock() + s.cns = append(s.cns, cn) + s.mu.Unlock() + s.free <- struct{}{} +} + +func (s *connStack) ShiftStale(timeout time.Duration) *Conn { + select { + case <-s.free: + s.mu.Lock() + defer s.mu.Unlock() + + if cn := s.cns[0]; cn.IsStale(timeout) { + copy(s.cns, s.cns[1:]) + s.cns = s.cns[:len(s.cns)-1] + return cn + } + return nil + default: + return nil + } +} + +func (s *connStack) Pop() *Conn { + select { + case <-s.free: + return s.pop() + default: + return nil + } +} + +func (s *connStack) PopWithTimeout(d time.Duration) *Conn { + select { + case <-s.free: + return s.pop() + case <-time.After(d): + return nil + } +} + +func (s *connStack) pop() (cn *Conn) { + s.mu.Lock() + ci := len(s.cns) - 1 + cn, s.cns = s.cns[ci], s.cns[:ci] + s.mu.Unlock() + return +} diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 60b9c40..d5c91db 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -50,7 +50,7 @@ type ConnPool struct { idleTimeout time.Duration conns *connList - freeConns chan *Conn + freeConns *connStack stats PoolStats _closed int32 @@ -67,7 +67,7 @@ func NewConnPool(dial dialer, poolSize int, poolTimeout, idleTimeout time.Durati idleTimeout: idleTimeout, conns: newConnList(poolSize), - freeConns: make(chan *Conn, poolSize), + freeConns: newConnStack(poolSize), } if idleTimeout > 0 { go p.reaper() @@ -87,44 +87,33 @@ func (p *ConnPool) isIdle(cn *Conn) bool { // there are no connections. func (p *ConnPool) First() *Conn { for { - select { - case cn := <-p.freeConns: - if p.isIdle(cn) { - var err error - cn, err = p.replace(cn) - if err != nil { - Logger.Printf("pool.replace failed: %s", err) - continue - } + cn := p.freeConns.Pop() + if cn != nil && cn.IsStale(p.idleTimeout) { + var err error + cn, err = p.replace(cn) + if err != nil { + Logger.Printf("pool.replace failed: %s", err) + continue } - return cn - default: - return nil } + return cn } - panic("not reached") } // wait waits for free non-idle connection. It returns nil on timeout. func (p *ConnPool) wait() *Conn { - deadline := time.After(p.poolTimeout) for { - select { - case cn := <-p.freeConns: - if p.isIdle(cn) { - var err error - cn, err = p.replace(cn) - if err != nil { - Logger.Printf("pool.replace failed: %s", err) - continue - } + cn := p.freeConns.PopWithTimeout(p.poolTimeout) + if cn != nil && cn.IsStale(p.idleTimeout) { + var err error + cn, err = p.replace(cn) + if err != nil { + Logger.Printf("pool.replace failed: %s", err) + continue } - return cn - case <-deadline: - return nil } + return cn } - panic("not reached") } func (p *ConnPool) dial() (net.Conn, error) { @@ -198,7 +187,7 @@ func (p *ConnPool) Put(cn *Conn) error { Logger.Print(err) return p.Replace(cn, err) } - p.freeConns <- cn + p.freeConns.Push(cn) return nil } @@ -223,7 +212,7 @@ func (p *ConnPool) Replace(cn *Conn, reason error) error { if err != nil { return err } - p.freeConns <- newcn + p.freeConns.Push(newcn) return nil } @@ -234,7 +223,7 @@ func (p *ConnPool) Len() int { // FreeLen returns number of free connections. func (p *ConnPool) FreeLen() int { - return len(p.freeConns) + return p.freeConns.Len() } func (p *ConnPool) Stats() *PoolStats { @@ -273,11 +262,12 @@ func (p *ConnPool) reaper() { break } - // pool.First removes idle connections from the pool and - // returns first non-idle connection. So just put returned - // connection back. - if cn := p.First(); cn != nil { - p.Put(cn) + for { + cn := p.freeConns.ShiftStale(p.idleTimeout) + if cn == nil { + break + } + _ = p.conns.Remove(cn) } } } From ef5ccc12aef8eff5c5731537192b1b1cb8ba74c0 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Sat, 12 Mar 2016 14:42:12 +0200 Subject: [PATCH 2/2] Add tests for conn reaper. --- example_test.go | 2 +- internal/pool/conn_stack.go | 6 ++- internal/pool/pool.go | 41 +++++++++++++--- internal/pool/pool_test.go | 93 +++++++++++++++++++++++++++++++++++++ 4 files changed, 132 insertions(+), 10 deletions(-) create mode 100644 internal/pool/pool_test.go diff --git a/example_test.go b/example_test.go index bb98fdd..c93e0b7 100644 --- a/example_test.go +++ b/example_test.go @@ -254,7 +254,7 @@ func ExamplePubSub_Receive() { for i := 0; i < 2; i++ { // ReceiveTimeout is a low level API. Use ReceiveMessage instead. - msgi, err := pubsub.ReceiveTimeout(500 * time.Millisecond) + msgi, err := pubsub.ReceiveTimeout(time.Second) if err != nil { panic(err) } diff --git a/internal/pool/conn_stack.go b/internal/pool/conn_stack.go index 047bc85..a26ab0e 100644 --- a/internal/pool/conn_stack.go +++ b/internal/pool/conn_stack.go @@ -28,17 +28,19 @@ func (s *connStack) Push(cn *Conn) { s.free <- struct{}{} } -func (s *connStack) ShiftStale(timeout time.Duration) *Conn { +func (s *connStack) ShiftStale(idleTimeout time.Duration) *Conn { select { case <-s.free: s.mu.Lock() defer s.mu.Unlock() - if cn := s.cns[0]; cn.IsStale(timeout) { + if cn := s.cns[0]; cn.IsStale(idleTimeout) { copy(s.cns, s.cns[1:]) s.cns = s.cns[:len(s.cns)-1] return cn } + + s.free <- struct{}{} return nil default: return nil diff --git a/internal/pool/pool.go b/internal/pool/pool.go index d5c91db..243ebea 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -83,6 +83,15 @@ func (p *ConnPool) isIdle(cn *Conn) bool { return p.idleTimeout > 0 && time.Since(cn.UsedAt) > p.idleTimeout } +func (p *ConnPool) Add(cn *Conn) bool { + if !p.conns.Reserve() { + return false + } + p.conns.Add(cn) + p.Put(cn) + return true +} + // First returns first non-idle connection from the pool or nil if // there are no connections. func (p *ConnPool) First() *Conn { @@ -216,6 +225,12 @@ func (p *ConnPool) Replace(cn *Conn, reason error) error { return nil } +func (p *ConnPool) Remove(cn *Conn, reason error) error { + p.storeLastErr(reason.Error()) + _ = cn.Close() + return p.conns.Remove(cn) +} + // Len returns total number of connections. func (p *ConnPool) Len() int { return p.conns.Len() @@ -253,6 +268,20 @@ func (p *ConnPool) Close() (retErr error) { return retErr } +func (p *ConnPool) ReapStaleConns() (n int, err error) { + for { + cn := p.freeConns.ShiftStale(p.idleTimeout) + if cn == nil { + break + } + if err = p.Remove(cn, errors.New("connection is stale")); err != nil { + return + } + n++ + } + return +} + func (p *ConnPool) reaper() { ticker := time.NewTicker(time.Minute) defer ticker.Stop() @@ -261,13 +290,11 @@ func (p *ConnPool) reaper() { if p.closed() { break } - - for { - cn := p.freeConns.ShiftStale(p.idleTimeout) - if cn == nil { - break - } - _ = p.conns.Remove(cn) + n, err := p.ReapStaleConns() + if err != nil { + Logger.Printf("ReapStaleConns failed: %s", err) + } else if n > 0 { + Logger.Printf("removed %d stale connections", n) } } } diff --git a/internal/pool/pool_test.go b/internal/pool/pool_test.go new file mode 100644 index 0000000..07d3a52 --- /dev/null +++ b/internal/pool/pool_test.go @@ -0,0 +1,93 @@ +package pool_test + +import ( + "errors" + "net" + "testing" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "gopkg.in/redis.v3/internal/pool" +) + +func TestGinkgoSuite(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "pool") +} + +var _ = Describe("conns reapser", func() { + var connPool *pool.ConnPool + + BeforeEach(func() { + dial := func() (net.Conn, error) { + return &net.TCPConn{}, nil + } + connPool = pool.NewConnPool(dial, 10, 0, time.Minute) + + // add stale connections + for i := 0; i < 3; i++ { + cn := pool.NewConn(&net.TCPConn{}) + cn.UsedAt = time.Now().Add(-2 * time.Minute) + Expect(connPool.Add(cn)).To(BeTrue()) + } + + // add fresh connections + for i := 0; i < 3; i++ { + cn := pool.NewConn(&net.TCPConn{}) + Expect(connPool.Add(cn)).To(BeTrue()) + } + + Expect(connPool.Len()).To(Equal(6)) + Expect(connPool.FreeLen()).To(Equal(6)) + + n, err := connPool.ReapStaleConns() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(3)) + }) + + It("reaps stale connections", func() { + Expect(connPool.Len()).To(Equal(3)) + Expect(connPool.FreeLen()).To(Equal(3)) + }) + + It("pool is functional", func() { + for j := 0; j < 3; j++ { + var freeCns []*pool.Conn + for i := 0; i < 3; i++ { + cn := connPool.First() + Expect(cn).NotTo(BeNil()) + freeCns = append(freeCns, cn) + } + + Expect(connPool.Len()).To(Equal(3)) + Expect(connPool.FreeLen()).To(Equal(0)) + + cn := connPool.First() + Expect(cn).To(BeNil()) + + cn, isNew, err := connPool.Get() + Expect(err).NotTo(HaveOccurred()) + Expect(isNew).To(BeTrue()) + Expect(cn).NotTo(BeNil()) + + Expect(connPool.Len()).To(Equal(4)) + Expect(connPool.FreeLen()).To(Equal(0)) + + err = connPool.Remove(cn, errors.New("test")) + Expect(err).NotTo(HaveOccurred()) + + Expect(connPool.Len()).To(Equal(3)) + Expect(connPool.FreeLen()).To(Equal(0)) + + for _, cn := range freeCns { + err := connPool.Put(cn) + Expect(err).NotTo(HaveOccurred()) + } + + Expect(connPool.Len()).To(Equal(3)) + Expect(connPool.FreeLen()).To(Equal(3)) + } + }) +})