From f6a8adc50cdaec30527f50d06468f9176ee674fe Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Thu, 28 Jul 2022 15:11:35 +0300 Subject: [PATCH] fix: remove conn reaper from the pool and uptrace option names --- CHANGELOG.md | 55 ++++++++---- cluster.go | 52 +++-------- internal/pool/bench_test.go | 18 ++-- internal/pool/conn_check.go | 6 +- internal/pool/pool.go | 166 ++++++++++++------------------------ internal/pool/pool_test.go | 165 ++++------------------------------- main_test.go | 21 ++--- options.go | 62 ++++++++------ options_test.go | 28 +++--- pool_test.go | 35 +------- pubsub_test.go | 2 +- ring.go | 26 +++--- sentinel.go | 57 ++++++------- universal.go | 62 +++++++------- 14 files changed, 270 insertions(+), 485 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4377e945..0350b02c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,27 +1,44 @@ # [9.0.0-beta.1](https://github.com/go-redis/redis/compare/v8.11.5...v9.0.0-beta.1) (2022-06-04) - ### Bug Fixes -* **#1943:** xInfoConsumer.Idle should be time.Duration instead of int64 ([#2052](https://github.com/go-redis/redis/issues/2052)) ([997ab5e](https://github.com/go-redis/redis/commit/997ab5e7e3ddf53837917013a4babbded73e944f)), closes [#1943](https://github.com/go-redis/redis/issues/1943) -* add XInfoConsumers test ([6f1a1ac](https://github.com/go-redis/redis/commit/6f1a1ac284ea3f683eeb3b06a59969e8424b6376)) -* fix tests ([3a722be](https://github.com/go-redis/redis/commit/3a722be81180e4d2a9cf0a29dc9a1ee1421f5859)) -* remove test(XInfoConsumer.idle), not a stable return value when tested. ([f5fbb36](https://github.com/go-redis/redis/commit/f5fbb367e7d9dfd7f391fc535a7387002232fa8a)) -* update ChannelWithSubscriptions to accept options ([c98c5f0](https://github.com/go-redis/redis/commit/c98c5f0eebf8d254307183c2ce702a48256b718d)) -* update COMMAND parser for Redis 7 ([b0bb514](https://github.com/go-redis/redis/commit/b0bb514059249e01ed7328c9094e5b8a439dfb12)) -* use redis over ssh channel([#2057](https://github.com/go-redis/redis/issues/2057)) ([#2060](https://github.com/go-redis/redis/issues/2060)) ([3961b95](https://github.com/go-redis/redis/commit/3961b9577f622a3079fe74f8fc8da12ba67a77ff)) - +- **#1943:** xInfoConsumer.Idle should be time.Duration instead of int64 + ([#2052](https://github.com/go-redis/redis/issues/2052)) + ([997ab5e](https://github.com/go-redis/redis/commit/997ab5e7e3ddf53837917013a4babbded73e944f)), + closes [#1943](https://github.com/go-redis/redis/issues/1943) +- add XInfoConsumers test + ([6f1a1ac](https://github.com/go-redis/redis/commit/6f1a1ac284ea3f683eeb3b06a59969e8424b6376)) +- fix tests + ([3a722be](https://github.com/go-redis/redis/commit/3a722be81180e4d2a9cf0a29dc9a1ee1421f5859)) +- remove test(XInfoConsumer.idle), not a stable return value when tested. + ([f5fbb36](https://github.com/go-redis/redis/commit/f5fbb367e7d9dfd7f391fc535a7387002232fa8a)) +- update ChannelWithSubscriptions to accept options + ([c98c5f0](https://github.com/go-redis/redis/commit/c98c5f0eebf8d254307183c2ce702a48256b718d)) +- update COMMAND parser for Redis 7 + ([b0bb514](https://github.com/go-redis/redis/commit/b0bb514059249e01ed7328c9094e5b8a439dfb12)) +- use redis over ssh channel([#2057](https://github.com/go-redis/redis/issues/2057)) + ([#2060](https://github.com/go-redis/redis/issues/2060)) + ([3961b95](https://github.com/go-redis/redis/commit/3961b9577f622a3079fe74f8fc8da12ba67a77ff)) ### Features -* add ClientUnpause ([91171f5](https://github.com/go-redis/redis/commit/91171f5e19a261dc4cfbf8706626d461b6ba03e4)) -* add NewXPendingResult for unit testing XPending ([#2066](https://github.com/go-redis/redis/issues/2066)) ([b7fd09e](https://github.com/go-redis/redis/commit/b7fd09e59479bc6ed5b3b13c4645a3620fd448a3)) -* add WriteArg and Scan net.IP([#2062](https://github.com/go-redis/redis/issues/2062)) ([7d5167e](https://github.com/go-redis/redis/commit/7d5167e8624ac1515e146ed183becb97dadb3d1a)) -* **pool:** add check for badConnection ([a8a7665](https://github.com/go-redis/redis/commit/a8a7665ddf8cc657c5226b1826a8ee83dab4b8c1)), closes [#2053](https://github.com/go-redis/redis/issues/2053) -* provide a username and password callback method, so that the plaintext username and password will not be stored in the memory, and the username and password will only be generated once when the CredentialsProvider is called. After the method is executed, the username and password strings on the stack will be released. ([#2097](https://github.com/go-redis/redis/issues/2097)) ([56a3dbc](https://github.com/go-redis/redis/commit/56a3dbc7b656525eb88e0735e239d56e04a23bee)) -* upgrade to Redis 7 ([d09c27e](https://github.com/go-redis/redis/commit/d09c27e6046129fd27b1d275e5a13a477bd7f778)) - - +- add ClientUnpause + ([91171f5](https://github.com/go-redis/redis/commit/91171f5e19a261dc4cfbf8706626d461b6ba03e4)) +- add NewXPendingResult for unit testing XPending + ([#2066](https://github.com/go-redis/redis/issues/2066)) + ([b7fd09e](https://github.com/go-redis/redis/commit/b7fd09e59479bc6ed5b3b13c4645a3620fd448a3)) +- add WriteArg and Scan net.IP([#2062](https://github.com/go-redis/redis/issues/2062)) + ([7d5167e](https://github.com/go-redis/redis/commit/7d5167e8624ac1515e146ed183becb97dadb3d1a)) +- **pool:** add check for badConnection + ([a8a7665](https://github.com/go-redis/redis/commit/a8a7665ddf8cc657c5226b1826a8ee83dab4b8c1)), + closes [#2053](https://github.com/go-redis/redis/issues/2053) +- provide a username and password callback method, so that the plaintext username and password will + not be stored in the memory, and the username and password will only be generated once when the + CredentialsProvider is called. After the method is executed, the username and password strings on + the stack will be released. ([#2097](https://github.com/go-redis/redis/issues/2097)) + ([56a3dbc](https://github.com/go-redis/redis/commit/56a3dbc7b656525eb88e0735e239d56e04a23bee)) +- upgrade to Redis 7 + ([d09c27e](https://github.com/go-redis/redis/commit/d09c27e6046129fd27b1d275e5a13a477bd7f778)) ## v9 UNRELEASED @@ -29,3 +46,7 @@ - Removed `Pipeline.Close` since there is no real need to explicitly manage pipeline resources. `Pipeline.Discard` is still available if you want to reset commands for some reason. - Replaced `*redis.Z` with `redis.Z` since it is small enough to be passed as value. +- Renamed `MaxConnAge` to `ConnMaxLifetime`. +- Renamed `IdleTimeout` to `ConnMaxIdleTime`. +- Removed connection reaper in favor of `MaxIdleConns`. +- Removed `WithContext`. diff --git a/cluster.go b/cluster.go index 2db73bdb..05b234aa 100644 --- a/cluster.go +++ b/cluster.go @@ -72,12 +72,12 @@ type ClusterOptions struct { PoolFIFO bool // PoolSize applies per cluster node and not for the whole cluster. - PoolSize int - MinIdleConns int - MaxConnAge time.Duration - PoolTimeout time.Duration - IdleTimeout time.Duration - IdleCheckFrequency time.Duration + PoolSize int + PoolTimeout time.Duration + MinIdleConns int + MaxIdleConns int + ConnMaxIdleTime time.Duration + ConnMaxLifetime time.Duration TLSConfig *tls.Config } @@ -132,8 +132,6 @@ func (opt *ClusterOptions) init() { } func (opt *ClusterOptions) clientOptions() *Options { - const disableIdleCheck = -1 - return &Options{ Dialer: opt.Dialer, OnConnect: opt.OnConnect, @@ -149,13 +147,13 @@ func (opt *ClusterOptions) clientOptions() *Options { ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, - PoolFIFO: opt.PoolFIFO, - PoolSize: opt.PoolSize, - MinIdleConns: opt.MinIdleConns, - MaxConnAge: opt.MaxConnAge, - PoolTimeout: opt.PoolTimeout, - IdleTimeout: opt.IdleTimeout, - IdleCheckFrequency: disableIdleCheck, + PoolFIFO: opt.PoolFIFO, + PoolSize: opt.PoolSize, + PoolTimeout: opt.PoolTimeout, + MinIdleConns: opt.MinIdleConns, + MaxIdleConns: opt.MaxIdleConns, + ConnMaxIdleTime: opt.ConnMaxIdleTime, + ConnMaxLifetime: opt.ConnMaxLifetime, TLSConfig: opt.TLSConfig, // If ClusterSlots is populated, then we probably have an artificial @@ -725,10 +723,6 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo) c.cmdable = c.Process - if opt.IdleCheckFrequency > 0 { - go c.reaper(opt.IdleCheckFrequency) - } - return c } @@ -1049,26 +1043,6 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) { return nil, firstErr } -// reaper closes idle connections to the cluster. -func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) { - ticker := time.NewTicker(idleCheckFrequency) - defer ticker.Stop() - - for range ticker.C { - nodes, err := c.nodes.All() - if err != nil { - break - } - - for _, node := range nodes { - _, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns() - if err != nil { - internal.Logger.Printf(context.TODO(), "ReapStaleConns failed: %s", err) - } - } - } -} - func (c *ClusterClient) Pipeline() Pipeliner { pipe := Pipeline{ exec: c.processPipeline, diff --git a/internal/pool/bench_test.go b/internal/pool/bench_test.go index 0d8dbbdf..067ae06c 100644 --- a/internal/pool/bench_test.go +++ b/internal/pool/bench_test.go @@ -30,11 +30,10 @@ func BenchmarkPoolGetPut(b *testing.B) { for _, bm := range benchmarks { b.Run(bm.String(), func(b *testing.B) { connPool := pool.NewConnPool(&pool.Options{ - Dialer: dummyDialer, - PoolSize: bm.poolSize, - PoolTimeout: time.Second, - IdleTimeout: time.Hour, - IdleCheckFrequency: time.Hour, + Dialer: dummyDialer, + PoolSize: bm.poolSize, + PoolTimeout: time.Second, + ConnMaxIdleTime: time.Hour, }) b.ResetTimer() @@ -74,11 +73,10 @@ func BenchmarkPoolGetRemove(b *testing.B) { for _, bm := range benchmarks { b.Run(bm.String(), func(b *testing.B) { connPool := pool.NewConnPool(&pool.Options{ - Dialer: dummyDialer, - PoolSize: bm.poolSize, - PoolTimeout: time.Second, - IdleTimeout: time.Hour, - IdleCheckFrequency: time.Hour, + Dialer: dummyDialer, + PoolSize: bm.poolSize, + PoolTimeout: time.Second, + ConnMaxIdleTime: time.Hour, }) b.ResetTimer() diff --git a/internal/pool/conn_check.go b/internal/pool/conn_check.go index 74680e4b..f04dc1c3 100644 --- a/internal/pool/conn_check.go +++ b/internal/pool/conn_check.go @@ -27,7 +27,8 @@ func connCheck(conn net.Conn) error { } var sysErr error - err = rawConn.Read(func(fd uintptr) bool { + + if err := rawConn.Read(func(fd uintptr) bool { var buf [1]byte n, err := syscall.Read(int(fd), buf[:]) switch { @@ -41,8 +42,7 @@ func connCheck(conn net.Conn) error { sysErr = err } return true - }) - if err != nil { + }); err != nil { return err } diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 42fed8e2..541ee8d4 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -57,13 +57,13 @@ type Options struct { Dialer func(context.Context) (net.Conn, error) OnClose func(*Conn) error - PoolFIFO bool - PoolSize int - MinIdleConns int - MaxConnAge time.Duration - PoolTimeout time.Duration - IdleTimeout time.Duration - IdleCheckFrequency time.Duration + PoolFIFO bool + PoolSize int + PoolTimeout time.Duration + MinIdleConns int + MaxIdleConns int + ConnMaxIdleTime time.Duration + ConnMaxLifetime time.Duration } type lastDialErrorWrap struct { @@ -71,17 +71,17 @@ type lastDialErrorWrap struct { } type ConnPool struct { - opt *Options + cfg *Options dialErrorsNum uint32 // atomic - lastDialError atomic.Value queue chan struct{} - connsMu sync.Mutex - conns []*Conn - idleConns []*Conn + connsMu sync.Mutex + conns []*Conn + idleConns []*Conn + poolSize int idleConnsLen int @@ -95,7 +95,7 @@ var _ Pooler = (*ConnPool)(nil) func NewConnPool(opt *Options) *ConnPool { p := &ConnPool{ - opt: opt, + cfg: opt, queue: make(chan struct{}, opt.PoolSize), conns: make([]*Conn, 0, opt.PoolSize), @@ -107,18 +107,14 @@ func NewConnPool(opt *Options) *ConnPool { p.checkMinIdleConns() p.connsMu.Unlock() - if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { - go p.reaper(opt.IdleCheckFrequency) - } - return p } func (p *ConnPool) checkMinIdleConns() { - if p.opt.MinIdleConns == 0 { + if p.cfg.MinIdleConns == 0 { return } - for p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns { + for p.poolSize < p.cfg.PoolSize && p.idleConnsLen < p.cfg.MinIdleConns { p.poolSize++ p.idleConnsLen++ @@ -176,7 +172,7 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) { p.conns = append(p.conns, cn) if pooled { // If pool is full remove the cn on next Put. - if p.poolSize >= p.opt.PoolSize { + if p.poolSize >= p.cfg.PoolSize { cn.pooled = false } else { p.poolSize++ @@ -191,14 +187,14 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) { return nil, ErrClosed } - if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) { + if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.cfg.PoolSize) { return nil, p.getLastDialError() } - netConn, err := p.opt.Dialer(ctx) + netConn, err := p.cfg.Dialer(ctx) if err != nil { p.setLastDialError(err) - if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) { + if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.cfg.PoolSize) { go p.tryDial() } return nil, err @@ -215,7 +211,7 @@ func (p *ConnPool) tryDial() { return } - conn, err := p.opt.Dialer(context.Background()) + conn, err := p.cfg.Dialer(context.Background()) if err != nil { p.setLastDialError(err) time.Sleep(time.Second) @@ -263,7 +259,7 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) { break } - if p.isStaleConn(cn) { + if !p.isHealthyConn(cn) { _ = p.CloseConn(cn) continue } @@ -283,10 +279,6 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) { return newcn, nil } -func (p *ConnPool) getTurn() { - p.queue <- struct{}{} -} - func (p *ConnPool) waitTurn(ctx context.Context) error { select { case <-ctx.Done(): @@ -301,7 +293,7 @@ func (p *ConnPool) waitTurn(ctx context.Context) error { } timer := timers.Get().(*time.Timer) - timer.Reset(p.opt.PoolTimeout) + timer.Reset(p.cfg.PoolTimeout) select { case <-ctx.Done(): @@ -337,7 +329,7 @@ func (p *ConnPool) popIdle() (*Conn, error) { } var cn *Conn - if p.opt.PoolFIFO { + if p.cfg.PoolFIFO { cn = p.idleConns[0] copy(p.idleConns, p.idleConns[1:]) p.idleConns = p.idleConns[:n-1] @@ -363,11 +355,25 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) { return } + var shouldCloseConn bool + p.connsMu.Lock() - p.idleConns = append(p.idleConns, cn) - p.idleConnsLen++ + + if p.cfg.MaxIdleConns == 0 || p.idleConnsLen < p.cfg.MaxIdleConns { + p.idleConns = append(p.idleConns, cn) + p.idleConnsLen++ + } else { + p.removeConn(cn) + shouldCloseConn = true + } + p.connsMu.Unlock() + p.freeTurn() + + if shouldCloseConn { + _ = p.closeConn(cn) + } } func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) { @@ -383,8 +389,8 @@ func (p *ConnPool) CloseConn(cn *Conn) error { func (p *ConnPool) removeConnWithLock(cn *Conn) { p.connsMu.Lock() + defer p.connsMu.Unlock() p.removeConn(cn) - p.connsMu.Unlock() } func (p *ConnPool) removeConn(cn *Conn) { @@ -395,14 +401,14 @@ func (p *ConnPool) removeConn(cn *Conn) { p.poolSize-- p.checkMinIdleConns() } - return + break } } } func (p *ConnPool) closeConn(cn *Conn) error { - if p.opt.OnClose != nil { - _ = p.opt.OnClose(cn) + if p.cfg.OnClose != nil { + _ = p.cfg.OnClose(cn) } return cn.Close() } @@ -477,81 +483,21 @@ func (p *ConnPool) Close() error { return firstErr } -func (p *ConnPool) reaper(frequency time.Duration) { - ticker := time.NewTicker(frequency) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - // It is possible that ticker and closedCh arrive together, - // and select pseudo-randomly pick ticker case, we double - // check here to prevent being executed after closed. - if p.closed() { - return - } - _, err := p.ReapStaleConns() - if err != nil { - internal.Logger.Printf(context.Background(), "ReapStaleConns failed: %s", err) - continue - } - case <-p.closedCh: - return - } - } -} - -func (p *ConnPool) ReapStaleConns() (int, error) { - var n int - for { - p.getTurn() - - p.connsMu.Lock() - cn := p.reapStaleConn() - p.connsMu.Unlock() - - p.freeTurn() - - if cn != nil { - _ = p.closeConn(cn) - n++ - } else { - break - } - } - atomic.AddUint32(&p.stats.StaleConns, uint32(n)) - return n, nil -} - -func (p *ConnPool) reapStaleConn() *Conn { - if len(p.idleConns) == 0 { - return nil - } - - cn := p.idleConns[0] - if !p.isStaleConn(cn) { - return nil - } - - p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...) - p.idleConnsLen-- - p.removeConn(cn) - - return cn -} - -func (p *ConnPool) isStaleConn(cn *Conn) bool { - if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 { - return connCheck(cn.netConn) != nil - } - +func (p *ConnPool) isHealthyConn(cn *Conn) bool { now := time.Now() - if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout { - return true + + if p.cfg.ConnMaxLifetime > 0 && now.Sub(cn.createdAt) >= p.cfg.ConnMaxLifetime { + return false } - if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge { - return true + if p.cfg.ConnMaxIdleTime > 0 && now.Sub(cn.UsedAt()) >= p.cfg.ConnMaxIdleTime { + atomic.AddUint32(&p.stats.IdleConns, 1) + return false } - return connCheck(cn.netConn) != nil + if connCheck(cn.netConn) != nil { + return false + } + + cn.SetUsedAt(now) + return true } diff --git a/internal/pool/pool_test.go b/internal/pool/pool_test.go index e1e89f44..23a13af7 100644 --- a/internal/pool/pool_test.go +++ b/internal/pool/pool_test.go @@ -19,11 +19,10 @@ var _ = Describe("ConnPool", func() { BeforeEach(func() { connPool = pool.NewConnPool(&pool.Options{ - Dialer: dummyDialer, - PoolSize: 10, - PoolTimeout: time.Hour, - IdleTimeout: time.Millisecond, - IdleCheckFrequency: time.Millisecond, + Dialer: dummyDialer, + PoolSize: 10, + PoolTimeout: time.Hour, + ConnMaxIdleTime: time.Millisecond, }) }) @@ -45,11 +44,10 @@ var _ = Describe("ConnPool", func() { <-closedChan return &net.TCPConn{}, nil }, - PoolSize: 10, - PoolTimeout: time.Hour, - IdleTimeout: time.Millisecond, - IdleCheckFrequency: time.Millisecond, - MinIdleConns: minIdleConns, + PoolSize: 10, + PoolTimeout: time.Hour, + ConnMaxIdleTime: time.Millisecond, + MinIdleConns: minIdleConns, }) wg.Wait() Expect(connPool.Close()).NotTo(HaveOccurred()) @@ -127,12 +125,11 @@ var _ = Describe("MinIdleConns", func() { newConnPool := func() *pool.ConnPool { connPool := pool.NewConnPool(&pool.Options{ - Dialer: dummyDialer, - PoolSize: poolSize, - MinIdleConns: minIdleConns, - PoolTimeout: 100 * time.Millisecond, - IdleTimeout: -1, - IdleCheckFrequency: -1, + Dialer: dummyDialer, + PoolSize: poolSize, + MinIdleConns: minIdleConns, + PoolTimeout: 100 * time.Millisecond, + ConnMaxIdleTime: -1, }) Eventually(func() int { return connPool.Len() @@ -287,133 +284,6 @@ var _ = Describe("MinIdleConns", func() { }) }) -var _ = Describe("conns reaper", func() { - const idleTimeout = time.Minute - const maxAge = time.Hour - - ctx := context.Background() - var connPool *pool.ConnPool - var conns, staleConns, closedConns []*pool.Conn - - assert := func(typ string) { - BeforeEach(func() { - closedConns = nil - connPool = pool.NewConnPool(&pool.Options{ - Dialer: dummyDialer, - PoolSize: 10, - IdleTimeout: idleTimeout, - MaxConnAge: maxAge, - PoolTimeout: time.Second, - IdleCheckFrequency: time.Hour, - OnClose: func(cn *pool.Conn) error { - closedConns = append(closedConns, cn) - return nil - }, - }) - - conns = nil - - // add stale connections - staleConns = nil - for i := 0; i < 3; i++ { - cn, err := connPool.Get(ctx) - Expect(err).NotTo(HaveOccurred()) - switch typ { - case "idle": - cn.SetUsedAt(time.Now().Add(-2 * idleTimeout)) - case "aged": - cn.SetCreatedAt(time.Now().Add(-2 * maxAge)) - case "connCheck": - _ = cn.Close() - } - conns = append(conns, cn) - staleConns = append(staleConns, cn) - } - - // add fresh connections - for i := 0; i < 3; i++ { - cn, err := connPool.Get(ctx) - Expect(err).NotTo(HaveOccurred()) - conns = append(conns, cn) - } - - for _, cn := range conns { - connPool.Put(ctx, cn) - } - - Expect(connPool.Len()).To(Equal(6)) - Expect(connPool.IdleLen()).To(Equal(6)) - - n, err := connPool.ReapStaleConns() - Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(3)) - }) - - AfterEach(func() { - _ = connPool.Close() - Expect(connPool.Len()).To(Equal(0)) - Expect(connPool.IdleLen()).To(Equal(0)) - Expect(len(closedConns)).To(Equal(len(conns))) - Expect(closedConns).To(ConsistOf(conns)) - }) - - It("reaps stale connections", func() { - Expect(connPool.Len()).To(Equal(3)) - Expect(connPool.IdleLen()).To(Equal(3)) - }) - - It("does not reap fresh connections", func() { - n, err := connPool.ReapStaleConns() - Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(0)) - }) - - It("stale connections are closed", func() { - Expect(len(closedConns)).To(Equal(len(staleConns))) - Expect(closedConns).To(ConsistOf(staleConns)) - }) - - It("pool is functional", func() { - for j := 0; j < 3; j++ { - var freeCns []*pool.Conn - for i := 0; i < 3; i++ { - cn, err := connPool.Get(ctx) - Expect(err).NotTo(HaveOccurred()) - Expect(cn).NotTo(BeNil()) - freeCns = append(freeCns, cn) - } - - Expect(connPool.Len()).To(Equal(3)) - Expect(connPool.IdleLen()).To(Equal(0)) - - cn, err := connPool.Get(ctx) - Expect(err).NotTo(HaveOccurred()) - Expect(cn).NotTo(BeNil()) - conns = append(conns, cn) - - Expect(connPool.Len()).To(Equal(4)) - Expect(connPool.IdleLen()).To(Equal(0)) - - connPool.Remove(ctx, cn, nil) - - Expect(connPool.Len()).To(Equal(3)) - Expect(connPool.IdleLen()).To(Equal(0)) - - for _, cn := range freeCns { - connPool.Put(ctx, cn) - } - - Expect(connPool.Len()).To(Equal(3)) - Expect(connPool.IdleLen()).To(Equal(3)) - } - }) - } - - assert("idle") - assert("aged") - assert("connCheck") -}) - var _ = Describe("race", func() { ctx := context.Background() var connPool *pool.ConnPool @@ -433,11 +303,10 @@ var _ = Describe("race", func() { It("does not happen on Get, Put, and Remove", func() { connPool = pool.NewConnPool(&pool.Options{ - Dialer: dummyDialer, - PoolSize: 10, - PoolTimeout: time.Minute, - IdleTimeout: time.Millisecond, - IdleCheckFrequency: time.Millisecond, + Dialer: dummyDialer, + PoolSize: 10, + PoolTimeout: time.Minute, + ConnMaxIdleTime: time.Millisecond, }) perform(C, func(id int) { diff --git a/main_test.go b/main_test.go index 7a80e9d2..24ba02ec 100644 --- a/main_test.go +++ b/main_test.go @@ -130,10 +130,9 @@ func redisOptions() *redis.Options { MaxRetries: -1, - PoolSize: 10, - PoolTimeout: 30 * time.Second, - IdleTimeout: time.Minute, - IdleCheckFrequency: 100 * time.Millisecond, + PoolSize: 10, + PoolTimeout: 30 * time.Second, + ConnMaxIdleTime: time.Minute, } } @@ -145,10 +144,9 @@ func redisClusterOptions() *redis.ClusterOptions { MaxRedirects: 8, - PoolSize: 10, - PoolTimeout: 30 * time.Second, - IdleTimeout: time.Minute, - IdleCheckFrequency: 100 * time.Millisecond, + PoolSize: 10, + PoolTimeout: 30 * time.Second, + ConnMaxIdleTime: time.Minute, } } @@ -165,10 +163,9 @@ func redisRingOptions() *redis.RingOptions { MaxRetries: -1, - PoolSize: 10, - PoolTimeout: 30 * time.Second, - IdleTimeout: time.Minute, - IdleCheckFrequency: 100 * time.Millisecond, + PoolSize: 10, + PoolTimeout: 30 * time.Second, + ConnMaxIdleTime: time.Minute, } } diff --git a/options.go b/options.go index 99cfddbb..8f5f380f 100644 --- a/options.go +++ b/options.go @@ -87,25 +87,22 @@ type Options struct { // Maximum number of socket connections. // Default is 10 connections per every available CPU as reported by runtime.GOMAXPROCS. PoolSize int - // Minimum number of idle connections which is useful when establishing - // new connection is slow. - MinIdleConns int - // Connection age at which client retires (closes) the connection. - // Default is to not close aged connections. - MaxConnAge time.Duration // Amount of time client waits for connection if all connections // are busy before returning an error. // Default is ReadTimeout + 1 second. PoolTimeout time.Duration + // Minimum number of idle connections which is useful when establishing + // new connection is slow. + MinIdleConns int + // Maximum number of idle connections. + MaxIdleConns int // Amount of time after which client closes idle connections. // Should be less than server's timeout. // Default is 5 minutes. -1 disables idle timeout check. - IdleTimeout time.Duration - // Frequency of idle checks made by idle connections reaper. - // Default is 1 minute. -1 disables idle connections reaper, - // but idle connections are still discarded by the client - // if IdleTimeout is set. - IdleCheckFrequency time.Duration + ConnMaxIdleTime time.Duration + // Connection age at which client retires (closes) the connection. + // Default is to not close aged connections. + ConnMaxLifetime time.Duration // Enables read only queries on slave nodes. readOnly bool @@ -161,11 +158,8 @@ func (opt *Options) init() { if opt.PoolTimeout == 0 { opt.PoolTimeout = opt.ReadTimeout + time.Second } - if opt.IdleTimeout == 0 { - opt.IdleTimeout = 5 * time.Minute - } - if opt.IdleCheckFrequency == 0 { - opt.IdleCheckFrequency = time.Minute + if opt.ConnMaxIdleTime == 0 { + opt.ConnMaxIdleTime = 30 * time.Minute } if opt.MaxRetries == -1 { @@ -297,6 +291,10 @@ type queryOptions struct { err error } +func (o *queryOptions) has(name string) bool { + return len(o.q[name]) > 0 +} + func (o *queryOptions) string(name string) string { vs := o.q[name] if len(vs) == 0 { @@ -391,11 +389,19 @@ func setupConnParams(u *url.URL, o *Options) (*Options, error) { o.WriteTimeout = q.duration("write_timeout") o.PoolFIFO = q.bool("pool_fifo") o.PoolSize = q.int("pool_size") - o.MinIdleConns = q.int("min_idle_conns") - o.MaxConnAge = q.duration("max_conn_age") o.PoolTimeout = q.duration("pool_timeout") - o.IdleTimeout = q.duration("idle_timeout") - o.IdleCheckFrequency = q.duration("idle_check_frequency") + o.MinIdleConns = q.int("min_idle_conns") + o.MaxIdleConns = q.int("max_idle_conns") + if q.has("conn_max_idle_time") { + o.ConnMaxIdleTime = q.duration("conn_max_idle_time") + } else { + o.ConnMaxIdleTime = q.duration("idle_timeout") + } + if q.has("conn_max_lifetime") { + o.ConnMaxLifetime = q.duration("conn_max_lifetime") + } else { + o.ConnMaxLifetime = q.duration("max_conn_age") + } if q.err != nil { return nil, q.err } @@ -424,12 +430,12 @@ func newConnPool(opt *Options) *pool.ConnPool { Dialer: func(ctx context.Context) (net.Conn, error) { return opt.Dialer(ctx, opt.Network, opt.Addr) }, - PoolFIFO: opt.PoolFIFO, - PoolSize: opt.PoolSize, - MinIdleConns: opt.MinIdleConns, - MaxConnAge: opt.MaxConnAge, - PoolTimeout: opt.PoolTimeout, - IdleTimeout: opt.IdleTimeout, - IdleCheckFrequency: opt.IdleCheckFrequency, + PoolFIFO: opt.PoolFIFO, + PoolSize: opt.PoolSize, + PoolTimeout: opt.PoolTimeout, + MinIdleConns: opt.MinIdleConns, + MaxIdleConns: opt.MaxIdleConns, + ConnMaxIdleTime: opt.ConnMaxIdleTime, + ConnMaxLifetime: opt.ConnMaxLifetime, }) } diff --git a/options_test.go b/options_test.go index 14505239..f9c69a48 100644 --- a/options_test.go +++ b/options_test.go @@ -47,18 +47,18 @@ func TestParseURL(t *testing.T) { }, { // special case handling for disabled timeouts url: "redis://localhost:123/?db=2&idle_timeout=0", - o: &Options{Addr: "localhost:123", DB: 2, IdleTimeout: -1}, + o: &Options{Addr: "localhost:123", DB: 2, ConnMaxIdleTime: -1}, }, { // negative values disable timeouts as well url: "redis://localhost:123/?db=2&idle_timeout=-1", - o: &Options{Addr: "localhost:123", DB: 2, IdleTimeout: -1}, + o: &Options{Addr: "localhost:123", DB: 2, ConnMaxIdleTime: -1}, }, { // absent timeout values will use defaults url: "redis://localhost:123/?db=2&idle_timeout=", - o: &Options{Addr: "localhost:123", DB: 2, IdleTimeout: 0}, + o: &Options{Addr: "localhost:123", DB: 2, ConnMaxIdleTime: 0}, }, { url: "redis://localhost:123/?db=2&idle_timeout", // missing "=" at the end - o: &Options{Addr: "localhost:123", DB: 2, IdleTimeout: 0}, + o: &Options{Addr: "localhost:123", DB: 2, ConnMaxIdleTime: 0}, }, { url: "unix:///tmp/redis.sock", o: &Options{Addr: "/tmp/redis.sock"}, @@ -174,20 +174,20 @@ func comprareOptions(t *testing.T, actual, expected *Options) { if actual.PoolSize != expected.PoolSize { t.Errorf("PoolSize: got %v, expected %v", actual.PoolSize, expected.PoolSize) } - if actual.MinIdleConns != expected.MinIdleConns { - t.Errorf("MinIdleConns: got %v, expected %v", actual.MinIdleConns, expected.MinIdleConns) - } - if actual.MaxConnAge != expected.MaxConnAge { - t.Errorf("MaxConnAge: got %v, expected %v", actual.MaxConnAge, expected.MaxConnAge) - } if actual.PoolTimeout != expected.PoolTimeout { t.Errorf("PoolTimeout: got %v, expected %v", actual.PoolTimeout, expected.PoolTimeout) } - if actual.IdleTimeout != expected.IdleTimeout { - t.Errorf("IdleTimeout: got %v, expected %v", actual.IdleTimeout, expected.IdleTimeout) + if actual.MinIdleConns != expected.MinIdleConns { + t.Errorf("MinIdleConns: got %v, expected %v", actual.MinIdleConns, expected.MinIdleConns) } - if actual.IdleCheckFrequency != expected.IdleCheckFrequency { - t.Errorf("IdleCheckFrequency: got %v, expected %v", actual.IdleCheckFrequency, expected.IdleCheckFrequency) + if actual.MaxIdleConns != expected.MaxIdleConns { + t.Errorf("MaxIdleConns: got %v, expected %v", actual.MaxIdleConns, expected.MaxIdleConns) + } + if actual.ConnMaxIdleTime != expected.ConnMaxIdleTime { + t.Errorf("ConnMaxIdleTime: got %v, expected %v", actual.ConnMaxIdleTime, expected.ConnMaxIdleTime) + } + if actual.ConnMaxLifetime != expected.ConnMaxLifetime { + t.Errorf("ConnMaxLifetime: got %v, expected %v", actual.ConnMaxLifetime, expected.ConnMaxLifetime) } } diff --git a/pool_test.go b/pool_test.go index a45b3de4..2a22edd3 100644 --- a/pool_test.go +++ b/pool_test.go @@ -16,8 +16,8 @@ var _ = Describe("pool", func() { BeforeEach(func() { opt := redisOptions() opt.MinIdleConns = 0 - opt.MaxConnAge = 0 - opt.IdleTimeout = time.Second + opt.ConnMaxLifetime = 0 + opt.ConnMaxIdleTime = time.Second client = redis.NewClient(opt) }) @@ -108,8 +108,8 @@ var _ = Describe("pool", func() { // explain: https://github.com/go-redis/redis/pull/1675 opt := redisOptions() opt.MinIdleConns = 0 - opt.MaxConnAge = 0 - opt.IdleTimeout = 2 * time.Second + opt.ConnMaxLifetime = 0 + opt.ConnMaxIdleTime = 10 * time.Second client = redis.NewClient(opt) for i := 0; i < 100; i++ { @@ -127,31 +127,4 @@ var _ = Describe("pool", func() { Expect(stats.Misses).To(Equal(uint32(1))) Expect(stats.Timeouts).To(Equal(uint32(0))) }) - - It("removes idle connections", func() { - err := client.Ping(ctx).Err() - Expect(err).NotTo(HaveOccurred()) - - stats := client.PoolStats() - Expect(stats).To(Equal(&redis.PoolStats{ - Hits: 0, - Misses: 1, - Timeouts: 0, - TotalConns: 1, - IdleConns: 1, - StaleConns: 0, - })) - - time.Sleep(2 * time.Second) - - stats = client.PoolStats() - Expect(stats).To(Equal(&redis.PoolStats{ - Hits: 0, - Misses: 1, - Timeouts: 0, - TotalConns: 0, - IdleConns: 0, - StaleConns: 1, - })) - }) }) diff --git a/pubsub_test.go b/pubsub_test.go index 6777ec67..892118e0 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -18,7 +18,7 @@ var _ = Describe("PubSub", func() { BeforeEach(func() { opt := redisOptions() opt.MinIdleConns = 0 - opt.MaxConnAge = 0 + opt.ConnMaxLifetime = 0 client = redis.NewClient(opt) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) }) diff --git a/ring.go b/ring.go index cdb429a9..dede1e49 100644 --- a/ring.go +++ b/ring.go @@ -82,12 +82,12 @@ type RingOptions struct { // PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO). PoolFIFO bool - PoolSize int - MinIdleConns int - MaxConnAge time.Duration - PoolTimeout time.Duration - IdleTimeout time.Duration - IdleCheckFrequency time.Duration + PoolSize int + PoolTimeout time.Duration + MinIdleConns int + MaxIdleConns int + ConnMaxIdleTime time.Duration + ConnMaxLifetime time.Duration TLSConfig *tls.Config Limiter Limiter @@ -142,13 +142,13 @@ func (opt *RingOptions) clientOptions() *Options { ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, - PoolFIFO: opt.PoolFIFO, - PoolSize: opt.PoolSize, - MinIdleConns: opt.MinIdleConns, - MaxConnAge: opt.MaxConnAge, - PoolTimeout: opt.PoolTimeout, - IdleTimeout: opt.IdleTimeout, - IdleCheckFrequency: opt.IdleCheckFrequency, + PoolFIFO: opt.PoolFIFO, + PoolSize: opt.PoolSize, + PoolTimeout: opt.PoolTimeout, + MinIdleConns: opt.MinIdleConns, + MaxIdleConns: opt.MaxIdleConns, + ConnMaxIdleTime: opt.ConnMaxIdleTime, + ConnMaxLifetime: opt.ConnMaxLifetime, TLSConfig: opt.TLSConfig, Limiter: opt.Limiter, diff --git a/sentinel.go b/sentinel.go index f318e0fe..a2dbcce4 100644 --- a/sentinel.go +++ b/sentinel.go @@ -63,15 +63,14 @@ type FailoverOptions struct { ReadTimeout time.Duration WriteTimeout time.Duration - // PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO). PoolFIFO bool - PoolSize int - MinIdleConns int - MaxConnAge time.Duration - PoolTimeout time.Duration - IdleTimeout time.Duration - IdleCheckFrequency time.Duration + PoolSize int + PoolTimeout time.Duration + MinIdleConns int + MaxIdleConns int + ConnMaxIdleTime time.Duration + ConnMaxLifetime time.Duration TLSConfig *tls.Config } @@ -95,13 +94,13 @@ func (opt *FailoverOptions) clientOptions() *Options { ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, - PoolFIFO: opt.PoolFIFO, - PoolSize: opt.PoolSize, - PoolTimeout: opt.PoolTimeout, - IdleTimeout: opt.IdleTimeout, - IdleCheckFrequency: opt.IdleCheckFrequency, - MinIdleConns: opt.MinIdleConns, - MaxConnAge: opt.MaxConnAge, + PoolFIFO: opt.PoolFIFO, + PoolSize: opt.PoolSize, + PoolTimeout: opt.PoolTimeout, + MinIdleConns: opt.MinIdleConns, + MaxIdleConns: opt.MaxIdleConns, + ConnMaxIdleTime: opt.ConnMaxIdleTime, + ConnMaxLifetime: opt.ConnMaxLifetime, TLSConfig: opt.TLSConfig, } @@ -126,13 +125,13 @@ func (opt *FailoverOptions) sentinelOptions(addr string) *Options { ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, - PoolFIFO: opt.PoolFIFO, - PoolSize: opt.PoolSize, - PoolTimeout: opt.PoolTimeout, - IdleTimeout: opt.IdleTimeout, - IdleCheckFrequency: opt.IdleCheckFrequency, - MinIdleConns: opt.MinIdleConns, - MaxConnAge: opt.MaxConnAge, + PoolFIFO: opt.PoolFIFO, + PoolSize: opt.PoolSize, + PoolTimeout: opt.PoolTimeout, + MinIdleConns: opt.MinIdleConns, + MaxIdleConns: opt.MaxIdleConns, + ConnMaxIdleTime: opt.ConnMaxIdleTime, + ConnMaxLifetime: opt.ConnMaxLifetime, TLSConfig: opt.TLSConfig, } @@ -158,13 +157,13 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions { ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, - PoolFIFO: opt.PoolFIFO, - PoolSize: opt.PoolSize, - PoolTimeout: opt.PoolTimeout, - IdleTimeout: opt.IdleTimeout, - IdleCheckFrequency: opt.IdleCheckFrequency, - MinIdleConns: opt.MinIdleConns, - MaxConnAge: opt.MaxConnAge, + PoolFIFO: opt.PoolFIFO, + PoolSize: opt.PoolSize, + PoolTimeout: opt.PoolTimeout, + MinIdleConns: opt.MinIdleConns, + MaxIdleConns: opt.MaxIdleConns, + ConnMaxIdleTime: opt.ConnMaxIdleTime, + ConnMaxLifetime: opt.ConnMaxLifetime, TLSConfig: opt.TLSConfig, } @@ -580,7 +579,7 @@ func (c *sentinelFailover) getReplicaAddrs(ctx context.Context, sentinel *Sentin if err != nil { internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s", c.opt.MasterName, err) - return []string{} + return nil } return parseReplicaAddrs(addrs, false) } diff --git a/universal.go b/universal.go index 257a2055..a3c5b9d0 100644 --- a/universal.go +++ b/universal.go @@ -39,12 +39,12 @@ type UniversalOptions struct { // PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO). PoolFIFO bool - PoolSize int - MinIdleConns int - MaxConnAge time.Duration - PoolTimeout time.Duration - IdleTimeout time.Duration - IdleCheckFrequency time.Duration + PoolSize int + PoolTimeout time.Duration + MinIdleConns int + MaxIdleConns int + ConnMaxIdleTime time.Duration + ConnMaxLifetime time.Duration TLSConfig *tls.Config @@ -84,16 +84,18 @@ func (o *UniversalOptions) Cluster() *ClusterOptions { MinRetryBackoff: o.MinRetryBackoff, MaxRetryBackoff: o.MaxRetryBackoff, - DialTimeout: o.DialTimeout, - ReadTimeout: o.ReadTimeout, - WriteTimeout: o.WriteTimeout, - PoolFIFO: o.PoolFIFO, - PoolSize: o.PoolSize, - MinIdleConns: o.MinIdleConns, - MaxConnAge: o.MaxConnAge, - PoolTimeout: o.PoolTimeout, - IdleTimeout: o.IdleTimeout, - IdleCheckFrequency: o.IdleCheckFrequency, + DialTimeout: o.DialTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, + + PoolFIFO: o.PoolFIFO, + + PoolSize: o.PoolSize, + PoolTimeout: o.PoolTimeout, + MinIdleConns: o.MinIdleConns, + MaxIdleConns: o.MaxIdleConns, + ConnMaxIdleTime: o.ConnMaxIdleTime, + ConnMaxLifetime: o.ConnMaxLifetime, TLSConfig: o.TLSConfig, } @@ -126,13 +128,13 @@ func (o *UniversalOptions) Failover() *FailoverOptions { ReadTimeout: o.ReadTimeout, WriteTimeout: o.WriteTimeout, - PoolFIFO: o.PoolFIFO, - PoolSize: o.PoolSize, - MinIdleConns: o.MinIdleConns, - MaxConnAge: o.MaxConnAge, - PoolTimeout: o.PoolTimeout, - IdleTimeout: o.IdleTimeout, - IdleCheckFrequency: o.IdleCheckFrequency, + PoolFIFO: o.PoolFIFO, + PoolSize: o.PoolSize, + PoolTimeout: o.PoolTimeout, + MinIdleConns: o.MinIdleConns, + MaxIdleConns: o.MaxIdleConns, + ConnMaxIdleTime: o.ConnMaxIdleTime, + ConnMaxLifetime: o.ConnMaxLifetime, TLSConfig: o.TLSConfig, } @@ -162,13 +164,13 @@ func (o *UniversalOptions) Simple() *Options { ReadTimeout: o.ReadTimeout, WriteTimeout: o.WriteTimeout, - PoolFIFO: o.PoolFIFO, - PoolSize: o.PoolSize, - MinIdleConns: o.MinIdleConns, - MaxConnAge: o.MaxConnAge, - PoolTimeout: o.PoolTimeout, - IdleTimeout: o.IdleTimeout, - IdleCheckFrequency: o.IdleCheckFrequency, + PoolFIFO: o.PoolFIFO, + PoolSize: o.PoolSize, + PoolTimeout: o.PoolTimeout, + MinIdleConns: o.MinIdleConns, + MaxIdleConns: o.MaxIdleConns, + ConnMaxIdleTime: o.ConnMaxIdleTime, + ConnMaxLifetime: o.ConnMaxLifetime, TLSConfig: o.TLSConfig, }