From 82f413871d4a23c11b844f19a593ab014b68c8cb Mon Sep 17 00:00:00 2001 From: "pinkman.long" Date: Mon, 13 Dec 2021 11:42:13 +0800 Subject: [PATCH] perf: retry dial with channel and ticker --- bench_test.go | 3 +- internal/pool/pool.go | 69 +++++++++++++++++++++++++++++++++---------- options.go | 21 ++++++++----- 3 files changed, 67 insertions(+), 26 deletions(-) diff --git a/bench_test.go b/bench_test.go index 5644f50c..d9ba7137 100644 --- a/bench_test.go +++ b/bench_test.go @@ -4,13 +4,12 @@ import ( "bytes" "context" "fmt" + "github.com/go-redis/redis/v8" "strconv" "strings" "sync" "testing" "time" - - "github.com/go-redis/redis/v8" ) func benchmarkRedisClient(ctx context.Context, poolSize int) *redis.Client { diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 44a4e779..7d9b1e3f 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -36,6 +36,7 @@ type Stats struct { TotalConns uint32 // number of total connections in the pool IdleConns uint32 // number of idle connections in the pool StaleConns uint32 // number of stale connections removed from the pool + DialErrNum uint32 // number of errors occurred dialing connections } type Pooler interface { @@ -57,13 +58,14 @@ type Options struct { Dialer func(context.Context) (net.Conn, error) OnClose func(*Conn) error - PoolFIFO bool - PoolSize int - MinIdleConns int - MaxConnAge time.Duration - PoolTimeout time.Duration - IdleTimeout time.Duration - IdleCheckFrequency time.Duration + PoolFIFO bool + PoolSize int + MinIdleConns int + MaxConnAge time.Duration + PoolTimeout time.Duration + IdleTimeout time.Duration + IdleCheckFrequency time.Duration + DialRecoveryCheckFrequency time.Duration } type lastDialErrorWrap struct { @@ -77,7 +79,8 @@ type ConnPool struct { lastDialError atomic.Value - queue chan struct{} + queue chan struct{} + dialRecoveryQueue chan struct{} connsMu sync.Mutex conns []*Conn @@ -97,16 +100,17 @@ func NewConnPool(opt *Options) *ConnPool { p := &ConnPool{ opt: opt, - queue: make(chan struct{}, opt.PoolSize), - conns: make([]*Conn, 0, opt.PoolSize), - idleConns: make([]*Conn, 0, opt.PoolSize), - closedCh: make(chan struct{}), + queue: make(chan struct{}, opt.PoolSize), + dialRecoveryQueue: make(chan struct{}, opt.PoolSize), + conns: make([]*Conn, 0, opt.PoolSize), + idleConns: make([]*Conn, 0, opt.PoolSize), + closedCh: make(chan struct{}), } p.connsMu.Lock() p.checkMinIdleConns() p.connsMu.Unlock() - + p.dialRecovery() if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { go p.reaper(opt.IdleCheckFrequency) } @@ -186,6 +190,35 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) { return cn, nil } +func (p *ConnPool) dialRecovery() { + frequency := time.Second * 1 + if p.opt.DialRecoveryCheckFrequency > 0 { + frequency = p.opt.DialRecoveryCheckFrequency + } + ticker := time.NewTicker(frequency) + defer ticker.Stop() + go func() { + for { + select { + case <-p.dialRecoveryQueue: + if p.isDialCircuitBreakerOpened() { + p.tryDial() + } + case <-ticker.C: + if p.isDialCircuitBreakerOpened() { + p.tryDial() + } + case <-p.closedCh: + return + } + } + }() +} + +func (p *ConnPool) isDialCircuitBreakerOpened() bool { + return atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) +} + func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) { if p.closed() { return nil, ErrClosed @@ -198,10 +231,13 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) { netConn, err := p.opt.Dialer(ctx) if err != nil { p.setLastDialError(err) - if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) { - go p.tryDial() + atomic.AddUint32(&p.dialErrorsNum, 1) + select { + case p.dialRecoveryQueue <- struct{}{}: + return nil, err + default: + return nil, err } - return nil, err } cn := NewConn(netConn) @@ -433,6 +469,7 @@ func (p *ConnPool) Stats() *Stats { TotalConns: uint32(p.Len()), IdleConns: uint32(idleLen), StaleConns: atomic.LoadUint32(&p.stats.StaleConns), + DialErrNum: atomic.LoadUint32(&p.dialErrorsNum), } } diff --git a/options.go b/options.go index a4abe32c..c8c0d008 100644 --- a/options.go +++ b/options.go @@ -104,6 +104,8 @@ type Options struct { // if IdleTimeout is set. IdleCheckFrequency time.Duration + DialRecoveryCheckFrequency time.Duration + // Enables read only queries on slave nodes. readOnly bool @@ -164,7 +166,9 @@ func (opt *Options) init() { if opt.IdleCheckFrequency == 0 { opt.IdleCheckFrequency = time.Minute } - + if opt.DialRecoveryCheckFrequency == 0 { + opt.DialRecoveryCheckFrequency = time.Second + } if opt.MaxRetries == -1 { opt.MaxRetries = 0 } else if opt.MaxRetries == 0 { @@ -418,12 +422,13 @@ func newConnPool(opt *Options) *pool.ConnPool { Dialer: func(ctx context.Context) (net.Conn, error) { return opt.Dialer(ctx, opt.Network, opt.Addr) }, - PoolFIFO: opt.PoolFIFO, - PoolSize: opt.PoolSize, - MinIdleConns: opt.MinIdleConns, - MaxConnAge: opt.MaxConnAge, - PoolTimeout: opt.PoolTimeout, - IdleTimeout: opt.IdleTimeout, - IdleCheckFrequency: opt.IdleCheckFrequency, + PoolFIFO: opt.PoolFIFO, + PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, + PoolTimeout: opt.PoolTimeout, + IdleTimeout: opt.IdleTimeout, + IdleCheckFrequency: opt.IdleCheckFrequency, + DialRecoveryCheckFrequency: opt.DialRecoveryCheckFrequency, }) }