perf: retry dial with channel and ticker

This commit is contained in:
pinkman.long 2021-12-13 11:42:13 +08:00
parent e9a8bb4f86
commit 82f413871d
3 changed files with 67 additions and 26 deletions

View File

@ -4,13 +4,12 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"github.com/go-redis/redis/v8"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/go-redis/redis/v8"
) )
func benchmarkRedisClient(ctx context.Context, poolSize int) *redis.Client { func benchmarkRedisClient(ctx context.Context, poolSize int) *redis.Client {

View File

@ -36,6 +36,7 @@ type Stats struct {
TotalConns uint32 // number of total connections in the pool TotalConns uint32 // number of total connections in the pool
IdleConns uint32 // number of idle connections in the pool IdleConns uint32 // number of idle connections in the pool
StaleConns uint32 // number of stale connections removed from the pool StaleConns uint32 // number of stale connections removed from the pool
DialErrNum uint32 // number of errors occurred dialing connections
} }
type Pooler interface { type Pooler interface {
@ -57,13 +58,14 @@ type Options struct {
Dialer func(context.Context) (net.Conn, error) Dialer func(context.Context) (net.Conn, error)
OnClose func(*Conn) error OnClose func(*Conn) error
PoolFIFO bool PoolFIFO bool
PoolSize int PoolSize int
MinIdleConns int MinIdleConns int
MaxConnAge time.Duration MaxConnAge time.Duration
PoolTimeout time.Duration PoolTimeout time.Duration
IdleTimeout time.Duration IdleTimeout time.Duration
IdleCheckFrequency time.Duration IdleCheckFrequency time.Duration
DialRecoveryCheckFrequency time.Duration
} }
type lastDialErrorWrap struct { type lastDialErrorWrap struct {
@ -77,7 +79,8 @@ type ConnPool struct {
lastDialError atomic.Value lastDialError atomic.Value
queue chan struct{} queue chan struct{}
dialRecoveryQueue chan struct{}
connsMu sync.Mutex connsMu sync.Mutex
conns []*Conn conns []*Conn
@ -97,16 +100,17 @@ func NewConnPool(opt *Options) *ConnPool {
p := &ConnPool{ p := &ConnPool{
opt: opt, opt: opt,
queue: make(chan struct{}, opt.PoolSize), queue: make(chan struct{}, opt.PoolSize),
conns: make([]*Conn, 0, opt.PoolSize), dialRecoveryQueue: make(chan struct{}, opt.PoolSize),
idleConns: make([]*Conn, 0, opt.PoolSize), conns: make([]*Conn, 0, opt.PoolSize),
closedCh: make(chan struct{}), idleConns: make([]*Conn, 0, opt.PoolSize),
closedCh: make(chan struct{}),
} }
p.connsMu.Lock() p.connsMu.Lock()
p.checkMinIdleConns() p.checkMinIdleConns()
p.connsMu.Unlock() p.connsMu.Unlock()
p.dialRecovery()
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency) go p.reaper(opt.IdleCheckFrequency)
} }
@ -186,6 +190,35 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
return cn, nil return cn, nil
} }
func (p *ConnPool) dialRecovery() {
frequency := time.Second * 1
if p.opt.DialRecoveryCheckFrequency > 0 {
frequency = p.opt.DialRecoveryCheckFrequency
}
ticker := time.NewTicker(frequency)
defer ticker.Stop()
go func() {
for {
select {
case <-p.dialRecoveryQueue:
if p.isDialCircuitBreakerOpened() {
p.tryDial()
}
case <-ticker.C:
if p.isDialCircuitBreakerOpened() {
p.tryDial()
}
case <-p.closedCh:
return
}
}
}()
}
func (p *ConnPool) isDialCircuitBreakerOpened() bool {
return atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize)
}
func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) { func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
if p.closed() { if p.closed() {
return nil, ErrClosed return nil, ErrClosed
@ -198,10 +231,13 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
netConn, err := p.opt.Dialer(ctx) netConn, err := p.opt.Dialer(ctx)
if err != nil { if err != nil {
p.setLastDialError(err) p.setLastDialError(err)
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) { atomic.AddUint32(&p.dialErrorsNum, 1)
go p.tryDial() select {
case p.dialRecoveryQueue <- struct{}{}:
return nil, err
default:
return nil, err
} }
return nil, err
} }
cn := NewConn(netConn) cn := NewConn(netConn)
@ -433,6 +469,7 @@ func (p *ConnPool) Stats() *Stats {
TotalConns: uint32(p.Len()), TotalConns: uint32(p.Len()),
IdleConns: uint32(idleLen), IdleConns: uint32(idleLen),
StaleConns: atomic.LoadUint32(&p.stats.StaleConns), StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
DialErrNum: atomic.LoadUint32(&p.dialErrorsNum),
} }
} }

View File

@ -104,6 +104,8 @@ type Options struct {
// if IdleTimeout is set. // if IdleTimeout is set.
IdleCheckFrequency time.Duration IdleCheckFrequency time.Duration
DialRecoveryCheckFrequency time.Duration
// Enables read only queries on slave nodes. // Enables read only queries on slave nodes.
readOnly bool readOnly bool
@ -164,7 +166,9 @@ func (opt *Options) init() {
if opt.IdleCheckFrequency == 0 { if opt.IdleCheckFrequency == 0 {
opt.IdleCheckFrequency = time.Minute opt.IdleCheckFrequency = time.Minute
} }
if opt.DialRecoveryCheckFrequency == 0 {
opt.DialRecoveryCheckFrequency = time.Second
}
if opt.MaxRetries == -1 { if opt.MaxRetries == -1 {
opt.MaxRetries = 0 opt.MaxRetries = 0
} else if opt.MaxRetries == 0 { } else if opt.MaxRetries == 0 {
@ -418,12 +422,13 @@ func newConnPool(opt *Options) *pool.ConnPool {
Dialer: func(ctx context.Context) (net.Conn, error) { Dialer: func(ctx context.Context) (net.Conn, error) {
return opt.Dialer(ctx, opt.Network, opt.Addr) return opt.Dialer(ctx, opt.Network, opt.Addr)
}, },
PoolFIFO: opt.PoolFIFO, PoolFIFO: opt.PoolFIFO,
PoolSize: opt.PoolSize, PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns, MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge, MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout, PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout, IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency, IdleCheckFrequency: opt.IdleCheckFrequency,
DialRecoveryCheckFrequency: opt.DialRecoveryCheckFrequency,
}) })
} }