From faf5666fbd4d53631c20a3cdabc4719e3a9c55c2 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Mon, 28 May 2018 17:27:24 +0300 Subject: [PATCH] Cleanup pool --- cluster.go | 12 +- internal/pool/bench_test.go | 12 +- internal/pool/pool.go | 205 +++++++++++++++++++++-------------- internal/pool/pool_single.go | 12 +- internal/pool/pool_sticky.go | 54 ++++----- internal/pool/pool_test.go | 57 +++++----- options.go | 3 +- pool_test.go | 21 ++-- redis.go | 27 ++--- redis_test.go | 12 +- ring.go | 6 +- tx_test.go | 5 +- 12 files changed, 219 insertions(+), 207 deletions(-) diff --git a/cluster.go b/cluster.go index 6f0855eb..5fad373d 100644 --- a/cluster.go +++ b/cluster.go @@ -1172,7 +1172,7 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { - cn, _, err := node.Client.getConn() + cn, err := node.Client.getConn() if err != nil { if err == pool.ErrClosed { c.remapCmds(cmds, failedCmds) @@ -1184,9 +1184,9 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) if err == nil || internal.IsRedisError(err) { - _ = node.Client.connPool.Put(cn) + node.Client.connPool.Put(cn) } else { - _ = node.Client.connPool.Remove(cn) + node.Client.connPool.Remove(cn) } } @@ -1336,7 +1336,7 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error { failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { - cn, _, err := node.Client.getConn() + cn, err := node.Client.getConn() if err != nil { if err == pool.ErrClosed { c.remapCmds(cmds, failedCmds) @@ -1348,9 +1348,9 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error { err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) if err == nil || internal.IsRedisError(err) { - _ = node.Client.connPool.Put(cn) + node.Client.connPool.Put(cn) } else { - _ = node.Client.connPool.Remove(cn) + node.Client.connPool.Remove(cn) } } diff --git a/internal/pool/bench_test.go b/internal/pool/bench_test.go index e0bb5244..e80c9c00 100644 --- a/internal/pool/bench_test.go +++ b/internal/pool/bench_test.go @@ -20,13 +20,11 @@ func benchmarkPoolGetPut(b *testing.B, poolSize int) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - cn, _, err := connPool.Get() + cn, err := connPool.Get() if err != nil { b.Fatal(err) } - if err = connPool.Put(cn); err != nil { - b.Fatal(err) - } + connPool.Put(cn) } }) } @@ -56,13 +54,11 @@ func benchmarkPoolGetRemove(b *testing.B, poolSize int) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - cn, _, err := connPool.Get() + cn, err := connPool.Get() if err != nil { b.Fatal(err) } - if err := connPool.Remove(cn); err != nil { - b.Fatal(err) - } + connPool.Remove(cn) } }) } diff --git a/internal/pool/pool.go b/internal/pool/pool.go index ae81905e..cab66904 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -28,7 +28,8 @@ type Stats struct { Timeouts uint32 // number of times a wait timeout occurred TotalConns uint32 // number of total connections in the pool - FreeConns uint32 // number of free connections in the pool + FreeConns uint32 // deprecated - use IdleConns + IdleConns uint32 // number of idle connections in the pool StaleConns uint32 // number of stale connections removed from the pool } @@ -36,12 +37,12 @@ type Pooler interface { NewConn() (*Conn, error) CloseConn(*Conn) error - Get() (*Conn, bool, error) - Put(*Conn) error - Remove(*Conn) error + Get() (*Conn, error) + Put(*Conn) + Remove(*Conn) Len() int - FreeLen() int + IdleLen() int Stats() *Stats Close() error @@ -70,8 +71,8 @@ type ConnPool struct { connsMu sync.Mutex conns []*Conn - freeConnsMu sync.Mutex - freeConns []*Conn + idleConnsMu sync.RWMutex + idleConns []*Conn stats Stats @@ -86,15 +87,29 @@ func NewConnPool(opt *Options) *ConnPool { queue: make(chan struct{}, opt.PoolSize), conns: make([]*Conn, 0, opt.PoolSize), - freeConns: make([]*Conn, 0, opt.PoolSize), + idleConns: make([]*Conn, 0, opt.PoolSize), } + if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { go p.reaper(opt.IdleCheckFrequency) } + return p } func (p *ConnPool) NewConn() (*Conn, error) { + cn, err := p.newConn() + if err != nil { + return nil, err + } + + p.connsMu.Lock() + p.conns = append(p.conns, cn) + p.connsMu.Unlock() + return cn, nil +} + +func (p *ConnPool) newConn() (*Conn, error) { if p.closed() { return nil, ErrClosed } @@ -112,12 +127,7 @@ func (p *ConnPool) NewConn() (*Conn, error) { return nil, err } - cn := NewConn(netConn) - p.connsMu.Lock() - p.conns = append(p.conns, cn) - p.connsMu.Unlock() - - return cn, nil + return NewConn(netConn), nil } func (p *ConnPool) tryDial() { @@ -153,34 +163,20 @@ func (p *ConnPool) getLastDialError() error { } // Get returns existed connection from the pool or creates a new one. -func (p *ConnPool) Get() (*Conn, bool, error) { +func (p *ConnPool) Get() (*Conn, error) { if p.closed() { - return nil, false, ErrClosed + return nil, ErrClosed } - select { - case p.queue <- struct{}{}: - default: - timer := timers.Get().(*time.Timer) - timer.Reset(p.opt.PoolTimeout) - - select { - case p.queue <- struct{}{}: - if !timer.Stop() { - <-timer.C - } - timers.Put(timer) - case <-timer.C: - timers.Put(timer) - atomic.AddUint32(&p.stats.Timeouts, 1) - return nil, false, ErrPoolTimeout - } + err := p.waitTurn() + if err != nil { + return nil, err } for { - p.freeConnsMu.Lock() - cn := p.popFree() - p.freeConnsMu.Unlock() + p.idleConnsMu.Lock() + cn := p.popIdle() + p.idleConnsMu.Unlock() if cn == nil { break @@ -192,50 +188,89 @@ func (p *ConnPool) Get() (*Conn, bool, error) { } atomic.AddUint32(&p.stats.Hits, 1) - return cn, false, nil + return cn, nil } atomic.AddUint32(&p.stats.Misses, 1) newcn, err := p.NewConn() if err != nil { - <-p.queue - return nil, false, err + p.freeTurn() + return nil, err } - return newcn, true, nil + return newcn, nil } -func (p *ConnPool) popFree() *Conn { - if len(p.freeConns) == 0 { +func (p *ConnPool) getTurn() { + p.queue <- struct{}{} +} + +func (p *ConnPool) waitTurn() error { + select { + case p.queue <- struct{}{}: + return nil + default: + timer := timers.Get().(*time.Timer) + timer.Reset(p.opt.PoolTimeout) + + select { + case p.queue <- struct{}{}: + if !timer.Stop() { + <-timer.C + } + timers.Put(timer) + return nil + case <-timer.C: + timers.Put(timer) + atomic.AddUint32(&p.stats.Timeouts, 1) + return ErrPoolTimeout + } + } +} + +func (p *ConnPool) freeTurn() { + <-p.queue +} + +func (p *ConnPool) popIdle() *Conn { + if len(p.idleConns) == 0 { return nil } - idx := len(p.freeConns) - 1 - cn := p.freeConns[idx] - p.freeConns = p.freeConns[:idx] + idx := len(p.idleConns) - 1 + cn := p.idleConns[idx] + p.idleConns = p.idleConns[:idx] + return cn } -func (p *ConnPool) Put(cn *Conn) error { - if data := cn.Rd.PeekBuffered(); data != nil { - internal.Logf("connection has unread data: %q", data) - return p.Remove(cn) +func (p *ConnPool) Put(cn *Conn) { + buf := cn.Rd.PeekBuffered() + if buf != nil { + internal.Logf("connection has unread data: %.100q", buf) + p.Remove(cn) + return } - p.freeConnsMu.Lock() - p.freeConns = append(p.freeConns, cn) - p.freeConnsMu.Unlock() - <-p.queue - return nil + + p.idleConnsMu.Lock() + p.idleConns = append(p.idleConns, cn) + p.idleConnsMu.Unlock() + p.freeTurn() } -func (p *ConnPool) Remove(cn *Conn) error { - _ = p.CloseConn(cn) - <-p.queue - return nil +func (p *ConnPool) Remove(cn *Conn) { + p.removeConn(cn) + p.freeTurn() + _ = p.closeConn(cn) } func (p *ConnPool) CloseConn(cn *Conn) error { + p.removeConn(cn) + return p.closeConn(cn) +} + +func (p *ConnPool) removeConn(cn *Conn) { p.connsMu.Lock() for i, c := range p.conns { if c == cn { @@ -244,8 +279,6 @@ func (p *ConnPool) CloseConn(cn *Conn) error { } } p.connsMu.Unlock() - - return p.closeConn(cn) } func (p *ConnPool) closeConn(cn *Conn) error { @@ -263,22 +296,24 @@ func (p *ConnPool) Len() int { return l } -// FreeLen returns number of free connections. -func (p *ConnPool) FreeLen() int { - p.freeConnsMu.Lock() - l := len(p.freeConns) - p.freeConnsMu.Unlock() +// FreeLen returns number of idle connections. +func (p *ConnPool) IdleLen() int { + p.idleConnsMu.RLock() + l := len(p.idleConns) + p.idleConnsMu.RUnlock() return l } func (p *ConnPool) Stats() *Stats { + idleLen := p.IdleLen() return &Stats{ Hits: atomic.LoadUint32(&p.stats.Hits), Misses: atomic.LoadUint32(&p.stats.Misses), Timeouts: atomic.LoadUint32(&p.stats.Timeouts), TotalConns: uint32(p.Len()), - FreeConns: uint32(p.FreeLen()), + FreeConns: uint32(idleLen), + IdleConns: uint32(idleLen), StaleConns: atomic.LoadUint32(&p.stats.StaleConns), } } @@ -316,41 +351,45 @@ func (p *ConnPool) Close() error { p.conns = nil p.connsMu.Unlock() - p.freeConnsMu.Lock() - p.freeConns = nil - p.freeConnsMu.Unlock() + p.idleConnsMu.Lock() + p.idleConns = nil + p.idleConnsMu.Unlock() return firstErr } -func (p *ConnPool) reapStaleConn() bool { - if len(p.freeConns) == 0 { - return false +func (p *ConnPool) reapStaleConn() *Conn { + if len(p.idleConns) == 0 { + return nil } - cn := p.freeConns[0] + cn := p.idleConns[0] if !cn.IsStale(p.opt.IdleTimeout) { - return false + return nil } - p.CloseConn(cn) - p.freeConns = append(p.freeConns[:0], p.freeConns[1:]...) + p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...) - return true + return cn } func (p *ConnPool) ReapStaleConns() (int, error) { var n int for { - p.queue <- struct{}{} - p.freeConnsMu.Lock() + p.getTurn() - reaped := p.reapStaleConn() + p.idleConnsMu.Lock() + cn := p.reapStaleConn() + p.idleConnsMu.Unlock() - p.freeConnsMu.Unlock() - <-p.queue + if cn != nil { + p.removeConn(cn) + } - if reaped { + p.freeTurn() + + if cn != nil { + p.closeConn(cn) n++ } else { break diff --git a/internal/pool/pool_single.go b/internal/pool/pool_single.go index ff91279b..b35b78af 100644 --- a/internal/pool/pool_single.go +++ b/internal/pool/pool_single.go @@ -20,29 +20,27 @@ func (p *SingleConnPool) CloseConn(*Conn) error { panic("not implemented") } -func (p *SingleConnPool) Get() (*Conn, bool, error) { - return p.cn, false, nil +func (p *SingleConnPool) Get() (*Conn, error) { + return p.cn, nil } -func (p *SingleConnPool) Put(cn *Conn) error { +func (p *SingleConnPool) Put(cn *Conn) { if p.cn != cn { panic("p.cn != cn") } - return nil } -func (p *SingleConnPool) Remove(cn *Conn) error { +func (p *SingleConnPool) Remove(cn *Conn) { if p.cn != cn { panic("p.cn != cn") } - return nil } func (p *SingleConnPool) Len() int { return 1 } -func (p *SingleConnPool) FreeLen() int { +func (p *SingleConnPool) IdleLen() int { return 0 } diff --git a/internal/pool/pool_sticky.go b/internal/pool/pool_sticky.go index 17f16385..91bd9133 100644 --- a/internal/pool/pool_sticky.go +++ b/internal/pool/pool_sticky.go @@ -28,55 +28,40 @@ func (p *StickyConnPool) CloseConn(*Conn) error { panic("not implemented") } -func (p *StickyConnPool) Get() (*Conn, bool, error) { +func (p *StickyConnPool) Get() (*Conn, error) { p.mu.Lock() defer p.mu.Unlock() if p.closed { - return nil, false, ErrClosed + return nil, ErrClosed } if p.cn != nil { - return p.cn, false, nil + return p.cn, nil } - cn, _, err := p.pool.Get() + cn, err := p.pool.Get() if err != nil { - return nil, false, err + return nil, err } + p.cn = cn - return cn, true, nil + return cn, nil } -func (p *StickyConnPool) putUpstream() (err error) { - err = p.pool.Put(p.cn) +func (p *StickyConnPool) putUpstream() { + p.pool.Put(p.cn) p.cn = nil - return err } -func (p *StickyConnPool) Put(cn *Conn) error { - p.mu.Lock() - defer p.mu.Unlock() +func (p *StickyConnPool) Put(cn *Conn) {} - if p.closed { - return ErrClosed - } - return nil -} - -func (p *StickyConnPool) removeUpstream() error { - err := p.pool.Remove(p.cn) +func (p *StickyConnPool) removeUpstream() { + p.pool.Remove(p.cn) p.cn = nil - return err } -func (p *StickyConnPool) Remove(cn *Conn) error { - p.mu.Lock() - defer p.mu.Unlock() - - if p.closed { - return nil - } - return p.removeUpstream() +func (p *StickyConnPool) Remove(cn *Conn) { + p.removeUpstream() } func (p *StickyConnPool) Len() int { @@ -89,7 +74,7 @@ func (p *StickyConnPool) Len() int { return 1 } -func (p *StickyConnPool) FreeLen() int { +func (p *StickyConnPool) IdleLen() int { p.mu.Lock() defer p.mu.Unlock() @@ -111,13 +96,14 @@ func (p *StickyConnPool) Close() error { return ErrClosed } p.closed = true - var err error + if p.cn != nil { if p.reusable { - err = p.putUpstream() + p.putUpstream() } else { - err = p.removeUpstream() + p.removeUpstream() } } - return err + + return nil } diff --git a/internal/pool/pool_test.go b/internal/pool/pool_test.go index 68c9a1be..49602685 100644 --- a/internal/pool/pool_test.go +++ b/internal/pool/pool_test.go @@ -29,13 +29,13 @@ var _ = Describe("ConnPool", func() { It("should unblock client when conn is removed", func() { // Reserve one connection. - cn, _, err := connPool.Get() + cn, err := connPool.Get() Expect(err).NotTo(HaveOccurred()) // Reserve all other connections. var cns []*pool.Conn for i := 0; i < 9; i++ { - cn, _, err := connPool.Get() + cn, err := connPool.Get() Expect(err).NotTo(HaveOccurred()) cns = append(cns, cn) } @@ -46,12 +46,11 @@ var _ = Describe("ConnPool", func() { defer GinkgoRecover() started <- true - _, _, err := connPool.Get() + _, err := connPool.Get() Expect(err).NotTo(HaveOccurred()) done <- true - err = connPool.Put(cn) - Expect(err).NotTo(HaveOccurred()) + connPool.Put(cn) }() <-started @@ -59,14 +58,13 @@ var _ = Describe("ConnPool", func() { select { case <-done: Fail("Get is not blocked") - default: + case <-time.After(time.Millisecond): // ok } - err = connPool.Remove(cn) - Expect(err).NotTo(HaveOccurred()) + connPool.Remove(cn) - // Check that Ping is unblocked. + // Check that Get is unblocked. select { case <-done: // ok @@ -75,8 +73,7 @@ var _ = Describe("ConnPool", func() { } for _, cn := range cns { - err = connPool.Put(cn) - Expect(err).NotTo(HaveOccurred()) + connPool.Put(cn) } }) }) @@ -107,7 +104,7 @@ var _ = Describe("conns reaper", func() { // add stale connections idleConns = nil for i := 0; i < 3; i++ { - cn, _, err := connPool.Get() + cn, err := connPool.Get() Expect(err).NotTo(HaveOccurred()) cn.SetUsedAt(time.Now().Add(-2 * idleTimeout)) conns = append(conns, cn) @@ -116,17 +113,17 @@ var _ = Describe("conns reaper", func() { // add fresh connections for i := 0; i < 3; i++ { - cn, _, err := connPool.Get() + cn, err := connPool.Get() Expect(err).NotTo(HaveOccurred()) conns = append(conns, cn) } for _, cn := range conns { - Expect(connPool.Put(cn)).NotTo(HaveOccurred()) + connPool.Put(cn) } Expect(connPool.Len()).To(Equal(6)) - Expect(connPool.FreeLen()).To(Equal(6)) + Expect(connPool.IdleLen()).To(Equal(6)) n, err := connPool.ReapStaleConns() Expect(err).NotTo(HaveOccurred()) @@ -136,14 +133,14 @@ var _ = Describe("conns reaper", func() { AfterEach(func() { _ = connPool.Close() Expect(connPool.Len()).To(Equal(0)) - Expect(connPool.FreeLen()).To(Equal(0)) + Expect(connPool.IdleLen()).To(Equal(0)) Expect(len(closedConns)).To(Equal(len(conns))) Expect(closedConns).To(ConsistOf(conns)) }) It("reaps stale connections", func() { Expect(connPool.Len()).To(Equal(3)) - Expect(connPool.FreeLen()).To(Equal(3)) + Expect(connPool.IdleLen()).To(Equal(3)) }) It("does not reap fresh connections", func() { @@ -161,36 +158,34 @@ var _ = Describe("conns reaper", func() { for j := 0; j < 3; j++ { var freeCns []*pool.Conn for i := 0; i < 3; i++ { - cn, _, err := connPool.Get() + cn, err := connPool.Get() Expect(err).NotTo(HaveOccurred()) Expect(cn).NotTo(BeNil()) freeCns = append(freeCns, cn) } Expect(connPool.Len()).To(Equal(3)) - Expect(connPool.FreeLen()).To(Equal(0)) + Expect(connPool.IdleLen()).To(Equal(0)) - cn, _, err := connPool.Get() + cn, err := connPool.Get() Expect(err).NotTo(HaveOccurred()) Expect(cn).NotTo(BeNil()) conns = append(conns, cn) Expect(connPool.Len()).To(Equal(4)) - Expect(connPool.FreeLen()).To(Equal(0)) + Expect(connPool.IdleLen()).To(Equal(0)) - err = connPool.Remove(cn) - Expect(err).NotTo(HaveOccurred()) + connPool.Remove(cn) Expect(connPool.Len()).To(Equal(3)) - Expect(connPool.FreeLen()).To(Equal(0)) + Expect(connPool.IdleLen()).To(Equal(0)) for _, cn := range freeCns { - err := connPool.Put(cn) - Expect(err).NotTo(HaveOccurred()) + connPool.Put(cn) } Expect(connPool.Len()).To(Equal(3)) - Expect(connPool.FreeLen()).To(Equal(3)) + Expect(connPool.IdleLen()).To(Equal(3)) } }) }) @@ -222,18 +217,18 @@ var _ = Describe("race", func() { perform(C, func(id int) { for i := 0; i < N; i++ { - cn, _, err := connPool.Get() + cn, err := connPool.Get() Expect(err).NotTo(HaveOccurred()) if err == nil { - Expect(connPool.Put(cn)).NotTo(HaveOccurred()) + connPool.Put(cn) } } }, func(id int) { for i := 0; i < N; i++ { - cn, _, err := connPool.Get() + cn, err := connPool.Get() Expect(err).NotTo(HaveOccurred()) if err == nil { - Expect(connPool.Remove(cn)).NotTo(HaveOccurred()) + connPool.Remove(cn) } } }) diff --git a/options.go b/options.go index 75648053..35ce0619 100644 --- a/options.go +++ b/options.go @@ -68,8 +68,7 @@ type Options struct { // Default is 5 minutes. IdleTimeout time.Duration // Frequency of idle checks. - // Default is 1 minute. - // When minus value is set, then idle check is disabled. + // Default is 1 minute. -1 disables idle check. IdleCheckFrequency time.Duration // Enables read only queries on slave nodes. diff --git a/pool_test.go b/pool_test.go index 0ca09adc..d7dc9d01 100644 --- a/pool_test.go +++ b/pool_test.go @@ -30,8 +30,8 @@ var _ = Describe("pool", func() { pool := client.Pool() Expect(pool.Len()).To(BeNumerically("<=", 10)) - Expect(pool.FreeLen()).To(BeNumerically("<=", 10)) - Expect(pool.Len()).To(Equal(pool.FreeLen())) + Expect(pool.IdleLen()).To(BeNumerically("<=", 10)) + Expect(pool.Len()).To(Equal(pool.IdleLen())) }) It("respects max size on multi", func() { @@ -55,8 +55,8 @@ var _ = Describe("pool", func() { pool := client.Pool() Expect(pool.Len()).To(BeNumerically("<=", 10)) - Expect(pool.FreeLen()).To(BeNumerically("<=", 10)) - Expect(pool.Len()).To(Equal(pool.FreeLen())) + Expect(pool.IdleLen()).To(BeNumerically("<=", 10)) + Expect(pool.Len()).To(Equal(pool.IdleLen())) }) It("respects max size on pipelines", func() { @@ -73,15 +73,15 @@ var _ = Describe("pool", func() { pool := client.Pool() Expect(pool.Len()).To(BeNumerically("<=", 10)) - Expect(pool.FreeLen()).To(BeNumerically("<=", 10)) - Expect(pool.Len()).To(Equal(pool.FreeLen())) + Expect(pool.IdleLen()).To(BeNumerically("<=", 10)) + Expect(pool.Len()).To(Equal(pool.IdleLen())) }) It("removes broken connections", func() { - cn, _, err := client.Pool().Get() + cn, err := client.Pool().Get() Expect(err).NotTo(HaveOccurred()) cn.SetNetConn(&badConn{}) - Expect(client.Pool().Put(cn)).NotTo(HaveOccurred()) + client.Pool().Put(cn) err = client.Ping().Err() Expect(err).To(MatchError("bad connection")) @@ -92,7 +92,7 @@ var _ = Describe("pool", func() { pool := client.Pool() Expect(pool.Len()).To(Equal(1)) - Expect(pool.FreeLen()).To(Equal(1)) + Expect(pool.IdleLen()).To(Equal(1)) stats := pool.Stats() Expect(stats.Hits).To(Equal(uint32(2))) @@ -109,7 +109,7 @@ var _ = Describe("pool", func() { pool := client.Pool() Expect(pool.Len()).To(Equal(1)) - Expect(pool.FreeLen()).To(Equal(1)) + Expect(pool.IdleLen()).To(Equal(1)) stats := pool.Stats() Expect(stats.Hits).To(Equal(uint32(100))) @@ -125,6 +125,7 @@ var _ = Describe("pool", func() { Timeouts: 0, TotalConns: 1, FreeConns: 1, + IdleConns: 1, StaleConns: 0, })) diff --git a/redis.go b/redis.go index 7a606b70..beb632e1 100644 --- a/redis.go +++ b/redis.go @@ -60,29 +60,30 @@ func (c *baseClient) newConn() (*pool.Conn, error) { return cn, nil } -func (c *baseClient) getConn() (*pool.Conn, bool, error) { - cn, isNew, err := c.connPool.Get() +func (c *baseClient) getConn() (*pool.Conn, error) { + cn, err := c.connPool.Get() if err != nil { - return nil, false, err + return nil, err } if !cn.Inited { - if err := c.initConn(cn); err != nil { - _ = c.connPool.Remove(cn) - return nil, false, err + err := c.initConn(cn) + if err != nil { + c.connPool.Remove(cn) + return nil, err } } - return cn, isNew, nil + return cn, nil } func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool { if internal.IsBadConn(err, false) { - _ = c.connPool.Remove(cn) + c.connPool.Remove(cn) return false } - _ = c.connPool.Put(cn) + c.connPool.Put(cn) return true } @@ -137,7 +138,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { time.Sleep(c.retryBackoff(attempt)) } - cn, _, err := c.getConn() + cn, err := c.getConn() if err != nil { cmd.setErr(err) if internal.IsRetryableError(err, true) { @@ -225,7 +226,7 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e time.Sleep(c.retryBackoff(attempt)) } - cn, _, err := c.getConn() + cn, err := c.getConn() if err != nil { setCmdsErr(cmds, err) return err @@ -234,10 +235,10 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e canRetry, err := p(cn, cmds) if err == nil || internal.IsRedisError(err) { - _ = c.connPool.Put(cn) + c.connPool.Put(cn) break } - _ = c.connPool.Remove(cn) + c.connPool.Remove(cn) if !canRetry || !internal.IsRetryableError(err, true) { break diff --git a/redis_test.go b/redis_test.go index df2485d3..f46728f9 100644 --- a/redis_test.go +++ b/redis_test.go @@ -145,12 +145,11 @@ var _ = Describe("Client", func() { }) // Put bad connection in the pool. - cn, _, err := client.Pool().Get() + cn, err := client.Pool().Get() Expect(err).NotTo(HaveOccurred()) cn.SetNetConn(&badConn{}) - err = client.Pool().Put(cn) - Expect(err).NotTo(HaveOccurred()) + client.Pool().Put(cn) err = client.Ping().Err() Expect(err).NotTo(HaveOccurred()) @@ -184,19 +183,18 @@ var _ = Describe("Client", func() { }) It("should update conn.UsedAt on read/write", func() { - cn, _, err := client.Pool().Get() + cn, err := client.Pool().Get() Expect(err).NotTo(HaveOccurred()) Expect(cn.UsedAt).NotTo(BeZero()) createdAt := cn.UsedAt() - err = client.Pool().Put(cn) - Expect(err).NotTo(HaveOccurred()) + client.Pool().Put(cn) Expect(cn.UsedAt().Equal(createdAt)).To(BeTrue()) err = client.Ping().Err() Expect(err).NotTo(HaveOccurred()) - cn, _, err = client.Pool().Get() + cn, err = client.Pool().Get() Expect(err).NotTo(HaveOccurred()) Expect(cn).NotTo(BeNil()) Expect(cn.UsedAt().After(createdAt)).To(BeTrue()) diff --git a/ring.go b/ring.go index 362bd031..b47a1094 100644 --- a/ring.go +++ b/ring.go @@ -525,7 +525,7 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { continue } - cn, _, err := shard.Client.getConn() + cn, err := shard.Client.getConn() if err != nil { setCmdsErr(cmds, err) continue @@ -533,10 +533,10 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) if err == nil || internal.IsRedisError(err) { - _ = shard.Client.connPool.Put(cn) + shard.Client.connPool.Put(cn) continue } - _ = shard.Client.connPool.Remove(cn) + shard.Client.connPool.Remove(cn) if canRetry && internal.IsRetryableError(err, true) { if failedCmdsMap == nil { diff --git a/tx_test.go b/tx_test.go index de597ff0..c70f08ce 100644 --- a/tx_test.go +++ b/tx_test.go @@ -124,12 +124,11 @@ var _ = Describe("Tx", func() { It("should recover from bad connection", func() { // Put bad connection in the pool. - cn, _, err := client.Pool().Get() + cn, err := client.Pool().Get() Expect(err).NotTo(HaveOccurred()) cn.SetNetConn(&badConn{}) - err = client.Pool().Put(cn) - Expect(err).NotTo(HaveOccurred()) + client.Pool().Put(cn) do := func() error { err := client.Watch(func(tx *redis.Tx) error {