Merge pull request #2171 from go-redis/fix/cleanup-pool-options

fix: remove conn reaper from the pool and uptrace option names
This commit is contained in:
Vladimir Mihailenco 2022-07-28 15:33:33 +03:00 committed by GitHub
commit 3d1e2e5bc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 270 additions and 485 deletions

View File

@ -1,27 +1,44 @@
# [9.0.0-beta.1](https://github.com/go-redis/redis/compare/v8.11.5...v9.0.0-beta.1) (2022-06-04) # [9.0.0-beta.1](https://github.com/go-redis/redis/compare/v8.11.5...v9.0.0-beta.1) (2022-06-04)
### Bug Fixes ### Bug Fixes
* **#1943:** xInfoConsumer.Idle should be time.Duration instead of int64 ([#2052](https://github.com/go-redis/redis/issues/2052)) ([997ab5e](https://github.com/go-redis/redis/commit/997ab5e7e3ddf53837917013a4babbded73e944f)), closes [#1943](https://github.com/go-redis/redis/issues/1943) - **#1943:** xInfoConsumer.Idle should be time.Duration instead of int64
* add XInfoConsumers test ([6f1a1ac](https://github.com/go-redis/redis/commit/6f1a1ac284ea3f683eeb3b06a59969e8424b6376)) ([#2052](https://github.com/go-redis/redis/issues/2052))
* fix tests ([3a722be](https://github.com/go-redis/redis/commit/3a722be81180e4d2a9cf0a29dc9a1ee1421f5859)) ([997ab5e](https://github.com/go-redis/redis/commit/997ab5e7e3ddf53837917013a4babbded73e944f)),
* remove test(XInfoConsumer.idle), not a stable return value when tested. ([f5fbb36](https://github.com/go-redis/redis/commit/f5fbb367e7d9dfd7f391fc535a7387002232fa8a)) closes [#1943](https://github.com/go-redis/redis/issues/1943)
* update ChannelWithSubscriptions to accept options ([c98c5f0](https://github.com/go-redis/redis/commit/c98c5f0eebf8d254307183c2ce702a48256b718d)) - add XInfoConsumers test
* update COMMAND parser for Redis 7 ([b0bb514](https://github.com/go-redis/redis/commit/b0bb514059249e01ed7328c9094e5b8a439dfb12)) ([6f1a1ac](https://github.com/go-redis/redis/commit/6f1a1ac284ea3f683eeb3b06a59969e8424b6376))
* use redis over ssh channel([#2057](https://github.com/go-redis/redis/issues/2057)) ([#2060](https://github.com/go-redis/redis/issues/2060)) ([3961b95](https://github.com/go-redis/redis/commit/3961b9577f622a3079fe74f8fc8da12ba67a77ff)) - fix tests
([3a722be](https://github.com/go-redis/redis/commit/3a722be81180e4d2a9cf0a29dc9a1ee1421f5859))
- remove test(XInfoConsumer.idle), not a stable return value when tested.
([f5fbb36](https://github.com/go-redis/redis/commit/f5fbb367e7d9dfd7f391fc535a7387002232fa8a))
- update ChannelWithSubscriptions to accept options
([c98c5f0](https://github.com/go-redis/redis/commit/c98c5f0eebf8d254307183c2ce702a48256b718d))
- update COMMAND parser for Redis 7
([b0bb514](https://github.com/go-redis/redis/commit/b0bb514059249e01ed7328c9094e5b8a439dfb12))
- use redis over ssh channel([#2057](https://github.com/go-redis/redis/issues/2057))
([#2060](https://github.com/go-redis/redis/issues/2060))
([3961b95](https://github.com/go-redis/redis/commit/3961b9577f622a3079fe74f8fc8da12ba67a77ff))
### Features ### Features
* add ClientUnpause ([91171f5](https://github.com/go-redis/redis/commit/91171f5e19a261dc4cfbf8706626d461b6ba03e4)) - add ClientUnpause
* add NewXPendingResult for unit testing XPending ([#2066](https://github.com/go-redis/redis/issues/2066)) ([b7fd09e](https://github.com/go-redis/redis/commit/b7fd09e59479bc6ed5b3b13c4645a3620fd448a3)) ([91171f5](https://github.com/go-redis/redis/commit/91171f5e19a261dc4cfbf8706626d461b6ba03e4))
* add WriteArg and Scan net.IP([#2062](https://github.com/go-redis/redis/issues/2062)) ([7d5167e](https://github.com/go-redis/redis/commit/7d5167e8624ac1515e146ed183becb97dadb3d1a)) - add NewXPendingResult for unit testing XPending
* **pool:** add check for badConnection ([a8a7665](https://github.com/go-redis/redis/commit/a8a7665ddf8cc657c5226b1826a8ee83dab4b8c1)), closes [#2053](https://github.com/go-redis/redis/issues/2053) ([#2066](https://github.com/go-redis/redis/issues/2066))
* provide a username and password callback method, so that the plaintext username and password will not be stored in the memory, and the username and password will only be generated once when the CredentialsProvider is called. After the method is executed, the username and password strings on the stack will be released. ([#2097](https://github.com/go-redis/redis/issues/2097)) ([56a3dbc](https://github.com/go-redis/redis/commit/56a3dbc7b656525eb88e0735e239d56e04a23bee)) ([b7fd09e](https://github.com/go-redis/redis/commit/b7fd09e59479bc6ed5b3b13c4645a3620fd448a3))
* upgrade to Redis 7 ([d09c27e](https://github.com/go-redis/redis/commit/d09c27e6046129fd27b1d275e5a13a477bd7f778)) - add WriteArg and Scan net.IP([#2062](https://github.com/go-redis/redis/issues/2062))
([7d5167e](https://github.com/go-redis/redis/commit/7d5167e8624ac1515e146ed183becb97dadb3d1a))
- **pool:** add check for badConnection
([a8a7665](https://github.com/go-redis/redis/commit/a8a7665ddf8cc657c5226b1826a8ee83dab4b8c1)),
closes [#2053](https://github.com/go-redis/redis/issues/2053)
- provide a username and password callback method, so that the plaintext username and password will
not be stored in the memory, and the username and password will only be generated once when the
CredentialsProvider is called. After the method is executed, the username and password strings on
the stack will be released. ([#2097](https://github.com/go-redis/redis/issues/2097))
([56a3dbc](https://github.com/go-redis/redis/commit/56a3dbc7b656525eb88e0735e239d56e04a23bee))
- upgrade to Redis 7
([d09c27e](https://github.com/go-redis/redis/commit/d09c27e6046129fd27b1d275e5a13a477bd7f778))
## v9 UNRELEASED ## v9 UNRELEASED
@ -29,3 +46,7 @@
- Removed `Pipeline.Close` since there is no real need to explicitly manage pipeline resources. - Removed `Pipeline.Close` since there is no real need to explicitly manage pipeline resources.
`Pipeline.Discard` is still available if you want to reset commands for some reason. `Pipeline.Discard` is still available if you want to reset commands for some reason.
- Replaced `*redis.Z` with `redis.Z` since it is small enough to be passed as value. - Replaced `*redis.Z` with `redis.Z` since it is small enough to be passed as value.
- Renamed `MaxConnAge` to `ConnMaxLifetime`.
- Renamed `IdleTimeout` to `ConnMaxIdleTime`.
- Removed connection reaper in favor of `MaxIdleConns`.
- Removed `WithContext`.

View File

@ -73,11 +73,11 @@ type ClusterOptions struct {
// PoolSize applies per cluster node and not for the whole cluster. // PoolSize applies per cluster node and not for the whole cluster.
PoolSize int PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration PoolTimeout time.Duration
IdleTimeout time.Duration MinIdleConns int
IdleCheckFrequency time.Duration MaxIdleConns int
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration
TLSConfig *tls.Config TLSConfig *tls.Config
} }
@ -132,8 +132,6 @@ func (opt *ClusterOptions) init() {
} }
func (opt *ClusterOptions) clientOptions() *Options { func (opt *ClusterOptions) clientOptions() *Options {
const disableIdleCheck = -1
return &Options{ return &Options{
Dialer: opt.Dialer, Dialer: opt.Dialer,
OnConnect: opt.OnConnect, OnConnect: opt.OnConnect,
@ -151,11 +149,11 @@ func (opt *ClusterOptions) clientOptions() *Options {
PoolFIFO: opt.PoolFIFO, PoolFIFO: opt.PoolFIFO,
PoolSize: opt.PoolSize, PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout, PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout, MinIdleConns: opt.MinIdleConns,
IdleCheckFrequency: disableIdleCheck, MaxIdleConns: opt.MaxIdleConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
TLSConfig: opt.TLSConfig, TLSConfig: opt.TLSConfig,
// If ClusterSlots is populated, then we probably have an artificial // If ClusterSlots is populated, then we probably have an artificial
@ -725,10 +723,6 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo) c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
c.cmdable = c.Process c.cmdable = c.Process
if opt.IdleCheckFrequency > 0 {
go c.reaper(opt.IdleCheckFrequency)
}
return c return c
} }
@ -1049,26 +1043,6 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
return nil, firstErr return nil, firstErr
} }
// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
ticker := time.NewTicker(idleCheckFrequency)
defer ticker.Stop()
for range ticker.C {
nodes, err := c.nodes.All()
if err != nil {
break
}
for _, node := range nodes {
_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
if err != nil {
internal.Logger.Printf(context.TODO(), "ReapStaleConns failed: %s", err)
}
}
}
}
func (c *ClusterClient) Pipeline() Pipeliner { func (c *ClusterClient) Pipeline() Pipeliner {
pipe := Pipeline{ pipe := Pipeline{
exec: c.processPipeline, exec: c.processPipeline,

View File

@ -33,8 +33,7 @@ func BenchmarkPoolGetPut(b *testing.B) {
Dialer: dummyDialer, Dialer: dummyDialer,
PoolSize: bm.poolSize, PoolSize: bm.poolSize,
PoolTimeout: time.Second, PoolTimeout: time.Second,
IdleTimeout: time.Hour, ConnMaxIdleTime: time.Hour,
IdleCheckFrequency: time.Hour,
}) })
b.ResetTimer() b.ResetTimer()
@ -77,8 +76,7 @@ func BenchmarkPoolGetRemove(b *testing.B) {
Dialer: dummyDialer, Dialer: dummyDialer,
PoolSize: bm.poolSize, PoolSize: bm.poolSize,
PoolTimeout: time.Second, PoolTimeout: time.Second,
IdleTimeout: time.Hour, ConnMaxIdleTime: time.Hour,
IdleCheckFrequency: time.Hour,
}) })
b.ResetTimer() b.ResetTimer()

View File

@ -27,7 +27,8 @@ func connCheck(conn net.Conn) error {
} }
var sysErr error var sysErr error
err = rawConn.Read(func(fd uintptr) bool {
if err := rawConn.Read(func(fd uintptr) bool {
var buf [1]byte var buf [1]byte
n, err := syscall.Read(int(fd), buf[:]) n, err := syscall.Read(int(fd), buf[:])
switch { switch {
@ -41,8 +42,7 @@ func connCheck(conn net.Conn) error {
sysErr = err sysErr = err
} }
return true return true
}) }); err != nil {
if err != nil {
return err return err
} }

View File

@ -59,11 +59,11 @@ type Options struct {
PoolFIFO bool PoolFIFO bool
PoolSize int PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration PoolTimeout time.Duration
IdleTimeout time.Duration MinIdleConns int
IdleCheckFrequency time.Duration MaxIdleConns int
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration
} }
type lastDialErrorWrap struct { type lastDialErrorWrap struct {
@ -71,10 +71,9 @@ type lastDialErrorWrap struct {
} }
type ConnPool struct { type ConnPool struct {
opt *Options cfg *Options
dialErrorsNum uint32 // atomic dialErrorsNum uint32 // atomic
lastDialError atomic.Value lastDialError atomic.Value
queue chan struct{} queue chan struct{}
@ -82,6 +81,7 @@ type ConnPool struct {
connsMu sync.Mutex connsMu sync.Mutex
conns []*Conn conns []*Conn
idleConns []*Conn idleConns []*Conn
poolSize int poolSize int
idleConnsLen int idleConnsLen int
@ -95,7 +95,7 @@ var _ Pooler = (*ConnPool)(nil)
func NewConnPool(opt *Options) *ConnPool { func NewConnPool(opt *Options) *ConnPool {
p := &ConnPool{ p := &ConnPool{
opt: opt, cfg: opt,
queue: make(chan struct{}, opt.PoolSize), queue: make(chan struct{}, opt.PoolSize),
conns: make([]*Conn, 0, opt.PoolSize), conns: make([]*Conn, 0, opt.PoolSize),
@ -107,18 +107,14 @@ func NewConnPool(opt *Options) *ConnPool {
p.checkMinIdleConns() p.checkMinIdleConns()
p.connsMu.Unlock() p.connsMu.Unlock()
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency)
}
return p return p
} }
func (p *ConnPool) checkMinIdleConns() { func (p *ConnPool) checkMinIdleConns() {
if p.opt.MinIdleConns == 0 { if p.cfg.MinIdleConns == 0 {
return return
} }
for p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns { for p.poolSize < p.cfg.PoolSize && p.idleConnsLen < p.cfg.MinIdleConns {
p.poolSize++ p.poolSize++
p.idleConnsLen++ p.idleConnsLen++
@ -176,7 +172,7 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
p.conns = append(p.conns, cn) p.conns = append(p.conns, cn)
if pooled { if pooled {
// If pool is full remove the cn on next Put. // If pool is full remove the cn on next Put.
if p.poolSize >= p.opt.PoolSize { if p.poolSize >= p.cfg.PoolSize {
cn.pooled = false cn.pooled = false
} else { } else {
p.poolSize++ p.poolSize++
@ -191,14 +187,14 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
return nil, ErrClosed return nil, ErrClosed
} }
if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) { if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.cfg.PoolSize) {
return nil, p.getLastDialError() return nil, p.getLastDialError()
} }
netConn, err := p.opt.Dialer(ctx) netConn, err := p.cfg.Dialer(ctx)
if err != nil { if err != nil {
p.setLastDialError(err) p.setLastDialError(err)
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) { if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.cfg.PoolSize) {
go p.tryDial() go p.tryDial()
} }
return nil, err return nil, err
@ -215,7 +211,7 @@ func (p *ConnPool) tryDial() {
return return
} }
conn, err := p.opt.Dialer(context.Background()) conn, err := p.cfg.Dialer(context.Background())
if err != nil { if err != nil {
p.setLastDialError(err) p.setLastDialError(err)
time.Sleep(time.Second) time.Sleep(time.Second)
@ -263,7 +259,7 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
break break
} }
if p.isStaleConn(cn) { if !p.isHealthyConn(cn) {
_ = p.CloseConn(cn) _ = p.CloseConn(cn)
continue continue
} }
@ -283,10 +279,6 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
return newcn, nil return newcn, nil
} }
func (p *ConnPool) getTurn() {
p.queue <- struct{}{}
}
func (p *ConnPool) waitTurn(ctx context.Context) error { func (p *ConnPool) waitTurn(ctx context.Context) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -301,7 +293,7 @@ func (p *ConnPool) waitTurn(ctx context.Context) error {
} }
timer := timers.Get().(*time.Timer) timer := timers.Get().(*time.Timer)
timer.Reset(p.opt.PoolTimeout) timer.Reset(p.cfg.PoolTimeout)
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -337,7 +329,7 @@ func (p *ConnPool) popIdle() (*Conn, error) {
} }
var cn *Conn var cn *Conn
if p.opt.PoolFIFO { if p.cfg.PoolFIFO {
cn = p.idleConns[0] cn = p.idleConns[0]
copy(p.idleConns, p.idleConns[1:]) copy(p.idleConns, p.idleConns[1:])
p.idleConns = p.idleConns[:n-1] p.idleConns = p.idleConns[:n-1]
@ -363,11 +355,25 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
return return
} }
var shouldCloseConn bool
p.connsMu.Lock() p.connsMu.Lock()
if p.cfg.MaxIdleConns == 0 || p.idleConnsLen < p.cfg.MaxIdleConns {
p.idleConns = append(p.idleConns, cn) p.idleConns = append(p.idleConns, cn)
p.idleConnsLen++ p.idleConnsLen++
} else {
p.removeConn(cn)
shouldCloseConn = true
}
p.connsMu.Unlock() p.connsMu.Unlock()
p.freeTurn() p.freeTurn()
if shouldCloseConn {
_ = p.closeConn(cn)
}
} }
func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) { func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
@ -383,8 +389,8 @@ func (p *ConnPool) CloseConn(cn *Conn) error {
func (p *ConnPool) removeConnWithLock(cn *Conn) { func (p *ConnPool) removeConnWithLock(cn *Conn) {
p.connsMu.Lock() p.connsMu.Lock()
defer p.connsMu.Unlock()
p.removeConn(cn) p.removeConn(cn)
p.connsMu.Unlock()
} }
func (p *ConnPool) removeConn(cn *Conn) { func (p *ConnPool) removeConn(cn *Conn) {
@ -395,14 +401,14 @@ func (p *ConnPool) removeConn(cn *Conn) {
p.poolSize-- p.poolSize--
p.checkMinIdleConns() p.checkMinIdleConns()
} }
return break
} }
} }
} }
func (p *ConnPool) closeConn(cn *Conn) error { func (p *ConnPool) closeConn(cn *Conn) error {
if p.opt.OnClose != nil { if p.cfg.OnClose != nil {
_ = p.opt.OnClose(cn) _ = p.cfg.OnClose(cn)
} }
return cn.Close() return cn.Close()
} }
@ -477,81 +483,21 @@ func (p *ConnPool) Close() error {
return firstErr return firstErr
} }
func (p *ConnPool) reaper(frequency time.Duration) { func (p *ConnPool) isHealthyConn(cn *Conn) bool {
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// It is possible that ticker and closedCh arrive together,
// and select pseudo-randomly pick ticker case, we double
// check here to prevent being executed after closed.
if p.closed() {
return
}
_, err := p.ReapStaleConns()
if err != nil {
internal.Logger.Printf(context.Background(), "ReapStaleConns failed: %s", err)
continue
}
case <-p.closedCh:
return
}
}
}
func (p *ConnPool) ReapStaleConns() (int, error) {
var n int
for {
p.getTurn()
p.connsMu.Lock()
cn := p.reapStaleConn()
p.connsMu.Unlock()
p.freeTurn()
if cn != nil {
_ = p.closeConn(cn)
n++
} else {
break
}
}
atomic.AddUint32(&p.stats.StaleConns, uint32(n))
return n, nil
}
func (p *ConnPool) reapStaleConn() *Conn {
if len(p.idleConns) == 0 {
return nil
}
cn := p.idleConns[0]
if !p.isStaleConn(cn) {
return nil
}
p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
p.idleConnsLen--
p.removeConn(cn)
return cn
}
func (p *ConnPool) isStaleConn(cn *Conn) bool {
if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
return connCheck(cn.netConn) != nil
}
now := time.Now() now := time.Now()
if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
return true if p.cfg.ConnMaxLifetime > 0 && now.Sub(cn.createdAt) >= p.cfg.ConnMaxLifetime {
return false
} }
if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge { if p.cfg.ConnMaxIdleTime > 0 && now.Sub(cn.UsedAt()) >= p.cfg.ConnMaxIdleTime {
return true atomic.AddUint32(&p.stats.IdleConns, 1)
return false
} }
return connCheck(cn.netConn) != nil if connCheck(cn.netConn) != nil {
return false
}
cn.SetUsedAt(now)
return true
} }

View File

@ -22,8 +22,7 @@ var _ = Describe("ConnPool", func() {
Dialer: dummyDialer, Dialer: dummyDialer,
PoolSize: 10, PoolSize: 10,
PoolTimeout: time.Hour, PoolTimeout: time.Hour,
IdleTimeout: time.Millisecond, ConnMaxIdleTime: time.Millisecond,
IdleCheckFrequency: time.Millisecond,
}) })
}) })
@ -47,8 +46,7 @@ var _ = Describe("ConnPool", func() {
}, },
PoolSize: 10, PoolSize: 10,
PoolTimeout: time.Hour, PoolTimeout: time.Hour,
IdleTimeout: time.Millisecond, ConnMaxIdleTime: time.Millisecond,
IdleCheckFrequency: time.Millisecond,
MinIdleConns: minIdleConns, MinIdleConns: minIdleConns,
}) })
wg.Wait() wg.Wait()
@ -131,8 +129,7 @@ var _ = Describe("MinIdleConns", func() {
PoolSize: poolSize, PoolSize: poolSize,
MinIdleConns: minIdleConns, MinIdleConns: minIdleConns,
PoolTimeout: 100 * time.Millisecond, PoolTimeout: 100 * time.Millisecond,
IdleTimeout: -1, ConnMaxIdleTime: -1,
IdleCheckFrequency: -1,
}) })
Eventually(func() int { Eventually(func() int {
return connPool.Len() return connPool.Len()
@ -287,133 +284,6 @@ var _ = Describe("MinIdleConns", func() {
}) })
}) })
var _ = Describe("conns reaper", func() {
const idleTimeout = time.Minute
const maxAge = time.Hour
ctx := context.Background()
var connPool *pool.ConnPool
var conns, staleConns, closedConns []*pool.Conn
assert := func(typ string) {
BeforeEach(func() {
closedConns = nil
connPool = pool.NewConnPool(&pool.Options{
Dialer: dummyDialer,
PoolSize: 10,
IdleTimeout: idleTimeout,
MaxConnAge: maxAge,
PoolTimeout: time.Second,
IdleCheckFrequency: time.Hour,
OnClose: func(cn *pool.Conn) error {
closedConns = append(closedConns, cn)
return nil
},
})
conns = nil
// add stale connections
staleConns = nil
for i := 0; i < 3; i++ {
cn, err := connPool.Get(ctx)
Expect(err).NotTo(HaveOccurred())
switch typ {
case "idle":
cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
case "aged":
cn.SetCreatedAt(time.Now().Add(-2 * maxAge))
case "connCheck":
_ = cn.Close()
}
conns = append(conns, cn)
staleConns = append(staleConns, cn)
}
// add fresh connections
for i := 0; i < 3; i++ {
cn, err := connPool.Get(ctx)
Expect(err).NotTo(HaveOccurred())
conns = append(conns, cn)
}
for _, cn := range conns {
connPool.Put(ctx, cn)
}
Expect(connPool.Len()).To(Equal(6))
Expect(connPool.IdleLen()).To(Equal(6))
n, err := connPool.ReapStaleConns()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(3))
})
AfterEach(func() {
_ = connPool.Close()
Expect(connPool.Len()).To(Equal(0))
Expect(connPool.IdleLen()).To(Equal(0))
Expect(len(closedConns)).To(Equal(len(conns)))
Expect(closedConns).To(ConsistOf(conns))
})
It("reaps stale connections", func() {
Expect(connPool.Len()).To(Equal(3))
Expect(connPool.IdleLen()).To(Equal(3))
})
It("does not reap fresh connections", func() {
n, err := connPool.ReapStaleConns()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(0))
})
It("stale connections are closed", func() {
Expect(len(closedConns)).To(Equal(len(staleConns)))
Expect(closedConns).To(ConsistOf(staleConns))
})
It("pool is functional", func() {
for j := 0; j < 3; j++ {
var freeCns []*pool.Conn
for i := 0; i < 3; i++ {
cn, err := connPool.Get(ctx)
Expect(err).NotTo(HaveOccurred())
Expect(cn).NotTo(BeNil())
freeCns = append(freeCns, cn)
}
Expect(connPool.Len()).To(Equal(3))
Expect(connPool.IdleLen()).To(Equal(0))
cn, err := connPool.Get(ctx)
Expect(err).NotTo(HaveOccurred())
Expect(cn).NotTo(BeNil())
conns = append(conns, cn)
Expect(connPool.Len()).To(Equal(4))
Expect(connPool.IdleLen()).To(Equal(0))
connPool.Remove(ctx, cn, nil)
Expect(connPool.Len()).To(Equal(3))
Expect(connPool.IdleLen()).To(Equal(0))
for _, cn := range freeCns {
connPool.Put(ctx, cn)
}
Expect(connPool.Len()).To(Equal(3))
Expect(connPool.IdleLen()).To(Equal(3))
}
})
}
assert("idle")
assert("aged")
assert("connCheck")
})
var _ = Describe("race", func() { var _ = Describe("race", func() {
ctx := context.Background() ctx := context.Background()
var connPool *pool.ConnPool var connPool *pool.ConnPool
@ -436,8 +306,7 @@ var _ = Describe("race", func() {
Dialer: dummyDialer, Dialer: dummyDialer,
PoolSize: 10, PoolSize: 10,
PoolTimeout: time.Minute, PoolTimeout: time.Minute,
IdleTimeout: time.Millisecond, ConnMaxIdleTime: time.Millisecond,
IdleCheckFrequency: time.Millisecond,
}) })
perform(C, func(id int) { perform(C, func(id int) {

View File

@ -132,8 +132,7 @@ func redisOptions() *redis.Options {
PoolSize: 10, PoolSize: 10,
PoolTimeout: 30 * time.Second, PoolTimeout: 30 * time.Second,
IdleTimeout: time.Minute, ConnMaxIdleTime: time.Minute,
IdleCheckFrequency: 100 * time.Millisecond,
} }
} }
@ -147,8 +146,7 @@ func redisClusterOptions() *redis.ClusterOptions {
PoolSize: 10, PoolSize: 10,
PoolTimeout: 30 * time.Second, PoolTimeout: 30 * time.Second,
IdleTimeout: time.Minute, ConnMaxIdleTime: time.Minute,
IdleCheckFrequency: 100 * time.Millisecond,
} }
} }
@ -167,8 +165,7 @@ func redisRingOptions() *redis.RingOptions {
PoolSize: 10, PoolSize: 10,
PoolTimeout: 30 * time.Second, PoolTimeout: 30 * time.Second,
IdleTimeout: time.Minute, ConnMaxIdleTime: time.Minute,
IdleCheckFrequency: 100 * time.Millisecond,
} }
} }

View File

@ -87,25 +87,22 @@ type Options struct {
// Maximum number of socket connections. // Maximum number of socket connections.
// Default is 10 connections per every available CPU as reported by runtime.GOMAXPROCS. // Default is 10 connections per every available CPU as reported by runtime.GOMAXPROCS.
PoolSize int PoolSize int
// Minimum number of idle connections which is useful when establishing
// new connection is slow.
MinIdleConns int
// Connection age at which client retires (closes) the connection.
// Default is to not close aged connections.
MaxConnAge time.Duration
// Amount of time client waits for connection if all connections // Amount of time client waits for connection if all connections
// are busy before returning an error. // are busy before returning an error.
// Default is ReadTimeout + 1 second. // Default is ReadTimeout + 1 second.
PoolTimeout time.Duration PoolTimeout time.Duration
// Minimum number of idle connections which is useful when establishing
// new connection is slow.
MinIdleConns int
// Maximum number of idle connections.
MaxIdleConns int
// Amount of time after which client closes idle connections. // Amount of time after which client closes idle connections.
// Should be less than server's timeout. // Should be less than server's timeout.
// Default is 5 minutes. -1 disables idle timeout check. // Default is 5 minutes. -1 disables idle timeout check.
IdleTimeout time.Duration ConnMaxIdleTime time.Duration
// Frequency of idle checks made by idle connections reaper. // Connection age at which client retires (closes) the connection.
// Default is 1 minute. -1 disables idle connections reaper, // Default is to not close aged connections.
// but idle connections are still discarded by the client ConnMaxLifetime time.Duration
// if IdleTimeout is set.
IdleCheckFrequency time.Duration
// Enables read only queries on slave nodes. // Enables read only queries on slave nodes.
readOnly bool readOnly bool
@ -161,11 +158,8 @@ func (opt *Options) init() {
if opt.PoolTimeout == 0 { if opt.PoolTimeout == 0 {
opt.PoolTimeout = opt.ReadTimeout + time.Second opt.PoolTimeout = opt.ReadTimeout + time.Second
} }
if opt.IdleTimeout == 0 { if opt.ConnMaxIdleTime == 0 {
opt.IdleTimeout = 5 * time.Minute opt.ConnMaxIdleTime = 30 * time.Minute
}
if opt.IdleCheckFrequency == 0 {
opt.IdleCheckFrequency = time.Minute
} }
if opt.MaxRetries == -1 { if opt.MaxRetries == -1 {
@ -297,6 +291,10 @@ type queryOptions struct {
err error err error
} }
func (o *queryOptions) has(name string) bool {
return len(o.q[name]) > 0
}
func (o *queryOptions) string(name string) string { func (o *queryOptions) string(name string) string {
vs := o.q[name] vs := o.q[name]
if len(vs) == 0 { if len(vs) == 0 {
@ -391,11 +389,19 @@ func setupConnParams(u *url.URL, o *Options) (*Options, error) {
o.WriteTimeout = q.duration("write_timeout") o.WriteTimeout = q.duration("write_timeout")
o.PoolFIFO = q.bool("pool_fifo") o.PoolFIFO = q.bool("pool_fifo")
o.PoolSize = q.int("pool_size") o.PoolSize = q.int("pool_size")
o.MinIdleConns = q.int("min_idle_conns")
o.MaxConnAge = q.duration("max_conn_age")
o.PoolTimeout = q.duration("pool_timeout") o.PoolTimeout = q.duration("pool_timeout")
o.IdleTimeout = q.duration("idle_timeout") o.MinIdleConns = q.int("min_idle_conns")
o.IdleCheckFrequency = q.duration("idle_check_frequency") o.MaxIdleConns = q.int("max_idle_conns")
if q.has("conn_max_idle_time") {
o.ConnMaxIdleTime = q.duration("conn_max_idle_time")
} else {
o.ConnMaxIdleTime = q.duration("idle_timeout")
}
if q.has("conn_max_lifetime") {
o.ConnMaxLifetime = q.duration("conn_max_lifetime")
} else {
o.ConnMaxLifetime = q.duration("max_conn_age")
}
if q.err != nil { if q.err != nil {
return nil, q.err return nil, q.err
} }
@ -426,10 +432,10 @@ func newConnPool(opt *Options) *pool.ConnPool {
}, },
PoolFIFO: opt.PoolFIFO, PoolFIFO: opt.PoolFIFO,
PoolSize: opt.PoolSize, PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout, PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout, MinIdleConns: opt.MinIdleConns,
IdleCheckFrequency: opt.IdleCheckFrequency, MaxIdleConns: opt.MaxIdleConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
}) })
} }

View File

@ -47,18 +47,18 @@ func TestParseURL(t *testing.T) {
}, { }, {
// special case handling for disabled timeouts // special case handling for disabled timeouts
url: "redis://localhost:123/?db=2&idle_timeout=0", url: "redis://localhost:123/?db=2&idle_timeout=0",
o: &Options{Addr: "localhost:123", DB: 2, IdleTimeout: -1}, o: &Options{Addr: "localhost:123", DB: 2, ConnMaxIdleTime: -1},
}, { }, {
// negative values disable timeouts as well // negative values disable timeouts as well
url: "redis://localhost:123/?db=2&idle_timeout=-1", url: "redis://localhost:123/?db=2&idle_timeout=-1",
o: &Options{Addr: "localhost:123", DB: 2, IdleTimeout: -1}, o: &Options{Addr: "localhost:123", DB: 2, ConnMaxIdleTime: -1},
}, { }, {
// absent timeout values will use defaults // absent timeout values will use defaults
url: "redis://localhost:123/?db=2&idle_timeout=", url: "redis://localhost:123/?db=2&idle_timeout=",
o: &Options{Addr: "localhost:123", DB: 2, IdleTimeout: 0}, o: &Options{Addr: "localhost:123", DB: 2, ConnMaxIdleTime: 0},
}, { }, {
url: "redis://localhost:123/?db=2&idle_timeout", // missing "=" at the end url: "redis://localhost:123/?db=2&idle_timeout", // missing "=" at the end
o: &Options{Addr: "localhost:123", DB: 2, IdleTimeout: 0}, o: &Options{Addr: "localhost:123", DB: 2, ConnMaxIdleTime: 0},
}, { }, {
url: "unix:///tmp/redis.sock", url: "unix:///tmp/redis.sock",
o: &Options{Addr: "/tmp/redis.sock"}, o: &Options{Addr: "/tmp/redis.sock"},
@ -174,20 +174,20 @@ func comprareOptions(t *testing.T, actual, expected *Options) {
if actual.PoolSize != expected.PoolSize { if actual.PoolSize != expected.PoolSize {
t.Errorf("PoolSize: got %v, expected %v", actual.PoolSize, expected.PoolSize) t.Errorf("PoolSize: got %v, expected %v", actual.PoolSize, expected.PoolSize)
} }
if actual.MinIdleConns != expected.MinIdleConns {
t.Errorf("MinIdleConns: got %v, expected %v", actual.MinIdleConns, expected.MinIdleConns)
}
if actual.MaxConnAge != expected.MaxConnAge {
t.Errorf("MaxConnAge: got %v, expected %v", actual.MaxConnAge, expected.MaxConnAge)
}
if actual.PoolTimeout != expected.PoolTimeout { if actual.PoolTimeout != expected.PoolTimeout {
t.Errorf("PoolTimeout: got %v, expected %v", actual.PoolTimeout, expected.PoolTimeout) t.Errorf("PoolTimeout: got %v, expected %v", actual.PoolTimeout, expected.PoolTimeout)
} }
if actual.IdleTimeout != expected.IdleTimeout { if actual.MinIdleConns != expected.MinIdleConns {
t.Errorf("IdleTimeout: got %v, expected %v", actual.IdleTimeout, expected.IdleTimeout) t.Errorf("MinIdleConns: got %v, expected %v", actual.MinIdleConns, expected.MinIdleConns)
} }
if actual.IdleCheckFrequency != expected.IdleCheckFrequency { if actual.MaxIdleConns != expected.MaxIdleConns {
t.Errorf("IdleCheckFrequency: got %v, expected %v", actual.IdleCheckFrequency, expected.IdleCheckFrequency) t.Errorf("MaxIdleConns: got %v, expected %v", actual.MaxIdleConns, expected.MaxIdleConns)
}
if actual.ConnMaxIdleTime != expected.ConnMaxIdleTime {
t.Errorf("ConnMaxIdleTime: got %v, expected %v", actual.ConnMaxIdleTime, expected.ConnMaxIdleTime)
}
if actual.ConnMaxLifetime != expected.ConnMaxLifetime {
t.Errorf("ConnMaxLifetime: got %v, expected %v", actual.ConnMaxLifetime, expected.ConnMaxLifetime)
} }
} }

View File

@ -16,8 +16,8 @@ var _ = Describe("pool", func() {
BeforeEach(func() { BeforeEach(func() {
opt := redisOptions() opt := redisOptions()
opt.MinIdleConns = 0 opt.MinIdleConns = 0
opt.MaxConnAge = 0 opt.ConnMaxLifetime = 0
opt.IdleTimeout = time.Second opt.ConnMaxIdleTime = time.Second
client = redis.NewClient(opt) client = redis.NewClient(opt)
}) })
@ -108,8 +108,8 @@ var _ = Describe("pool", func() {
// explain: https://github.com/go-redis/redis/pull/1675 // explain: https://github.com/go-redis/redis/pull/1675
opt := redisOptions() opt := redisOptions()
opt.MinIdleConns = 0 opt.MinIdleConns = 0
opt.MaxConnAge = 0 opt.ConnMaxLifetime = 0
opt.IdleTimeout = 2 * time.Second opt.ConnMaxIdleTime = 10 * time.Second
client = redis.NewClient(opt) client = redis.NewClient(opt)
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
@ -127,31 +127,4 @@ var _ = Describe("pool", func() {
Expect(stats.Misses).To(Equal(uint32(1))) Expect(stats.Misses).To(Equal(uint32(1)))
Expect(stats.Timeouts).To(Equal(uint32(0))) Expect(stats.Timeouts).To(Equal(uint32(0)))
}) })
It("removes idle connections", func() {
err := client.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred())
stats := client.PoolStats()
Expect(stats).To(Equal(&redis.PoolStats{
Hits: 0,
Misses: 1,
Timeouts: 0,
TotalConns: 1,
IdleConns: 1,
StaleConns: 0,
}))
time.Sleep(2 * time.Second)
stats = client.PoolStats()
Expect(stats).To(Equal(&redis.PoolStats{
Hits: 0,
Misses: 1,
Timeouts: 0,
TotalConns: 0,
IdleConns: 0,
StaleConns: 1,
}))
})
}) })

View File

@ -18,7 +18,7 @@ var _ = Describe("PubSub", func() {
BeforeEach(func() { BeforeEach(func() {
opt := redisOptions() opt := redisOptions()
opt.MinIdleConns = 0 opt.MinIdleConns = 0
opt.MaxConnAge = 0 opt.ConnMaxLifetime = 0
client = redis.NewClient(opt) client = redis.NewClient(opt)
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
}) })

16
ring.go
View File

@ -83,11 +83,11 @@ type RingOptions struct {
PoolFIFO bool PoolFIFO bool
PoolSize int PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration PoolTimeout time.Duration
IdleTimeout time.Duration MinIdleConns int
IdleCheckFrequency time.Duration MaxIdleConns int
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration
TLSConfig *tls.Config TLSConfig *tls.Config
Limiter Limiter Limiter Limiter
@ -144,11 +144,11 @@ func (opt *RingOptions) clientOptions() *Options {
PoolFIFO: opt.PoolFIFO, PoolFIFO: opt.PoolFIFO,
PoolSize: opt.PoolSize, PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout, PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout, MinIdleConns: opt.MinIdleConns,
IdleCheckFrequency: opt.IdleCheckFrequency, MaxIdleConns: opt.MaxIdleConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
TLSConfig: opt.TLSConfig, TLSConfig: opt.TLSConfig,
Limiter: opt.Limiter, Limiter: opt.Limiter,

View File

@ -63,15 +63,14 @@ type FailoverOptions struct {
ReadTimeout time.Duration ReadTimeout time.Duration
WriteTimeout time.Duration WriteTimeout time.Duration
// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
PoolFIFO bool PoolFIFO bool
PoolSize int PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration PoolTimeout time.Duration
IdleTimeout time.Duration MinIdleConns int
IdleCheckFrequency time.Duration MaxIdleConns int
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration
TLSConfig *tls.Config TLSConfig *tls.Config
} }
@ -98,10 +97,10 @@ func (opt *FailoverOptions) clientOptions() *Options {
PoolFIFO: opt.PoolFIFO, PoolFIFO: opt.PoolFIFO,
PoolSize: opt.PoolSize, PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout, PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
MinIdleConns: opt.MinIdleConns, MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge, MaxIdleConns: opt.MaxIdleConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
TLSConfig: opt.TLSConfig, TLSConfig: opt.TLSConfig,
} }
@ -129,10 +128,10 @@ func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
PoolFIFO: opt.PoolFIFO, PoolFIFO: opt.PoolFIFO,
PoolSize: opt.PoolSize, PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout, PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
MinIdleConns: opt.MinIdleConns, MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge, MaxIdleConns: opt.MaxIdleConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
TLSConfig: opt.TLSConfig, TLSConfig: opt.TLSConfig,
} }
@ -161,10 +160,10 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
PoolFIFO: opt.PoolFIFO, PoolFIFO: opt.PoolFIFO,
PoolSize: opt.PoolSize, PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout, PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
MinIdleConns: opt.MinIdleConns, MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge, MaxIdleConns: opt.MaxIdleConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
TLSConfig: opt.TLSConfig, TLSConfig: opt.TLSConfig,
} }
@ -580,7 +579,7 @@ func (c *sentinelFailover) getReplicaAddrs(ctx context.Context, sentinel *Sentin
if err != nil { if err != nil {
internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s", internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s",
c.opt.MasterName, err) c.opt.MasterName, err)
return []string{} return nil
} }
return parseReplicaAddrs(addrs, false) return parseReplicaAddrs(addrs, false)
} }

View File

@ -40,11 +40,11 @@ type UniversalOptions struct {
PoolFIFO bool PoolFIFO bool
PoolSize int PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration PoolTimeout time.Duration
IdleTimeout time.Duration MinIdleConns int
IdleCheckFrequency time.Duration MaxIdleConns int
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration
TLSConfig *tls.Config TLSConfig *tls.Config
@ -87,13 +87,15 @@ func (o *UniversalOptions) Cluster() *ClusterOptions {
DialTimeout: o.DialTimeout, DialTimeout: o.DialTimeout,
ReadTimeout: o.ReadTimeout, ReadTimeout: o.ReadTimeout,
WriteTimeout: o.WriteTimeout, WriteTimeout: o.WriteTimeout,
PoolFIFO: o.PoolFIFO, PoolFIFO: o.PoolFIFO,
PoolSize: o.PoolSize, PoolSize: o.PoolSize,
MinIdleConns: o.MinIdleConns,
MaxConnAge: o.MaxConnAge,
PoolTimeout: o.PoolTimeout, PoolTimeout: o.PoolTimeout,
IdleTimeout: o.IdleTimeout, MinIdleConns: o.MinIdleConns,
IdleCheckFrequency: o.IdleCheckFrequency, MaxIdleConns: o.MaxIdleConns,
ConnMaxIdleTime: o.ConnMaxIdleTime,
ConnMaxLifetime: o.ConnMaxLifetime,
TLSConfig: o.TLSConfig, TLSConfig: o.TLSConfig,
} }
@ -128,11 +130,11 @@ func (o *UniversalOptions) Failover() *FailoverOptions {
PoolFIFO: o.PoolFIFO, PoolFIFO: o.PoolFIFO,
PoolSize: o.PoolSize, PoolSize: o.PoolSize,
MinIdleConns: o.MinIdleConns,
MaxConnAge: o.MaxConnAge,
PoolTimeout: o.PoolTimeout, PoolTimeout: o.PoolTimeout,
IdleTimeout: o.IdleTimeout, MinIdleConns: o.MinIdleConns,
IdleCheckFrequency: o.IdleCheckFrequency, MaxIdleConns: o.MaxIdleConns,
ConnMaxIdleTime: o.ConnMaxIdleTime,
ConnMaxLifetime: o.ConnMaxLifetime,
TLSConfig: o.TLSConfig, TLSConfig: o.TLSConfig,
} }
@ -164,11 +166,11 @@ func (o *UniversalOptions) Simple() *Options {
PoolFIFO: o.PoolFIFO, PoolFIFO: o.PoolFIFO,
PoolSize: o.PoolSize, PoolSize: o.PoolSize,
MinIdleConns: o.MinIdleConns,
MaxConnAge: o.MaxConnAge,
PoolTimeout: o.PoolTimeout, PoolTimeout: o.PoolTimeout,
IdleTimeout: o.IdleTimeout, MinIdleConns: o.MinIdleConns,
IdleCheckFrequency: o.IdleCheckFrequency, MaxIdleConns: o.MaxIdleConns,
ConnMaxIdleTime: o.ConnMaxIdleTime,
ConnMaxLifetime: o.ConnMaxLifetime,
TLSConfig: o.TLSConfig, TLSConfig: o.TLSConfig,
} }