forked from mirror/redis
Merge pull request #780 from go-redis/fix/pool-cleanup
Fix/pool cleanup
This commit is contained in:
commit
ef33be46cd
12
cluster.go
12
cluster.go
|
@ -1172,7 +1172,7 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
|
||||||
failedCmds := make(map[*clusterNode][]Cmder)
|
failedCmds := make(map[*clusterNode][]Cmder)
|
||||||
|
|
||||||
for node, cmds := range cmdsMap {
|
for node, cmds := range cmdsMap {
|
||||||
cn, _, err := node.Client.getConn()
|
cn, err := node.Client.getConn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == pool.ErrClosed {
|
if err == pool.ErrClosed {
|
||||||
c.remapCmds(cmds, failedCmds)
|
c.remapCmds(cmds, failedCmds)
|
||||||
|
@ -1184,9 +1184,9 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
|
||||||
|
|
||||||
err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
|
err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
|
||||||
if err == nil || internal.IsRedisError(err) {
|
if err == nil || internal.IsRedisError(err) {
|
||||||
_ = node.Client.connPool.Put(cn)
|
node.Client.connPool.Put(cn)
|
||||||
} else {
|
} else {
|
||||||
_ = node.Client.connPool.Remove(cn)
|
node.Client.connPool.Remove(cn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1336,7 +1336,7 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
|
||||||
failedCmds := make(map[*clusterNode][]Cmder)
|
failedCmds := make(map[*clusterNode][]Cmder)
|
||||||
|
|
||||||
for node, cmds := range cmdsMap {
|
for node, cmds := range cmdsMap {
|
||||||
cn, _, err := node.Client.getConn()
|
cn, err := node.Client.getConn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == pool.ErrClosed {
|
if err == pool.ErrClosed {
|
||||||
c.remapCmds(cmds, failedCmds)
|
c.remapCmds(cmds, failedCmds)
|
||||||
|
@ -1348,9 +1348,9 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
|
||||||
|
|
||||||
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
|
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
|
||||||
if err == nil || internal.IsRedisError(err) {
|
if err == nil || internal.IsRedisError(err) {
|
||||||
_ = node.Client.connPool.Put(cn)
|
node.Client.connPool.Put(cn)
|
||||||
} else {
|
} else {
|
||||||
_ = node.Client.connPool.Remove(cn)
|
node.Client.connPool.Remove(cn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,13 +20,11 @@ func benchmarkPoolGetPut(b *testing.B, poolSize int) {
|
||||||
|
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
cn, _, err := connPool.Get()
|
cn, err := connPool.Get()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
if err = connPool.Put(cn); err != nil {
|
connPool.Put(cn)
|
||||||
b.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -56,13 +54,11 @@ func benchmarkPoolGetRemove(b *testing.B, poolSize int) {
|
||||||
|
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
cn, _, err := connPool.Get()
|
cn, err := connPool.Get()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := connPool.Remove(cn); err != nil {
|
connPool.Remove(cn)
|
||||||
b.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,8 @@ type Stats struct {
|
||||||
Timeouts uint32 // number of times a wait timeout occurred
|
Timeouts uint32 // number of times a wait timeout occurred
|
||||||
|
|
||||||
TotalConns uint32 // number of total connections in the pool
|
TotalConns uint32 // number of total connections in the pool
|
||||||
FreeConns uint32 // number of free connections in the pool
|
FreeConns uint32 // deprecated - use IdleConns
|
||||||
|
IdleConns uint32 // number of idle connections in the pool
|
||||||
StaleConns uint32 // number of stale connections removed from the pool
|
StaleConns uint32 // number of stale connections removed from the pool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,12 +37,12 @@ type Pooler interface {
|
||||||
NewConn() (*Conn, error)
|
NewConn() (*Conn, error)
|
||||||
CloseConn(*Conn) error
|
CloseConn(*Conn) error
|
||||||
|
|
||||||
Get() (*Conn, bool, error)
|
Get() (*Conn, error)
|
||||||
Put(*Conn) error
|
Put(*Conn)
|
||||||
Remove(*Conn) error
|
Remove(*Conn)
|
||||||
|
|
||||||
Len() int
|
Len() int
|
||||||
FreeLen() int
|
IdleLen() int
|
||||||
Stats() *Stats
|
Stats() *Stats
|
||||||
|
|
||||||
Close() error
|
Close() error
|
||||||
|
@ -70,8 +71,8 @@ type ConnPool struct {
|
||||||
connsMu sync.Mutex
|
connsMu sync.Mutex
|
||||||
conns []*Conn
|
conns []*Conn
|
||||||
|
|
||||||
freeConnsMu sync.Mutex
|
idleConnsMu sync.RWMutex
|
||||||
freeConns []*Conn
|
idleConns []*Conn
|
||||||
|
|
||||||
stats Stats
|
stats Stats
|
||||||
|
|
||||||
|
@ -86,15 +87,29 @@ func NewConnPool(opt *Options) *ConnPool {
|
||||||
|
|
||||||
queue: make(chan struct{}, opt.PoolSize),
|
queue: make(chan struct{}, opt.PoolSize),
|
||||||
conns: make([]*Conn, 0, opt.PoolSize),
|
conns: make([]*Conn, 0, opt.PoolSize),
|
||||||
freeConns: make([]*Conn, 0, opt.PoolSize),
|
idleConns: make([]*Conn, 0, opt.PoolSize),
|
||||||
}
|
}
|
||||||
|
|
||||||
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
|
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
|
||||||
go p.reaper(opt.IdleCheckFrequency)
|
go p.reaper(opt.IdleCheckFrequency)
|
||||||
}
|
}
|
||||||
|
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) NewConn() (*Conn, error) {
|
func (p *ConnPool) NewConn() (*Conn, error) {
|
||||||
|
cn, err := p.newConn()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
p.connsMu.Lock()
|
||||||
|
p.conns = append(p.conns, cn)
|
||||||
|
p.connsMu.Unlock()
|
||||||
|
return cn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ConnPool) newConn() (*Conn, error) {
|
||||||
if p.closed() {
|
if p.closed() {
|
||||||
return nil, ErrClosed
|
return nil, ErrClosed
|
||||||
}
|
}
|
||||||
|
@ -112,12 +127,7 @@ func (p *ConnPool) NewConn() (*Conn, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cn := NewConn(netConn)
|
return NewConn(netConn), nil
|
||||||
p.connsMu.Lock()
|
|
||||||
p.conns = append(p.conns, cn)
|
|
||||||
p.connsMu.Unlock()
|
|
||||||
|
|
||||||
return cn, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) tryDial() {
|
func (p *ConnPool) tryDial() {
|
||||||
|
@ -153,34 +163,20 @@ func (p *ConnPool) getLastDialError() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns existed connection from the pool or creates a new one.
|
// Get returns existed connection from the pool or creates a new one.
|
||||||
func (p *ConnPool) Get() (*Conn, bool, error) {
|
func (p *ConnPool) Get() (*Conn, error) {
|
||||||
if p.closed() {
|
if p.closed() {
|
||||||
return nil, false, ErrClosed
|
return nil, ErrClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
err := p.waitTurn()
|
||||||
case p.queue <- struct{}{}:
|
if err != nil {
|
||||||
default:
|
return nil, err
|
||||||
timer := timers.Get().(*time.Timer)
|
|
||||||
timer.Reset(p.opt.PoolTimeout)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case p.queue <- struct{}{}:
|
|
||||||
if !timer.Stop() {
|
|
||||||
<-timer.C
|
|
||||||
}
|
|
||||||
timers.Put(timer)
|
|
||||||
case <-timer.C:
|
|
||||||
timers.Put(timer)
|
|
||||||
atomic.AddUint32(&p.stats.Timeouts, 1)
|
|
||||||
return nil, false, ErrPoolTimeout
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
p.freeConnsMu.Lock()
|
p.idleConnsMu.Lock()
|
||||||
cn := p.popFree()
|
cn := p.popIdle()
|
||||||
p.freeConnsMu.Unlock()
|
p.idleConnsMu.Unlock()
|
||||||
|
|
||||||
if cn == nil {
|
if cn == nil {
|
||||||
break
|
break
|
||||||
|
@ -192,50 +188,89 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic.AddUint32(&p.stats.Hits, 1)
|
atomic.AddUint32(&p.stats.Hits, 1)
|
||||||
return cn, false, nil
|
return cn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic.AddUint32(&p.stats.Misses, 1)
|
atomic.AddUint32(&p.stats.Misses, 1)
|
||||||
|
|
||||||
newcn, err := p.NewConn()
|
newcn, err := p.NewConn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
<-p.queue
|
p.freeTurn()
|
||||||
return nil, false, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return newcn, true, nil
|
return newcn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) popFree() *Conn {
|
func (p *ConnPool) getTurn() {
|
||||||
if len(p.freeConns) == 0 {
|
p.queue <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ConnPool) waitTurn() error {
|
||||||
|
select {
|
||||||
|
case p.queue <- struct{}{}:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
timer := timers.Get().(*time.Timer)
|
||||||
|
timer.Reset(p.opt.PoolTimeout)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case p.queue <- struct{}{}:
|
||||||
|
if !timer.Stop() {
|
||||||
|
<-timer.C
|
||||||
|
}
|
||||||
|
timers.Put(timer)
|
||||||
|
return nil
|
||||||
|
case <-timer.C:
|
||||||
|
timers.Put(timer)
|
||||||
|
atomic.AddUint32(&p.stats.Timeouts, 1)
|
||||||
|
return ErrPoolTimeout
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ConnPool) freeTurn() {
|
||||||
|
<-p.queue
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ConnPool) popIdle() *Conn {
|
||||||
|
if len(p.idleConns) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
idx := len(p.freeConns) - 1
|
idx := len(p.idleConns) - 1
|
||||||
cn := p.freeConns[idx]
|
cn := p.idleConns[idx]
|
||||||
p.freeConns = p.freeConns[:idx]
|
p.idleConns = p.idleConns[:idx]
|
||||||
|
|
||||||
return cn
|
return cn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) Put(cn *Conn) error {
|
func (p *ConnPool) Put(cn *Conn) {
|
||||||
if data := cn.Rd.PeekBuffered(); data != nil {
|
buf := cn.Rd.PeekBuffered()
|
||||||
internal.Logf("connection has unread data: %q", data)
|
if buf != nil {
|
||||||
return p.Remove(cn)
|
internal.Logf("connection has unread data: %.100q", buf)
|
||||||
|
p.Remove(cn)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
p.freeConnsMu.Lock()
|
|
||||||
p.freeConns = append(p.freeConns, cn)
|
p.idleConnsMu.Lock()
|
||||||
p.freeConnsMu.Unlock()
|
p.idleConns = append(p.idleConns, cn)
|
||||||
<-p.queue
|
p.idleConnsMu.Unlock()
|
||||||
return nil
|
p.freeTurn()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) Remove(cn *Conn) error {
|
func (p *ConnPool) Remove(cn *Conn) {
|
||||||
_ = p.CloseConn(cn)
|
p.removeConn(cn)
|
||||||
<-p.queue
|
p.freeTurn()
|
||||||
return nil
|
_ = p.closeConn(cn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) CloseConn(cn *Conn) error {
|
func (p *ConnPool) CloseConn(cn *Conn) error {
|
||||||
|
p.removeConn(cn)
|
||||||
|
return p.closeConn(cn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ConnPool) removeConn(cn *Conn) {
|
||||||
p.connsMu.Lock()
|
p.connsMu.Lock()
|
||||||
for i, c := range p.conns {
|
for i, c := range p.conns {
|
||||||
if c == cn {
|
if c == cn {
|
||||||
|
@ -244,8 +279,6 @@ func (p *ConnPool) CloseConn(cn *Conn) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
p.connsMu.Unlock()
|
p.connsMu.Unlock()
|
||||||
|
|
||||||
return p.closeConn(cn)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) closeConn(cn *Conn) error {
|
func (p *ConnPool) closeConn(cn *Conn) error {
|
||||||
|
@ -263,22 +296,24 @@ func (p *ConnPool) Len() int {
|
||||||
return l
|
return l
|
||||||
}
|
}
|
||||||
|
|
||||||
// FreeLen returns number of free connections.
|
// FreeLen returns number of idle connections.
|
||||||
func (p *ConnPool) FreeLen() int {
|
func (p *ConnPool) IdleLen() int {
|
||||||
p.freeConnsMu.Lock()
|
p.idleConnsMu.RLock()
|
||||||
l := len(p.freeConns)
|
l := len(p.idleConns)
|
||||||
p.freeConnsMu.Unlock()
|
p.idleConnsMu.RUnlock()
|
||||||
return l
|
return l
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) Stats() *Stats {
|
func (p *ConnPool) Stats() *Stats {
|
||||||
|
idleLen := p.IdleLen()
|
||||||
return &Stats{
|
return &Stats{
|
||||||
Hits: atomic.LoadUint32(&p.stats.Hits),
|
Hits: atomic.LoadUint32(&p.stats.Hits),
|
||||||
Misses: atomic.LoadUint32(&p.stats.Misses),
|
Misses: atomic.LoadUint32(&p.stats.Misses),
|
||||||
Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
|
Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
|
||||||
|
|
||||||
TotalConns: uint32(p.Len()),
|
TotalConns: uint32(p.Len()),
|
||||||
FreeConns: uint32(p.FreeLen()),
|
FreeConns: uint32(idleLen),
|
||||||
|
IdleConns: uint32(idleLen),
|
||||||
StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
|
StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -316,41 +351,45 @@ func (p *ConnPool) Close() error {
|
||||||
p.conns = nil
|
p.conns = nil
|
||||||
p.connsMu.Unlock()
|
p.connsMu.Unlock()
|
||||||
|
|
||||||
p.freeConnsMu.Lock()
|
p.idleConnsMu.Lock()
|
||||||
p.freeConns = nil
|
p.idleConns = nil
|
||||||
p.freeConnsMu.Unlock()
|
p.idleConnsMu.Unlock()
|
||||||
|
|
||||||
return firstErr
|
return firstErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) reapStaleConn() bool {
|
func (p *ConnPool) reapStaleConn() *Conn {
|
||||||
if len(p.freeConns) == 0 {
|
if len(p.idleConns) == 0 {
|
||||||
return false
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
cn := p.freeConns[0]
|
cn := p.idleConns[0]
|
||||||
if !cn.IsStale(p.opt.IdleTimeout) {
|
if !cn.IsStale(p.opt.IdleTimeout) {
|
||||||
return false
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
p.CloseConn(cn)
|
p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
|
||||||
p.freeConns = append(p.freeConns[:0], p.freeConns[1:]...)
|
|
||||||
|
|
||||||
return true
|
return cn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) ReapStaleConns() (int, error) {
|
func (p *ConnPool) ReapStaleConns() (int, error) {
|
||||||
var n int
|
var n int
|
||||||
for {
|
for {
|
||||||
p.queue <- struct{}{}
|
p.getTurn()
|
||||||
p.freeConnsMu.Lock()
|
|
||||||
|
|
||||||
reaped := p.reapStaleConn()
|
p.idleConnsMu.Lock()
|
||||||
|
cn := p.reapStaleConn()
|
||||||
|
p.idleConnsMu.Unlock()
|
||||||
|
|
||||||
p.freeConnsMu.Unlock()
|
if cn != nil {
|
||||||
<-p.queue
|
p.removeConn(cn)
|
||||||
|
}
|
||||||
|
|
||||||
if reaped {
|
p.freeTurn()
|
||||||
|
|
||||||
|
if cn != nil {
|
||||||
|
p.closeConn(cn)
|
||||||
n++
|
n++
|
||||||
} else {
|
} else {
|
||||||
break
|
break
|
||||||
|
|
|
@ -20,29 +20,27 @@ func (p *SingleConnPool) CloseConn(*Conn) error {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SingleConnPool) Get() (*Conn, bool, error) {
|
func (p *SingleConnPool) Get() (*Conn, error) {
|
||||||
return p.cn, false, nil
|
return p.cn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SingleConnPool) Put(cn *Conn) error {
|
func (p *SingleConnPool) Put(cn *Conn) {
|
||||||
if p.cn != cn {
|
if p.cn != cn {
|
||||||
panic("p.cn != cn")
|
panic("p.cn != cn")
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SingleConnPool) Remove(cn *Conn) error {
|
func (p *SingleConnPool) Remove(cn *Conn) {
|
||||||
if p.cn != cn {
|
if p.cn != cn {
|
||||||
panic("p.cn != cn")
|
panic("p.cn != cn")
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SingleConnPool) Len() int {
|
func (p *SingleConnPool) Len() int {
|
||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *SingleConnPool) FreeLen() int {
|
func (p *SingleConnPool) IdleLen() int {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,55 +28,40 @@ func (p *StickyConnPool) CloseConn(*Conn) error {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *StickyConnPool) Get() (*Conn, bool, error) {
|
func (p *StickyConnPool) Get() (*Conn, error) {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
if p.closed {
|
if p.closed {
|
||||||
return nil, false, ErrClosed
|
return nil, ErrClosed
|
||||||
}
|
}
|
||||||
if p.cn != nil {
|
if p.cn != nil {
|
||||||
return p.cn, false, nil
|
return p.cn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
cn, _, err := p.pool.Get()
|
cn, err := p.pool.Get()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
p.cn = cn
|
p.cn = cn
|
||||||
return cn, true, nil
|
return cn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *StickyConnPool) putUpstream() (err error) {
|
func (p *StickyConnPool) putUpstream() {
|
||||||
err = p.pool.Put(p.cn)
|
p.pool.Put(p.cn)
|
||||||
p.cn = nil
|
p.cn = nil
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *StickyConnPool) Put(cn *Conn) error {
|
func (p *StickyConnPool) Put(cn *Conn) {}
|
||||||
p.mu.Lock()
|
|
||||||
defer p.mu.Unlock()
|
|
||||||
|
|
||||||
if p.closed {
|
func (p *StickyConnPool) removeUpstream() {
|
||||||
return ErrClosed
|
p.pool.Remove(p.cn)
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *StickyConnPool) removeUpstream() error {
|
|
||||||
err := p.pool.Remove(p.cn)
|
|
||||||
p.cn = nil
|
p.cn = nil
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *StickyConnPool) Remove(cn *Conn) error {
|
func (p *StickyConnPool) Remove(cn *Conn) {
|
||||||
p.mu.Lock()
|
p.removeUpstream()
|
||||||
defer p.mu.Unlock()
|
|
||||||
|
|
||||||
if p.closed {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return p.removeUpstream()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *StickyConnPool) Len() int {
|
func (p *StickyConnPool) Len() int {
|
||||||
|
@ -89,7 +74,7 @@ func (p *StickyConnPool) Len() int {
|
||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *StickyConnPool) FreeLen() int {
|
func (p *StickyConnPool) IdleLen() int {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
|
@ -111,13 +96,14 @@ func (p *StickyConnPool) Close() error {
|
||||||
return ErrClosed
|
return ErrClosed
|
||||||
}
|
}
|
||||||
p.closed = true
|
p.closed = true
|
||||||
var err error
|
|
||||||
if p.cn != nil {
|
if p.cn != nil {
|
||||||
if p.reusable {
|
if p.reusable {
|
||||||
err = p.putUpstream()
|
p.putUpstream()
|
||||||
} else {
|
} else {
|
||||||
err = p.removeUpstream()
|
p.removeUpstream()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,13 +29,13 @@ var _ = Describe("ConnPool", func() {
|
||||||
|
|
||||||
It("should unblock client when conn is removed", func() {
|
It("should unblock client when conn is removed", func() {
|
||||||
// Reserve one connection.
|
// Reserve one connection.
|
||||||
cn, _, err := connPool.Get()
|
cn, err := connPool.Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
// Reserve all other connections.
|
// Reserve all other connections.
|
||||||
var cns []*pool.Conn
|
var cns []*pool.Conn
|
||||||
for i := 0; i < 9; i++ {
|
for i := 0; i < 9; i++ {
|
||||||
cn, _, err := connPool.Get()
|
cn, err := connPool.Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
cns = append(cns, cn)
|
cns = append(cns, cn)
|
||||||
}
|
}
|
||||||
|
@ -46,12 +46,11 @@ var _ = Describe("ConnPool", func() {
|
||||||
defer GinkgoRecover()
|
defer GinkgoRecover()
|
||||||
|
|
||||||
started <- true
|
started <- true
|
||||||
_, _, err := connPool.Get()
|
_, err := connPool.Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
done <- true
|
done <- true
|
||||||
|
|
||||||
err = connPool.Put(cn)
|
connPool.Put(cn)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
}()
|
}()
|
||||||
<-started
|
<-started
|
||||||
|
|
||||||
|
@ -59,14 +58,13 @@ var _ = Describe("ConnPool", func() {
|
||||||
select {
|
select {
|
||||||
case <-done:
|
case <-done:
|
||||||
Fail("Get is not blocked")
|
Fail("Get is not blocked")
|
||||||
default:
|
case <-time.After(time.Millisecond):
|
||||||
// ok
|
// ok
|
||||||
}
|
}
|
||||||
|
|
||||||
err = connPool.Remove(cn)
|
connPool.Remove(cn)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
// Check that Ping is unblocked.
|
// Check that Get is unblocked.
|
||||||
select {
|
select {
|
||||||
case <-done:
|
case <-done:
|
||||||
// ok
|
// ok
|
||||||
|
@ -75,8 +73,7 @@ var _ = Describe("ConnPool", func() {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, cn := range cns {
|
for _, cn := range cns {
|
||||||
err = connPool.Put(cn)
|
connPool.Put(cn)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -107,7 +104,7 @@ var _ = Describe("conns reaper", func() {
|
||||||
// add stale connections
|
// add stale connections
|
||||||
idleConns = nil
|
idleConns = nil
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
cn, _, err := connPool.Get()
|
cn, err := connPool.Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
|
cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
|
||||||
conns = append(conns, cn)
|
conns = append(conns, cn)
|
||||||
|
@ -116,17 +113,17 @@ var _ = Describe("conns reaper", func() {
|
||||||
|
|
||||||
// add fresh connections
|
// add fresh connections
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
cn, _, err := connPool.Get()
|
cn, err := connPool.Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
conns = append(conns, cn)
|
conns = append(conns, cn)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, cn := range conns {
|
for _, cn := range conns {
|
||||||
Expect(connPool.Put(cn)).NotTo(HaveOccurred())
|
connPool.Put(cn)
|
||||||
}
|
}
|
||||||
|
|
||||||
Expect(connPool.Len()).To(Equal(6))
|
Expect(connPool.Len()).To(Equal(6))
|
||||||
Expect(connPool.FreeLen()).To(Equal(6))
|
Expect(connPool.IdleLen()).To(Equal(6))
|
||||||
|
|
||||||
n, err := connPool.ReapStaleConns()
|
n, err := connPool.ReapStaleConns()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
@ -136,14 +133,14 @@ var _ = Describe("conns reaper", func() {
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
_ = connPool.Close()
|
_ = connPool.Close()
|
||||||
Expect(connPool.Len()).To(Equal(0))
|
Expect(connPool.Len()).To(Equal(0))
|
||||||
Expect(connPool.FreeLen()).To(Equal(0))
|
Expect(connPool.IdleLen()).To(Equal(0))
|
||||||
Expect(len(closedConns)).To(Equal(len(conns)))
|
Expect(len(closedConns)).To(Equal(len(conns)))
|
||||||
Expect(closedConns).To(ConsistOf(conns))
|
Expect(closedConns).To(ConsistOf(conns))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("reaps stale connections", func() {
|
It("reaps stale connections", func() {
|
||||||
Expect(connPool.Len()).To(Equal(3))
|
Expect(connPool.Len()).To(Equal(3))
|
||||||
Expect(connPool.FreeLen()).To(Equal(3))
|
Expect(connPool.IdleLen()).To(Equal(3))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("does not reap fresh connections", func() {
|
It("does not reap fresh connections", func() {
|
||||||
|
@ -161,36 +158,34 @@ var _ = Describe("conns reaper", func() {
|
||||||
for j := 0; j < 3; j++ {
|
for j := 0; j < 3; j++ {
|
||||||
var freeCns []*pool.Conn
|
var freeCns []*pool.Conn
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
cn, _, err := connPool.Get()
|
cn, err := connPool.Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(cn).NotTo(BeNil())
|
Expect(cn).NotTo(BeNil())
|
||||||
freeCns = append(freeCns, cn)
|
freeCns = append(freeCns, cn)
|
||||||
}
|
}
|
||||||
|
|
||||||
Expect(connPool.Len()).To(Equal(3))
|
Expect(connPool.Len()).To(Equal(3))
|
||||||
Expect(connPool.FreeLen()).To(Equal(0))
|
Expect(connPool.IdleLen()).To(Equal(0))
|
||||||
|
|
||||||
cn, _, err := connPool.Get()
|
cn, err := connPool.Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(cn).NotTo(BeNil())
|
Expect(cn).NotTo(BeNil())
|
||||||
conns = append(conns, cn)
|
conns = append(conns, cn)
|
||||||
|
|
||||||
Expect(connPool.Len()).To(Equal(4))
|
Expect(connPool.Len()).To(Equal(4))
|
||||||
Expect(connPool.FreeLen()).To(Equal(0))
|
Expect(connPool.IdleLen()).To(Equal(0))
|
||||||
|
|
||||||
err = connPool.Remove(cn)
|
connPool.Remove(cn)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
Expect(connPool.Len()).To(Equal(3))
|
Expect(connPool.Len()).To(Equal(3))
|
||||||
Expect(connPool.FreeLen()).To(Equal(0))
|
Expect(connPool.IdleLen()).To(Equal(0))
|
||||||
|
|
||||||
for _, cn := range freeCns {
|
for _, cn := range freeCns {
|
||||||
err := connPool.Put(cn)
|
connPool.Put(cn)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Expect(connPool.Len()).To(Equal(3))
|
Expect(connPool.Len()).To(Equal(3))
|
||||||
Expect(connPool.FreeLen()).To(Equal(3))
|
Expect(connPool.IdleLen()).To(Equal(3))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -222,18 +217,18 @@ var _ = Describe("race", func() {
|
||||||
|
|
||||||
perform(C, func(id int) {
|
perform(C, func(id int) {
|
||||||
for i := 0; i < N; i++ {
|
for i := 0; i < N; i++ {
|
||||||
cn, _, err := connPool.Get()
|
cn, err := connPool.Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
Expect(connPool.Put(cn)).NotTo(HaveOccurred())
|
connPool.Put(cn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, func(id int) {
|
}, func(id int) {
|
||||||
for i := 0; i < N; i++ {
|
for i := 0; i < N; i++ {
|
||||||
cn, _, err := connPool.Get()
|
cn, err := connPool.Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
Expect(connPool.Remove(cn)).NotTo(HaveOccurred())
|
connPool.Remove(cn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
@ -68,8 +68,7 @@ type Options struct {
|
||||||
// Default is 5 minutes.
|
// Default is 5 minutes.
|
||||||
IdleTimeout time.Duration
|
IdleTimeout time.Duration
|
||||||
// Frequency of idle checks.
|
// Frequency of idle checks.
|
||||||
// Default is 1 minute.
|
// Default is 1 minute. -1 disables idle check.
|
||||||
// When minus value is set, then idle check is disabled.
|
|
||||||
IdleCheckFrequency time.Duration
|
IdleCheckFrequency time.Duration
|
||||||
|
|
||||||
// Enables read only queries on slave nodes.
|
// Enables read only queries on slave nodes.
|
||||||
|
|
21
pool_test.go
21
pool_test.go
|
@ -30,8 +30,8 @@ var _ = Describe("pool", func() {
|
||||||
|
|
||||||
pool := client.Pool()
|
pool := client.Pool()
|
||||||
Expect(pool.Len()).To(BeNumerically("<=", 10))
|
Expect(pool.Len()).To(BeNumerically("<=", 10))
|
||||||
Expect(pool.FreeLen()).To(BeNumerically("<=", 10))
|
Expect(pool.IdleLen()).To(BeNumerically("<=", 10))
|
||||||
Expect(pool.Len()).To(Equal(pool.FreeLen()))
|
Expect(pool.Len()).To(Equal(pool.IdleLen()))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("respects max size on multi", func() {
|
It("respects max size on multi", func() {
|
||||||
|
@ -55,8 +55,8 @@ var _ = Describe("pool", func() {
|
||||||
|
|
||||||
pool := client.Pool()
|
pool := client.Pool()
|
||||||
Expect(pool.Len()).To(BeNumerically("<=", 10))
|
Expect(pool.Len()).To(BeNumerically("<=", 10))
|
||||||
Expect(pool.FreeLen()).To(BeNumerically("<=", 10))
|
Expect(pool.IdleLen()).To(BeNumerically("<=", 10))
|
||||||
Expect(pool.Len()).To(Equal(pool.FreeLen()))
|
Expect(pool.Len()).To(Equal(pool.IdleLen()))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("respects max size on pipelines", func() {
|
It("respects max size on pipelines", func() {
|
||||||
|
@ -73,15 +73,15 @@ var _ = Describe("pool", func() {
|
||||||
|
|
||||||
pool := client.Pool()
|
pool := client.Pool()
|
||||||
Expect(pool.Len()).To(BeNumerically("<=", 10))
|
Expect(pool.Len()).To(BeNumerically("<=", 10))
|
||||||
Expect(pool.FreeLen()).To(BeNumerically("<=", 10))
|
Expect(pool.IdleLen()).To(BeNumerically("<=", 10))
|
||||||
Expect(pool.Len()).To(Equal(pool.FreeLen()))
|
Expect(pool.Len()).To(Equal(pool.IdleLen()))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("removes broken connections", func() {
|
It("removes broken connections", func() {
|
||||||
cn, _, err := client.Pool().Get()
|
cn, err := client.Pool().Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
cn.SetNetConn(&badConn{})
|
cn.SetNetConn(&badConn{})
|
||||||
Expect(client.Pool().Put(cn)).NotTo(HaveOccurred())
|
client.Pool().Put(cn)
|
||||||
|
|
||||||
err = client.Ping().Err()
|
err = client.Ping().Err()
|
||||||
Expect(err).To(MatchError("bad connection"))
|
Expect(err).To(MatchError("bad connection"))
|
||||||
|
@ -92,7 +92,7 @@ var _ = Describe("pool", func() {
|
||||||
|
|
||||||
pool := client.Pool()
|
pool := client.Pool()
|
||||||
Expect(pool.Len()).To(Equal(1))
|
Expect(pool.Len()).To(Equal(1))
|
||||||
Expect(pool.FreeLen()).To(Equal(1))
|
Expect(pool.IdleLen()).To(Equal(1))
|
||||||
|
|
||||||
stats := pool.Stats()
|
stats := pool.Stats()
|
||||||
Expect(stats.Hits).To(Equal(uint32(2)))
|
Expect(stats.Hits).To(Equal(uint32(2)))
|
||||||
|
@ -109,7 +109,7 @@ var _ = Describe("pool", func() {
|
||||||
|
|
||||||
pool := client.Pool()
|
pool := client.Pool()
|
||||||
Expect(pool.Len()).To(Equal(1))
|
Expect(pool.Len()).To(Equal(1))
|
||||||
Expect(pool.FreeLen()).To(Equal(1))
|
Expect(pool.IdleLen()).To(Equal(1))
|
||||||
|
|
||||||
stats := pool.Stats()
|
stats := pool.Stats()
|
||||||
Expect(stats.Hits).To(Equal(uint32(100)))
|
Expect(stats.Hits).To(Equal(uint32(100)))
|
||||||
|
@ -125,6 +125,7 @@ var _ = Describe("pool", func() {
|
||||||
Timeouts: 0,
|
Timeouts: 0,
|
||||||
TotalConns: 1,
|
TotalConns: 1,
|
||||||
FreeConns: 1,
|
FreeConns: 1,
|
||||||
|
IdleConns: 1,
|
||||||
StaleConns: 0,
|
StaleConns: 0,
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
|
27
redis.go
27
redis.go
|
@ -60,29 +60,30 @@ func (c *baseClient) newConn() (*pool.Conn, error) {
|
||||||
return cn, nil
|
return cn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *baseClient) getConn() (*pool.Conn, bool, error) {
|
func (c *baseClient) getConn() (*pool.Conn, error) {
|
||||||
cn, isNew, err := c.connPool.Get()
|
cn, err := c.connPool.Get()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !cn.Inited {
|
if !cn.Inited {
|
||||||
if err := c.initConn(cn); err != nil {
|
err := c.initConn(cn)
|
||||||
_ = c.connPool.Remove(cn)
|
if err != nil {
|
||||||
return nil, false, err
|
c.connPool.Remove(cn)
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return cn, isNew, nil
|
return cn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool {
|
func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool {
|
||||||
if internal.IsBadConn(err, false) {
|
if internal.IsBadConn(err, false) {
|
||||||
_ = c.connPool.Remove(cn)
|
c.connPool.Remove(cn)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = c.connPool.Put(cn)
|
c.connPool.Put(cn)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,7 +138,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
|
||||||
time.Sleep(c.retryBackoff(attempt))
|
time.Sleep(c.retryBackoff(attempt))
|
||||||
}
|
}
|
||||||
|
|
||||||
cn, _, err := c.getConn()
|
cn, err := c.getConn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.setErr(err)
|
cmd.setErr(err)
|
||||||
if internal.IsRetryableError(err, true) {
|
if internal.IsRetryableError(err, true) {
|
||||||
|
@ -225,7 +226,7 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e
|
||||||
time.Sleep(c.retryBackoff(attempt))
|
time.Sleep(c.retryBackoff(attempt))
|
||||||
}
|
}
|
||||||
|
|
||||||
cn, _, err := c.getConn()
|
cn, err := c.getConn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
setCmdsErr(cmds, err)
|
setCmdsErr(cmds, err)
|
||||||
return err
|
return err
|
||||||
|
@ -234,10 +235,10 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e
|
||||||
canRetry, err := p(cn, cmds)
|
canRetry, err := p(cn, cmds)
|
||||||
|
|
||||||
if err == nil || internal.IsRedisError(err) {
|
if err == nil || internal.IsRedisError(err) {
|
||||||
_ = c.connPool.Put(cn)
|
c.connPool.Put(cn)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
_ = c.connPool.Remove(cn)
|
c.connPool.Remove(cn)
|
||||||
|
|
||||||
if !canRetry || !internal.IsRetryableError(err, true) {
|
if !canRetry || !internal.IsRetryableError(err, true) {
|
||||||
break
|
break
|
||||||
|
|
|
@ -145,12 +145,11 @@ var _ = Describe("Client", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
// Put bad connection in the pool.
|
// Put bad connection in the pool.
|
||||||
cn, _, err := client.Pool().Get()
|
cn, err := client.Pool().Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
cn.SetNetConn(&badConn{})
|
cn.SetNetConn(&badConn{})
|
||||||
err = client.Pool().Put(cn)
|
client.Pool().Put(cn)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
err = client.Ping().Err()
|
err = client.Ping().Err()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
@ -184,19 +183,18 @@ var _ = Describe("Client", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should update conn.UsedAt on read/write", func() {
|
It("should update conn.UsedAt on read/write", func() {
|
||||||
cn, _, err := client.Pool().Get()
|
cn, err := client.Pool().Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(cn.UsedAt).NotTo(BeZero())
|
Expect(cn.UsedAt).NotTo(BeZero())
|
||||||
createdAt := cn.UsedAt()
|
createdAt := cn.UsedAt()
|
||||||
|
|
||||||
err = client.Pool().Put(cn)
|
client.Pool().Put(cn)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(cn.UsedAt().Equal(createdAt)).To(BeTrue())
|
Expect(cn.UsedAt().Equal(createdAt)).To(BeTrue())
|
||||||
|
|
||||||
err = client.Ping().Err()
|
err = client.Ping().Err()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
cn, _, err = client.Pool().Get()
|
cn, err = client.Pool().Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(cn).NotTo(BeNil())
|
Expect(cn).NotTo(BeNil())
|
||||||
Expect(cn.UsedAt().After(createdAt)).To(BeTrue())
|
Expect(cn.UsedAt().After(createdAt)).To(BeTrue())
|
||||||
|
|
6
ring.go
6
ring.go
|
@ -525,7 +525,7 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
cn, _, err := shard.Client.getConn()
|
cn, err := shard.Client.getConn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
setCmdsErr(cmds, err)
|
setCmdsErr(cmds, err)
|
||||||
continue
|
continue
|
||||||
|
@ -533,10 +533,10 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
|
||||||
|
|
||||||
canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
|
canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
|
||||||
if err == nil || internal.IsRedisError(err) {
|
if err == nil || internal.IsRedisError(err) {
|
||||||
_ = shard.Client.connPool.Put(cn)
|
shard.Client.connPool.Put(cn)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
_ = shard.Client.connPool.Remove(cn)
|
shard.Client.connPool.Remove(cn)
|
||||||
|
|
||||||
if canRetry && internal.IsRetryableError(err, true) {
|
if canRetry && internal.IsRetryableError(err, true) {
|
||||||
if failedCmdsMap == nil {
|
if failedCmdsMap == nil {
|
||||||
|
|
|
@ -124,12 +124,11 @@ var _ = Describe("Tx", func() {
|
||||||
|
|
||||||
It("should recover from bad connection", func() {
|
It("should recover from bad connection", func() {
|
||||||
// Put bad connection in the pool.
|
// Put bad connection in the pool.
|
||||||
cn, _, err := client.Pool().Get()
|
cn, err := client.Pool().Get()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
cn.SetNetConn(&badConn{})
|
cn.SetNetConn(&badConn{})
|
||||||
err = client.Pool().Put(cn)
|
client.Pool().Put(cn)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
do := func() error {
|
do := func() error {
|
||||||
err := client.Watch(func(tx *redis.Tx) error {
|
err := client.Watch(func(tx *redis.Tx) error {
|
||||||
|
|
Loading…
Reference in New Issue