Add MaxConnAge

This commit is contained in:
Vladimir Mihailenco 2018-08-12 10:08:21 +03:00
parent 52d9bc3a1e
commit 651e9fef1d
12 changed files with 170 additions and 123 deletions

View File

@ -1,5 +1,11 @@
# Changelog
## Unreleased
- New option MinIdleConns.
- New option MaxConnAge.
- PoolStats.FreeConns is renamed to PoolStats.IdleConns.
## v6.13
- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards.

View File

@ -65,6 +65,8 @@ type ClusterOptions struct {
// PoolSize applies per cluster node and not for the whole cluster.
PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@ -130,10 +132,11 @@ func (opt *ClusterOptions) clientOptions() *Options {
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: disableIdleCheck,
TLSConfig: opt.TLSConfig,
@ -1106,7 +1109,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
acc.IdleConns += s.IdleConns
acc.StaleConns += s.StaleConns
}
@ -1117,7 +1120,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
acc.IdleConns += s.IdleConns
acc.StaleConns += s.StaleConns
}

View File

@ -557,13 +557,13 @@ var _ = Describe("ClusterClient", func() {
It("removes idle connections", func() {
stats := client.PoolStats()
Expect(stats.TotalConns).NotTo(BeZero())
Expect(stats.FreeConns).NotTo(BeZero())
Expect(stats.IdleConns).NotTo(BeZero())
time.Sleep(2 * time.Second)
stats = client.PoolStats()
Expect(stats.TotalConns).To(BeZero())
Expect(stats.FreeConns).To(BeZero())
Expect(stats.IdleConns).To(BeZero())
})
It("returns an error when there are no attempts left", func() {

View File

@ -42,7 +42,7 @@ var _ = Describe("Commands", func() {
Expect(stats.Misses).To(Equal(uint32(1)))
Expect(stats.Timeouts).To(Equal(uint32(0)))
Expect(stats.TotalConns).To(Equal(uint32(1)))
Expect(stats.FreeConns).To(Equal(uint32(1)))
Expect(stats.IdleConns).To(Equal(uint32(1)))
})
It("should Echo", func() {

View File

@ -18,9 +18,9 @@ type Conn struct {
concurrentReadWrite bool
Inited bool
pooled bool
usedAt atomic.Value
InitedAt time.Time
pooled bool
usedAt atomic.Value
}
func NewConn(netConn net.Conn) *Conn {
@ -47,10 +47,6 @@ func (cn *Conn) SetNetConn(netConn net.Conn) {
cn.Rd.Reset(netConn)
}
func (cn *Conn) IsStale(timeout time.Duration) bool {
return timeout > 0 && time.Since(cn.UsedAt()) > timeout
}
func (cn *Conn) SetReadTimeout(timeout time.Duration) {
now := time.Now()
cn.SetUsedAt(now)

View File

@ -28,7 +28,6 @@ type Stats struct {
Timeouts uint32 // number of times a wait timeout occurred
TotalConns uint32 // number of total connections in the pool
FreeConns uint32 // deprecated - use IdleConns
IdleConns uint32 // number of idle connections in the pool
StaleConns uint32 // number of stale connections removed from the pool
}
@ -54,6 +53,7 @@ type Options struct {
PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@ -223,8 +223,8 @@ func (p *ConnPool) Get() (*Conn, error) {
break
}
if cn.IsStale(p.opt.IdleTimeout) {
p.CloseConn(cn)
if p.isStaleConn(cn) {
_ = p.CloseConn(cn)
continue
}
@ -343,12 +343,12 @@ func (p *ConnPool) closeConn(cn *Conn) error {
// Len returns total number of connections.
func (p *ConnPool) Len() int {
p.connsMu.Lock()
n := p.poolSize
n := len(p.conns)
p.connsMu.Unlock()
return n
}
// FreeLen returns number of idle connections.
// IdleLen returns number of idle connections.
func (p *ConnPool) IdleLen() int {
p.connsMu.Lock()
n := p.idleConnsLen
@ -364,7 +364,6 @@ func (p *ConnPool) Stats() *Stats {
Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
TotalConns: uint32(p.Len()),
FreeConns: uint32(idleLen),
IdleConns: uint32(idleLen),
StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
}
@ -415,7 +414,7 @@ func (p *ConnPool) reapStaleConn() *Conn {
}
cn := p.idleConns[0]
if !cn.IsStale(p.opt.IdleTimeout) {
if !p.isStaleConn(cn) {
return nil
}
@ -466,3 +465,19 @@ func (p *ConnPool) reaper(frequency time.Duration) {
atomic.AddUint32(&p.stats.StaleConns, uint32(n))
}
}
func (p *ConnPool) isStaleConn(cn *Conn) bool {
if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
return false
}
now := time.Now()
if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
return true
}
if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge {
return true
}
return false
}

View File

@ -248,114 +248,128 @@ var _ = Describe("MinIdleConns", func() {
var _ = Describe("conns reaper", func() {
const idleTimeout = time.Minute
const maxAge = time.Hour
var connPool *pool.ConnPool
var conns, idleConns, closedConns []*pool.Conn
var conns, staleConns, closedConns []*pool.Conn
BeforeEach(func() {
conns = nil
closedConns = nil
assert := func(typ string) {
BeforeEach(func() {
closedConns = nil
connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: 10,
IdleTimeout: idleTimeout,
MaxConnAge: maxAge,
PoolTimeout: time.Second,
IdleCheckFrequency: time.Hour,
OnClose: func(cn *pool.Conn) error {
closedConns = append(closedConns, cn)
return nil
},
})
connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: 10,
PoolTimeout: time.Second,
IdleTimeout: idleTimeout,
IdleCheckFrequency: time.Hour,
conns = nil
OnClose: func(cn *pool.Conn) error {
closedConns = append(closedConns, cn)
return nil
},
})
// add stale connections
idleConns = nil
for i := 0; i < 3; i++ {
cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred())
cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
conns = append(conns, cn)
idleConns = append(idleConns, cn)
}
// add fresh connections
for i := 0; i < 3; i++ {
cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred())
conns = append(conns, cn)
}
for _, cn := range conns {
connPool.Put(cn)
}
Expect(connPool.Len()).To(Equal(6))
Expect(connPool.IdleLen()).To(Equal(6))
n, err := connPool.ReapStaleConns()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(3))
})
AfterEach(func() {
_ = connPool.Close()
Expect(connPool.Len()).To(Equal(0))
Expect(connPool.IdleLen()).To(Equal(0))
Expect(len(closedConns)).To(Equal(len(conns)))
Expect(closedConns).To(ConsistOf(conns))
})
It("reaps stale connections", func() {
Expect(connPool.Len()).To(Equal(3))
Expect(connPool.IdleLen()).To(Equal(3))
})
It("does not reap fresh connections", func() {
n, err := connPool.ReapStaleConns()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(0))
})
It("stale connections are closed", func() {
Expect(len(closedConns)).To(Equal(len(idleConns)))
Expect(closedConns).To(ConsistOf(idleConns))
})
It("pool is functional", func() {
for j := 0; j < 3; j++ {
var freeCns []*pool.Conn
// add stale connections
staleConns = nil
for i := 0; i < 3; i++ {
cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred())
Expect(cn).NotTo(BeNil())
freeCns = append(freeCns, cn)
switch typ {
case "idle":
cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
case "aged":
cn.InitedAt = time.Now().Add(-2 * maxAge)
}
conns = append(conns, cn)
staleConns = append(staleConns, cn)
}
Expect(connPool.Len()).To(Equal(3))
Expect(connPool.IdleLen()).To(Equal(0))
// add fresh connections
for i := 0; i < 3; i++ {
cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred())
conns = append(conns, cn)
}
cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred())
Expect(cn).NotTo(BeNil())
conns = append(conns, cn)
Expect(connPool.Len()).To(Equal(4))
Expect(connPool.IdleLen()).To(Equal(0))
connPool.Remove(cn)
Expect(connPool.Len()).To(Equal(3))
Expect(connPool.IdleLen()).To(Equal(0))
for _, cn := range freeCns {
for _, cn := range conns {
if cn.InitedAt.IsZero() {
cn.InitedAt = time.Now()
}
connPool.Put(cn)
}
Expect(connPool.Len()).To(Equal(6))
Expect(connPool.IdleLen()).To(Equal(6))
n, err := connPool.ReapStaleConns()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(3))
})
AfterEach(func() {
_ = connPool.Close()
Expect(connPool.Len()).To(Equal(0))
Expect(connPool.IdleLen()).To(Equal(0))
Expect(len(closedConns)).To(Equal(len(conns)))
Expect(closedConns).To(ConsistOf(conns))
})
It("reaps stale connections", func() {
Expect(connPool.Len()).To(Equal(3))
Expect(connPool.IdleLen()).To(Equal(3))
}
})
})
It("does not reap fresh connections", func() {
n, err := connPool.ReapStaleConns()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(0))
})
It("stale connections are closed", func() {
Expect(len(closedConns)).To(Equal(len(staleConns)))
Expect(closedConns).To(ConsistOf(staleConns))
})
It("pool is functional", func() {
for j := 0; j < 3; j++ {
var freeCns []*pool.Conn
for i := 0; i < 3; i++ {
cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred())
Expect(cn).NotTo(BeNil())
freeCns = append(freeCns, cn)
}
Expect(connPool.Len()).To(Equal(3))
Expect(connPool.IdleLen()).To(Equal(0))
cn, err := connPool.Get()
Expect(err).NotTo(HaveOccurred())
Expect(cn).NotTo(BeNil())
conns = append(conns, cn)
Expect(connPool.Len()).To(Equal(4))
Expect(connPool.IdleLen()).To(Equal(0))
connPool.Remove(cn)
Expect(connPool.Len()).To(Equal(3))
Expect(connPool.IdleLen()).To(Equal(0))
for _, cn := range freeCns {
connPool.Put(cn)
}
Expect(connPool.Len()).To(Equal(3))
Expect(connPool.IdleLen()).To(Equal(3))
}
})
}
assert("idle")
assert("aged")
})
var _ = Describe("race", func() {

View File

@ -62,6 +62,9 @@ type Options struct {
// Minimum number of idle connections which is useful when establishing
// new connection is slow.
MinIdleConns int
// Connection age at which client retires (closes) the connection.
// Default is to not close aged connections.
MaxConnAge time.Duration
// Amount of time client waits for connection if all connections
// are busy before returning an error.
// Default is ReadTimeout + 1 second.
@ -201,6 +204,7 @@ func newConnPool(opt *Options) *pool.ConnPool {
Dialer: opt.Dialer,
PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,

View File

@ -13,7 +13,10 @@ var _ = Describe("pool", func() {
var client *redis.Client
BeforeEach(func() {
client = redis.NewClient(redisOptions())
opt := redisOptions()
opt.MinIdleConns = 0
opt.MaxConnAge = 0
client = redis.NewClient(opt)
Expect(client.FlushDB().Err()).NotTo(HaveOccurred())
})
@ -124,7 +127,6 @@ var _ = Describe("pool", func() {
Misses: 1,
Timeouts: 0,
TotalConns: 1,
FreeConns: 1,
IdleConns: 1,
StaleConns: 0,
}))
@ -137,7 +139,7 @@ var _ = Describe("pool", func() {
Misses: 1,
Timeouts: 0,
TotalConns: 0,
FreeConns: 0,
IdleConns: 0,
StaleConns: 1,
}))
})

View File

@ -16,7 +16,10 @@ var _ = Describe("PubSub", func() {
var client *redis.Client
BeforeEach(func() {
client = redis.NewClient(redisOptions())
opt := redisOptions()
opt.MinIdleConns = 0
opt.MaxConnAge = 0
client = redis.NewClient(opt)
Expect(client.FlushDB().Err()).NotTo(HaveOccurred())
})

View File

@ -50,7 +50,7 @@ func (c *baseClient) newConn() (*pool.Conn, error) {
return nil, err
}
if !cn.Inited {
if cn.InitedAt.IsZero() {
if err := c.initConn(cn); err != nil {
_ = c.connPool.CloseConn(cn)
return nil, err
@ -66,7 +66,7 @@ func (c *baseClient) getConn() (*pool.Conn, error) {
return nil, err
}
if !cn.Inited {
if cn.InitedAt.IsZero() {
err := c.initConn(cn)
if err != nil {
c.connPool.Remove(cn)
@ -88,7 +88,7 @@ func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool {
}
func (c *baseClient) initConn(cn *pool.Conn) error {
cn.Inited = true
cn.InitedAt = time.Now()
if c.opt.Password == "" &&
c.opt.DB == 0 &&

View File

@ -68,6 +68,8 @@ type RingOptions struct {
WriteTimeout time.Duration
PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@ -108,6 +110,8 @@ func (opt *RingOptions) clientOptions() *Options {
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
@ -404,7 +408,7 @@ func (c *Ring) PoolStats() *PoolStats {
acc.Misses += s.Misses
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
acc.IdleConns += s.IdleConns
}
return &acc
}