Compare commits

...

4 Commits

Author SHA1 Message Date
LINKIWI 4f739de1e2
Merge 49c7902daf into f1ffb55c9a 2024-11-20 20:00:36 -05:00
Justin f1ffb55c9a
Only check latencies once every 10 seconds with `routeByLatency` (#2795)
* Only check latencies once every 10 seconds with `routeByLatency`

`routeByLatency` currently checks latencies any time a server returns
a MOVED or READONLY reply. When a shard is down, the ClusterClient
issues the request to a random server, which returns a MOVED reply.
This causes a state refresh and a latency update on all servers, and
can generate significant ping load on clusters with a large number of
clients.

This introduces logic to ping each node at most once every 10 seconds:
the `GC` function only triggers a latency update on a node if its last
measurement is more than 10 seconds old. (A sketch of this throttling
pattern follows the changed-files summary below.)

Fixes https://github.com/redis/go-redis/issues/2782

* use UnixNano instead of Unix for better precision

---------

Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-20 14:36:39 +02:00
LINKIWI 080e051124
Eliminate redundant dial mutex causing unbounded connection queue contention (#3088)
* Eliminate redundant dial mutex causing unbounded connection queue contention (a sketch of the bounded-dial pattern follows the changed-files summary below)

* Dialer connection timeouts unit test

---------

Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-20 13:38:06 +02:00
Kevin Lin 49c7902daf
Bound connection pool background dials to configured dial timeout 2024-08-10 14:41:15 -07:00
7 changed files with 107 additions and 5 deletions
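
As a reading aid before the diffs: a minimal, runnable Go sketch of the latency-throttling pattern described in the `routeByLatency` commit above. The `node` type, the `maybeUpdateLatency` helper, and the `ping` callback are simplified stand-ins for illustration, not the actual go-redis `clusterNode` API.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

const minLatencyMeasurementInterval = 10 * time.Second

// node is a simplified stand-in for go-redis's clusterNode.
type node struct {
	latency                uint32 // atomic, microseconds
	lastLatencyMeasurement int64  // atomic, nanoseconds from epoch
}

// setLastLatencyMeasurement advances the timestamp with a CAS loop so that
// concurrent updates can never move it backwards.
func (n *node) setLastLatencyMeasurement(t time.Time) {
	for {
		v := atomic.LoadInt64(&n.lastLatencyMeasurement)
		if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
			break
		}
	}
}

// maybeUpdateLatency measures latency only if the previous measurement is
// older than the interval, mirroring the check added to GC() in the diff.
func (n *node) maybeUpdateLatency(ping func() time.Duration) {
	now := time.Now()
	if atomic.LoadInt64(&n.lastLatencyMeasurement) >= now.Add(-minLatencyMeasurementInterval).UnixNano() {
		return // measured recently; skip to avoid ping load
	}
	dur := ping()
	atomic.StoreUint32(&n.latency, uint32(dur.Microseconds()))
	n.setLastLatencyMeasurement(now)
}

func main() {
	n := &node{}
	fakePing := func() time.Duration { return 500 * time.Microsecond }
	n.maybeUpdateLatency(fakePing) // first call measures
	n.maybeUpdateLatency(fakePing) // second call within 10s is a no-op
	fmt.Println(time.Duration(atomic.LoadUint32(&n.latency)) * time.Microsecond)
}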
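
And a companion sketch of the bounded-dial pattern from the dial-mutex commit: each background dial gets its own context.WithTimeout derived from the configured DialTimeout, rather than being serialized behind a shared mutex. The dialWithTimeout helper and the localhost:6379 address are illustrative assumptions, not go-redis API.

package main

import (
	"context"
	"fmt"
	"net"
	"time"
)

// dialWithTimeout bounds a single dial attempt by the configured dial
// timeout; concurrent callers each get their own context and are never
// queued behind one another. (Illustrative helper, not go-redis API.)
func dialWithTimeout(dialer func(context.Context) (net.Conn, error), timeout time.Duration) (net.Conn, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return dialer(ctx)
}

func main() {
	dialer := func(ctx context.Context) (net.Conn, error) {
		var d net.Dialer
		// Assumed local Redis address, for illustration only.
		return d.DialContext(ctx, "tcp", "localhost:6379")
	}
	conn, err := dialWithTimeout(dialer, time.Second)
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	defer conn.Close()
	fmt.Println("connected to", conn.RemoteAddr())
}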

View File

@@ -33,6 +33,7 @@ func BenchmarkPoolGetPut(b *testing.B) {
 			Dialer:          dummyDialer,
 			PoolSize:        bm.poolSize,
 			PoolTimeout:     time.Second,
+			DialTimeout:     1 * time.Second,
 			ConnMaxIdleTime: time.Hour,
 		})
 
@@ -76,6 +77,7 @@ func BenchmarkPoolGetRemove(b *testing.B) {
 			Dialer:          dummyDialer,
 			PoolSize:        bm.poolSize,
 			PoolTimeout:     time.Second,
+			DialTimeout:     1 * time.Second,
 			ConnMaxIdleTime: time.Hour,
 		})

View File

@@ -62,6 +62,7 @@ type Options struct {
 
 	PoolFIFO        bool
 	PoolSize        int
+	DialTimeout     time.Duration
 	PoolTimeout     time.Duration
 	MinIdleConns    int
 	MaxIdleConns    int
@@ -140,7 +141,10 @@ func (p *ConnPool) checkMinIdleConns() {
 }
 
 func (p *ConnPool) addIdleConn() error {
-	cn, err := p.dialConn(context.TODO(), true)
+	ctx, cancel := context.WithTimeout(context.Background(), p.cfg.DialTimeout)
+	defer cancel()
+
+	cn, err := p.dialConn(ctx, true)
 	if err != nil {
 		return err
 	}
@@ -230,15 +234,19 @@ func (p *ConnPool) tryDial() {
 			return
 		}
 
-		conn, err := p.cfg.Dialer(context.Background())
+		ctx, cancel := context.WithTimeout(context.Background(), p.cfg.DialTimeout)
+
+		conn, err := p.cfg.Dialer(ctx)
 		if err != nil {
 			p.setLastDialError(err)
 			time.Sleep(time.Second)
+			cancel()
 			continue
 		}
 
 		atomic.StoreUint32(&p.dialErrorsNum, 0)
 		_ = conn.Close()
+		cancel()
 		return
 	}
 }

View File

@@ -22,6 +22,7 @@ var _ = Describe("ConnPool", func() {
 			Dialer:          dummyDialer,
 			PoolSize:        10,
 			PoolTimeout:     time.Hour,
+			DialTimeout:     1 * time.Second,
 			ConnMaxIdleTime: time.Millisecond,
 		})
 	})
@@ -46,6 +47,7 @@ var _ = Describe("ConnPool", func() {
 			},
 			PoolSize:        10,
 			PoolTimeout:     time.Hour,
+			DialTimeout:     1 * time.Second,
 			ConnMaxIdleTime: time.Millisecond,
 			MinIdleConns:    minIdleConns,
 		})
@@ -129,6 +131,7 @@ var _ = Describe("MinIdleConns", func() {
 			PoolSize:        poolSize,
 			MinIdleConns:    minIdleConns,
 			PoolTimeout:     100 * time.Millisecond,
+			DialTimeout:     1 * time.Second,
 			ConnMaxIdleTime: -1,
 		})
 		Eventually(func() int {
@@ -306,6 +309,7 @@ var _ = Describe("race", func() {
 			Dialer:          dummyDialer,
 			PoolSize:        10,
 			PoolTimeout:     time.Minute,
+			DialTimeout:     1 * time.Second,
 			ConnMaxIdleTime: time.Millisecond,
 		})
 
@@ -336,6 +340,7 @@ var _ = Describe("race", func() {
 			PoolSize:     1000,
 			MinIdleConns: 50,
 			PoolTimeout:  3 * time.Second,
+			DialTimeout:  1 * time.Second,
 		}
 		p := pool.NewConnPool(opt)
 

View File

@@ -518,6 +518,7 @@ func newConnPool(
 		PoolFIFO:        opt.PoolFIFO,
 		PoolSize:        opt.PoolSize,
 		PoolTimeout:     opt.PoolTimeout,
+		DialTimeout:     opt.DialTimeout,
 		MinIdleConns:    opt.MinIdleConns,
 		MaxIdleConns:    opt.MaxIdleConns,
 		MaxActiveConns:  opt.MaxActiveConns,

View File

@@ -21,6 +21,10 @@ import (
 	"github.com/redis/go-redis/v9/internal/rand"
 )
 
+const (
+	minLatencyMeasurementInterval = 10 * time.Second
+)
+
 var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
 
 // ClusterOptions are used to configure a cluster client and should be
@@ -316,6 +320,10 @@ type clusterNode struct {
 	latency    uint32 // atomic
 	generation uint32 // atomic
 	failing    uint32 // atomic
+
+	// last time the latency measurement was performed for the node, stored in nanoseconds
+	// from epoch
+	lastLatencyMeasurement int64 // atomic
 }
 
 func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
@@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
 		latency = float64(dur) / float64(successes)
 	}
 	atomic.StoreUint32(&n.latency, uint32(latency+0.5))
+	n.SetLastLatencyMeasurement(time.Now())
 }
 
 func (n *clusterNode) Latency() time.Duration {
@@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
 	return atomic.LoadUint32(&n.generation)
 }
 
+func (n *clusterNode) LastLatencyMeasurement() int64 {
+	return atomic.LoadInt64(&n.lastLatencyMeasurement)
+}
+
 func (n *clusterNode) SetGeneration(gen uint32) {
 	for {
 		v := atomic.LoadUint32(&n.generation)
@@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
 	}
 }
 
+func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
+	for {
+		v := atomic.LoadInt64(&n.lastLatencyMeasurement)
+		if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
+			break
+		}
+	}
+}
+
 //------------------------------------------------------------------------------
 
 type clusterNodes struct {
@@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
 	c.mu.Lock()
 	c.activeAddrs = c.activeAddrs[:0]
+	now := time.Now()
 	for addr, node := range c.nodes {
 		if node.Generation() >= generation {
 			c.activeAddrs = append(c.activeAddrs, addr)
-			if c.opt.RouteByLatency {
+			if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
 				go node.updateLatency()
 			}
 			continue
 		}
View File

@@ -176,8 +176,6 @@ func (hs *hooksMixin) withProcessPipelineHook(
 }
 
 func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
-	hs.hooksMu.Lock()
-	defer hs.hooksMu.Unlock()
 	return hs.current.dial(ctx, network, addr)
 }
 

View File

@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"net"
+	"sync"
 	"testing"
 	"time"
 
@@ -633,3 +634,67 @@ var _ = Describe("Hook with MinIdleConns", func() {
 		}))
 	})
 })
+
+var _ = Describe("Dialer connection timeouts", func() {
+	var client *redis.Client
+
+	const dialSimulatedDelay = 1 * time.Second
+
+	BeforeEach(func() {
+		options := redisOptions()
+		options.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
+			// Simulated slow dialer.
+			// Note that the following sleep is deliberately not context-aware.
+			time.Sleep(dialSimulatedDelay)
+			return net.Dial("tcp", options.Addr)
+		}
+		options.MinIdleConns = 1
+		client = redis.NewClient(options)
+	})
+
+	AfterEach(func() {
+		err := client.Close()
+		Expect(err).NotTo(HaveOccurred())
+	})
+
+	It("does not contend on connection dial for concurrent commands", func() {
+		var wg sync.WaitGroup
+
+		const concurrency = 10
+
+		durations := make(chan time.Duration, concurrency)
+		errs := make(chan error, concurrency)
+
+		start := time.Now()
+		wg.Add(concurrency)
+
+		for i := 0; i < concurrency; i++ {
+			go func() {
+				defer wg.Done()
+
+				start := time.Now()
+				err := client.Ping(ctx).Err()
+				durations <- time.Since(start)
+				errs <- err
+			}()
+		}
+
+		wg.Wait()
+		close(durations)
+		close(errs)
+
+		// All commands should eventually succeed, after acquiring a connection.
+		for err := range errs {
+			Expect(err).NotTo(HaveOccurred())
+		}
+
+		// Each individual command should complete within the simulated dial duration bound.
+		for duration := range durations {
+			Expect(duration).To(BeNumerically("<", 2*dialSimulatedDelay))
+		}
+
+		// Due to concurrent execution, the entire test suite should also complete within
+		// the same dial duration bound applied for individual commands.
+		Expect(time.Since(start)).To(BeNumerically("<", 2*dialSimulatedDelay))
+	})
+})