From 651e9fef1d0bb1fe18504bb7194ae8a9f396c7d7 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Sun, 12 Aug 2018 10:08:21 +0300 Subject: [PATCH] Add MaxConnAge --- CHANGELOG.md | 6 ++ cluster.go | 15 +-- cluster_test.go | 4 +- commands_test.go | 2 +- internal/pool/conn.go | 10 +- internal/pool/pool.go | 29 ++++-- internal/pool/pool_test.go | 198 ++++++++++++++++++++----------------- options.go | 4 + pool_test.go | 8 +- pubsub_test.go | 5 +- redis.go | 6 +- ring.go | 6 +- 12 files changed, 170 insertions(+), 123 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cb0e1b8e..02cb85de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Unreleased + +- New option MinIdleConns. +- New option MaxConnAge. +- PoolStats.FreeConns is renamed to PoolStats.IdleConns. + ## v6.13 - Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards. diff --git a/cluster.go b/cluster.go index 8c404c9c..863e69a2 100644 --- a/cluster.go +++ b/cluster.go @@ -65,6 +65,8 @@ type ClusterOptions struct { // PoolSize applies per cluster node and not for the whole cluster. PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -130,10 +132,11 @@ func (opt *ClusterOptions) clientOptions() *Options { ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, - PoolSize: opt.PoolSize, - PoolTimeout: opt.PoolTimeout, - IdleTimeout: opt.IdleTimeout, - + PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, + PoolTimeout: opt.PoolTimeout, + IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: disableIdleCheck, TLSConfig: opt.TLSConfig, @@ -1106,7 +1109,7 @@ func (c *ClusterClient) PoolStats() *PoolStats { acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns - acc.FreeConns += s.FreeConns + acc.IdleConns += s.IdleConns acc.StaleConns += s.StaleConns } @@ -1117,7 +1120,7 @@ func (c *ClusterClient) PoolStats() *PoolStats { acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns - acc.FreeConns += s.FreeConns + acc.IdleConns += s.IdleConns acc.StaleConns += s.StaleConns } diff --git a/cluster_test.go b/cluster_test.go index 03f75337..a37f0561 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -557,13 +557,13 @@ var _ = Describe("ClusterClient", func() { It("removes idle connections", func() { stats := client.PoolStats() Expect(stats.TotalConns).NotTo(BeZero()) - Expect(stats.FreeConns).NotTo(BeZero()) + Expect(stats.IdleConns).NotTo(BeZero()) time.Sleep(2 * time.Second) stats = client.PoolStats() Expect(stats.TotalConns).To(BeZero()) - Expect(stats.FreeConns).To(BeZero()) + Expect(stats.IdleConns).To(BeZero()) }) It("returns an error when there are no attempts left", func() { diff --git a/commands_test.go b/commands_test.go index 9b3847e1..955621d5 100644 --- a/commands_test.go +++ b/commands_test.go @@ -42,7 +42,7 @@ var _ = Describe("Commands", func() { Expect(stats.Misses).To(Equal(uint32(1))) Expect(stats.Timeouts).To(Equal(uint32(0))) Expect(stats.TotalConns).To(Equal(uint32(1))) - Expect(stats.FreeConns).To(Equal(uint32(1))) + Expect(stats.IdleConns).To(Equal(uint32(1))) }) It("should Echo", func() { diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 7a4eea9f..4e3a608e 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -18,9 +18,9 @@ type Conn struct { concurrentReadWrite bool - Inited bool - pooled bool - usedAt atomic.Value + InitedAt time.Time + pooled bool + usedAt atomic.Value } func NewConn(netConn net.Conn) *Conn { @@ -47,10 +47,6 @@ func (cn *Conn) SetNetConn(netConn net.Conn) { cn.Rd.Reset(netConn) } -func (cn *Conn) IsStale(timeout time.Duration) bool { - return timeout > 0 && time.Since(cn.UsedAt()) > timeout -} - func (cn *Conn) SetReadTimeout(timeout time.Duration) { now := time.Now() cn.SetUsedAt(now) diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 2125acca..c39ac9f5 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -28,7 +28,6 @@ type Stats struct { Timeouts uint32 // number of times a wait timeout occurred TotalConns uint32 // number of total connections in the pool - FreeConns uint32 // deprecated - use IdleConns IdleConns uint32 // number of idle connections in the pool StaleConns uint32 // number of stale connections removed from the pool } @@ -54,6 +53,7 @@ type Options struct { PoolSize int MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -223,8 +223,8 @@ func (p *ConnPool) Get() (*Conn, error) { break } - if cn.IsStale(p.opt.IdleTimeout) { - p.CloseConn(cn) + if p.isStaleConn(cn) { + _ = p.CloseConn(cn) continue } @@ -343,12 +343,12 @@ func (p *ConnPool) closeConn(cn *Conn) error { // Len returns total number of connections. func (p *ConnPool) Len() int { p.connsMu.Lock() - n := p.poolSize + n := len(p.conns) p.connsMu.Unlock() return n } -// FreeLen returns number of idle connections. +// IdleLen returns number of idle connections. func (p *ConnPool) IdleLen() int { p.connsMu.Lock() n := p.idleConnsLen @@ -364,7 +364,6 @@ func (p *ConnPool) Stats() *Stats { Timeouts: atomic.LoadUint32(&p.stats.Timeouts), TotalConns: uint32(p.Len()), - FreeConns: uint32(idleLen), IdleConns: uint32(idleLen), StaleConns: atomic.LoadUint32(&p.stats.StaleConns), } @@ -415,7 +414,7 @@ func (p *ConnPool) reapStaleConn() *Conn { } cn := p.idleConns[0] - if !cn.IsStale(p.opt.IdleTimeout) { + if !p.isStaleConn(cn) { return nil } @@ -466,3 +465,19 @@ func (p *ConnPool) reaper(frequency time.Duration) { atomic.AddUint32(&p.stats.StaleConns, uint32(n)) } } + +func (p *ConnPool) isStaleConn(cn *Conn) bool { + if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 { + return false + } + + now := time.Now() + if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout { + return true + } + if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge { + return true + } + + return false +} diff --git a/internal/pool/pool_test.go b/internal/pool/pool_test.go index 6f49afbd..0250f9d6 100644 --- a/internal/pool/pool_test.go +++ b/internal/pool/pool_test.go @@ -248,114 +248,128 @@ var _ = Describe("MinIdleConns", func() { var _ = Describe("conns reaper", func() { const idleTimeout = time.Minute + const maxAge = time.Hour var connPool *pool.ConnPool - var conns, idleConns, closedConns []*pool.Conn + var conns, staleConns, closedConns []*pool.Conn - BeforeEach(func() { - conns = nil - closedConns = nil + assert := func(typ string) { + BeforeEach(func() { + closedConns = nil + connPool = pool.NewConnPool(&pool.Options{ + Dialer: dummyDialer, + PoolSize: 10, + IdleTimeout: idleTimeout, + MaxConnAge: maxAge, + PoolTimeout: time.Second, + IdleCheckFrequency: time.Hour, + OnClose: func(cn *pool.Conn) error { + closedConns = append(closedConns, cn) + return nil + }, + }) - connPool = pool.NewConnPool(&pool.Options{ - Dialer: dummyDialer, - PoolSize: 10, - PoolTimeout: time.Second, - IdleTimeout: idleTimeout, - IdleCheckFrequency: time.Hour, + conns = nil - OnClose: func(cn *pool.Conn) error { - closedConns = append(closedConns, cn) - return nil - }, - }) - - // add stale connections - idleConns = nil - for i := 0; i < 3; i++ { - cn, err := connPool.Get() - Expect(err).NotTo(HaveOccurred()) - cn.SetUsedAt(time.Now().Add(-2 * idleTimeout)) - conns = append(conns, cn) - idleConns = append(idleConns, cn) - } - - // add fresh connections - for i := 0; i < 3; i++ { - cn, err := connPool.Get() - Expect(err).NotTo(HaveOccurred()) - conns = append(conns, cn) - } - - for _, cn := range conns { - connPool.Put(cn) - } - - Expect(connPool.Len()).To(Equal(6)) - Expect(connPool.IdleLen()).To(Equal(6)) - - n, err := connPool.ReapStaleConns() - Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(3)) - }) - - AfterEach(func() { - _ = connPool.Close() - Expect(connPool.Len()).To(Equal(0)) - Expect(connPool.IdleLen()).To(Equal(0)) - Expect(len(closedConns)).To(Equal(len(conns))) - Expect(closedConns).To(ConsistOf(conns)) - }) - - It("reaps stale connections", func() { - Expect(connPool.Len()).To(Equal(3)) - Expect(connPool.IdleLen()).To(Equal(3)) - }) - - It("does not reap fresh connections", func() { - n, err := connPool.ReapStaleConns() - Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(0)) - }) - - It("stale connections are closed", func() { - Expect(len(closedConns)).To(Equal(len(idleConns))) - Expect(closedConns).To(ConsistOf(idleConns)) - }) - - It("pool is functional", func() { - for j := 0; j < 3; j++ { - var freeCns []*pool.Conn + // add stale connections + staleConns = nil for i := 0; i < 3; i++ { cn, err := connPool.Get() Expect(err).NotTo(HaveOccurred()) - Expect(cn).NotTo(BeNil()) - freeCns = append(freeCns, cn) + switch typ { + case "idle": + cn.SetUsedAt(time.Now().Add(-2 * idleTimeout)) + case "aged": + cn.InitedAt = time.Now().Add(-2 * maxAge) + } + conns = append(conns, cn) + staleConns = append(staleConns, cn) } - Expect(connPool.Len()).To(Equal(3)) - Expect(connPool.IdleLen()).To(Equal(0)) + // add fresh connections + for i := 0; i < 3; i++ { + cn, err := connPool.Get() + Expect(err).NotTo(HaveOccurred()) + conns = append(conns, cn) + } - cn, err := connPool.Get() - Expect(err).NotTo(HaveOccurred()) - Expect(cn).NotTo(BeNil()) - conns = append(conns, cn) - - Expect(connPool.Len()).To(Equal(4)) - Expect(connPool.IdleLen()).To(Equal(0)) - - connPool.Remove(cn) - - Expect(connPool.Len()).To(Equal(3)) - Expect(connPool.IdleLen()).To(Equal(0)) - - for _, cn := range freeCns { + for _, cn := range conns { + if cn.InitedAt.IsZero() { + cn.InitedAt = time.Now() + } connPool.Put(cn) } + Expect(connPool.Len()).To(Equal(6)) + Expect(connPool.IdleLen()).To(Equal(6)) + + n, err := connPool.ReapStaleConns() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(3)) + }) + + AfterEach(func() { + _ = connPool.Close() + Expect(connPool.Len()).To(Equal(0)) + Expect(connPool.IdleLen()).To(Equal(0)) + Expect(len(closedConns)).To(Equal(len(conns))) + Expect(closedConns).To(ConsistOf(conns)) + }) + + It("reaps stale connections", func() { Expect(connPool.Len()).To(Equal(3)) Expect(connPool.IdleLen()).To(Equal(3)) - } - }) + }) + + It("does not reap fresh connections", func() { + n, err := connPool.ReapStaleConns() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(0)) + }) + + It("stale connections are closed", func() { + Expect(len(closedConns)).To(Equal(len(staleConns))) + Expect(closedConns).To(ConsistOf(staleConns)) + }) + + It("pool is functional", func() { + for j := 0; j < 3; j++ { + var freeCns []*pool.Conn + for i := 0; i < 3; i++ { + cn, err := connPool.Get() + Expect(err).NotTo(HaveOccurred()) + Expect(cn).NotTo(BeNil()) + freeCns = append(freeCns, cn) + } + + Expect(connPool.Len()).To(Equal(3)) + Expect(connPool.IdleLen()).To(Equal(0)) + + cn, err := connPool.Get() + Expect(err).NotTo(HaveOccurred()) + Expect(cn).NotTo(BeNil()) + conns = append(conns, cn) + + Expect(connPool.Len()).To(Equal(4)) + Expect(connPool.IdleLen()).To(Equal(0)) + + connPool.Remove(cn) + + Expect(connPool.Len()).To(Equal(3)) + Expect(connPool.IdleLen()).To(Equal(0)) + + for _, cn := range freeCns { + connPool.Put(cn) + } + + Expect(connPool.Len()).To(Equal(3)) + Expect(connPool.IdleLen()).To(Equal(3)) + } + }) + } + + assert("idle") + assert("aged") }) var _ = Describe("race", func() { diff --git a/options.go b/options.go index 8fd74541..2b5bcb58 100644 --- a/options.go +++ b/options.go @@ -62,6 +62,9 @@ type Options struct { // Minimum number of idle connections which is useful when establishing // new connection is slow. MinIdleConns int + // Connection age at which client retires (closes) the connection. + // Default is to not close aged connections. + MaxConnAge time.Duration // Amount of time client waits for connection if all connections // are busy before returning an error. // Default is ReadTimeout + 1 second. @@ -201,6 +204,7 @@ func newConnPool(opt *Options) *pool.ConnPool { Dialer: opt.Dialer, PoolSize: opt.PoolSize, MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: opt.IdleCheckFrequency, diff --git a/pool_test.go b/pool_test.go index d7dc9d01..0e1580b3 100644 --- a/pool_test.go +++ b/pool_test.go @@ -13,7 +13,10 @@ var _ = Describe("pool", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(redisOptions()) + opt := redisOptions() + opt.MinIdleConns = 0 + opt.MaxConnAge = 0 + client = redis.NewClient(opt) Expect(client.FlushDB().Err()).NotTo(HaveOccurred()) }) @@ -124,7 +127,6 @@ var _ = Describe("pool", func() { Misses: 1, Timeouts: 0, TotalConns: 1, - FreeConns: 1, IdleConns: 1, StaleConns: 0, })) @@ -137,7 +139,7 @@ var _ = Describe("pool", func() { Misses: 1, Timeouts: 0, TotalConns: 0, - FreeConns: 0, + IdleConns: 0, StaleConns: 1, })) }) diff --git a/pubsub_test.go b/pubsub_test.go index 5f1fb543..7f0021fd 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -16,7 +16,10 @@ var _ = Describe("PubSub", func() { var client *redis.Client BeforeEach(func() { - client = redis.NewClient(redisOptions()) + opt := redisOptions() + opt.MinIdleConns = 0 + opt.MaxConnAge = 0 + client = redis.NewClient(opt) Expect(client.FlushDB().Err()).NotTo(HaveOccurred()) }) diff --git a/redis.go b/redis.go index c0f142cc..4747ff97 100644 --- a/redis.go +++ b/redis.go @@ -50,7 +50,7 @@ func (c *baseClient) newConn() (*pool.Conn, error) { return nil, err } - if !cn.Inited { + if cn.InitedAt.IsZero() { if err := c.initConn(cn); err != nil { _ = c.connPool.CloseConn(cn) return nil, err @@ -66,7 +66,7 @@ func (c *baseClient) getConn() (*pool.Conn, error) { return nil, err } - if !cn.Inited { + if cn.InitedAt.IsZero() { err := c.initConn(cn) if err != nil { c.connPool.Remove(cn) @@ -88,7 +88,7 @@ func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool { } func (c *baseClient) initConn(cn *pool.Conn) error { - cn.Inited = true + cn.InitedAt = time.Now() if c.opt.Password == "" && c.opt.DB == 0 && diff --git a/ring.go b/ring.go index ef855115..74862026 100644 --- a/ring.go +++ b/ring.go @@ -68,6 +68,8 @@ type RingOptions struct { WriteTimeout time.Duration PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -108,6 +110,8 @@ func (opt *RingOptions) clientOptions() *Options { WriteTimeout: opt.WriteTimeout, PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: opt.IdleCheckFrequency, @@ -404,7 +408,7 @@ func (c *Ring) PoolStats() *PoolStats { acc.Misses += s.Misses acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns - acc.FreeConns += s.FreeConns + acc.IdleConns += s.IdleConns } return &acc }