Compare commits

...

6 Commits

Author SHA1 Message Date
Andreas Bergmeier a2d86d85a0
Merge 69bab38007 into e63669e170 2024-11-22 00:17:39 -05:00
dependabot[bot] e63669e170
chore(deps): bump rojopolis/spellcheck-github-actions (#3188)
Bumps [rojopolis/spellcheck-github-actions](https://github.com/rojopolis/spellcheck-github-actions) from 0.40.0 to 0.45.0.
- [Release notes](https://github.com/rojopolis/spellcheck-github-actions/releases)
- [Changelog](https://github.com/rojopolis/spellcheck-github-actions/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rojopolis/spellcheck-github-actions/compare/0.40.0...0.45.0)

---
updated-dependencies:
- dependency-name: rojopolis/spellcheck-github-actions
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-21 14:38:38 +02:00
LINKIWI fc32d0a01d
Recognize byte slice for key argument in cluster client hash slot computation (#3049)
Co-authored-by: Vladyslav Vildanov <117659936+vladvildanov@users.noreply.github.com>
Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-21 14:38:11 +02:00
Justin f1ffb55c9a
Only check latencies once every 10 seconds with `routeByLatency` (#2795)
* Only check latencies once every 10 seconds with `routeByLatency`

`routeByLatency` currently checks latencies any time a server returns
a MOVED or READONLY reply. When a shard is down, the ClusterClient
chooses to issue the request to a random server, which returns a MOVED
reply. This causes a state refresh and a latency update on all servers.
This can lead to significant ping load on clusters with a large number
of clients.

This introduces logic to ping only once every 10 seconds: during the
`GC` function, a node's latency is refreshed only if its last
measurement is more than 10 seconds old.

Fixes https://github.com/redis/go-redis/issues/2782

* use UnixNano instead of Unix for better precision

---------

Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-20 14:36:39 +02:00
LINKIWI 080e051124
Eliminate redundant dial mutex causing unbounded connection queue contention (#3088)
* Eliminate redundant dial mutex causing unbounded connection queue contention

* Dialer connection timeouts unit test

---------

Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-20 13:38:06 +02:00
Andreas Bergmeier 69bab38007 Allow for tracking state in Limiter
Extend the Allow and ReportResult functions to accept a Context.
Allow can override the passed-in Context; the returned Context is then
passed down to ReportResult. Using this Context, it is possible to
store values and track state between the Allow and ReportResult calls.

Without this tracking, HalfOpen/Generation state is hard to implement
efficiently for circuit breakers.
2024-10-10 16:30:30 +02:00
7 changed files with 133 additions and 16 deletions

View File

@@ -8,7 +8,7 @@ jobs:
       - name: Checkout
         uses: actions/checkout@v4
       - name: Check Spelling
-        uses: rojopolis/spellcheck-github-actions@0.40.0
+        uses: rojopolis/spellcheck-github-actions@0.45.0
         with:
           config_path: .github/spellcheck-settings.yml
           task_name: Markdown

View File

@@ -167,6 +167,8 @@ func (cmd *baseCmd) stringArg(pos int) string {
     switch v := arg.(type) {
     case string:
         return v
+    case []byte:
+        return string(v)
     default:
         // TODO: consider using appendArg
         return fmt.Sprint(v)

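The stringArg change above is what lets the cluster client hash []byte key arguments the same way as strings. A minimal usage sketch, assuming a build that contains this change; the cluster addresses below are placeholders:

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	client := redis.NewClusterClient(&redis.ClusterOptions{
		// Placeholder addresses; any reachable cluster works.
		Addrs: []string{":7000", ":7001", ":7002"},
	})

	// Both calls target the key "A", so both are routed to the node that owns
	// A's hash slot. Before this change, the []byte key fell through to
	// fmt.Sprint, so the slot was computed from that rendering rather than "A".
	_ = client.Do(ctx, "SET", "A", "1").Err()
	val, err := client.Do(ctx, []byte("GET"), []byte("A")).Result()
	fmt.Println(val, err)
}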
View File

@@ -21,10 +21,12 @@ type Limiter interface {
     // Allow returns nil if operation is allowed or an error otherwise.
     // If operation is allowed client must ReportResult of the operation
     // whether it is a success or a failure.
-    Allow() error
+    // The returned context will be passed to ReportResult.
+    Allow(ctx context.Context) (context.Context, error)
     // ReportResult reports the result of the previously allowed operation.
     // nil indicates a success, non-nil error usually indicates a failure.
-    ReportResult(result error)
+    // Context can be used to access state tracked by previous Allow call.
+    ReportResult(ctx context.Context, result error)
 }

 // Options keeps the settings to set up redis connection.

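With the new signatures, a Limiter can thread per-attempt state from Allow to ReportResult through the returned context. Below is a minimal sketch of a circuit-breaker-style limiter written against this changed interface; the type names, the context key, and the trip policy are illustrative assumptions, not part of go-redis:

package main

import (
	"context"
	"errors"
	"sync/atomic"

	"github.com/redis/go-redis/v9"
)

// generationKey is a hypothetical context key carrying the breaker generation
// observed at Allow time down to ReportResult.
type generationKey struct{}

// breakerLimiter is a toy circuit breaker: it rejects operations while open
// and ignores results reported for an older generation.
type breakerLimiter struct {
	generation atomic.Int64 // bumped whenever the breaker changes state
	open       atomic.Bool
}

func (l *breakerLimiter) Allow(ctx context.Context) (context.Context, error) {
	if l.open.Load() {
		return ctx, errors.New("circuit breaker is open")
	}
	// Remember which generation admitted this operation.
	return context.WithValue(ctx, generationKey{}, l.generation.Load()), nil
}

func (l *breakerLimiter) ReportResult(ctx context.Context, result error) {
	gen, ok := ctx.Value(generationKey{}).(int64)
	if !ok || gen != l.generation.Load() {
		return // stale result from a previous generation; ignore it
	}
	if result != nil {
		// Trip the breaker and start a new generation (half-open handling omitted).
		l.open.Store(true)
		l.generation.Add(1)
	}
}

func main() {
	// With the redis.go changes below, the client calls Allow when acquiring a
	// connection and ReportResult when it is released or acquisition fails,
	// threading the returned context between the two calls.
	_ = redis.NewClient(&redis.Options{
		Addr:    "localhost:6379", // placeholder address
		Limiter: &breakerLimiter{},
	})
}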
View File

@@ -21,6 +21,10 @@ import (
     "github.com/redis/go-redis/v9/internal/rand"
 )

+const (
+    minLatencyMeasurementInterval = 10 * time.Second
+)
+
 var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")

 // ClusterOptions are used to configure a cluster client and should be
@@ -316,6 +320,10 @@ type clusterNode struct {
     latency    uint32 // atomic
     generation uint32 // atomic
     failing    uint32 // atomic
+
+    // last time the latency measurement was performed for the node, stored in nanoseconds
+    // from epoch
+    lastLatencyMeasurement int64 // atomic
 }

 func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
@@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
         latency = float64(dur) / float64(successes)
     }
     atomic.StoreUint32(&n.latency, uint32(latency+0.5))
+    n.SetLastLatencyMeasurement(time.Now())
 }

 func (n *clusterNode) Latency() time.Duration {
@@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
     return atomic.LoadUint32(&n.generation)
 }

+func (n *clusterNode) LastLatencyMeasurement() int64 {
+    return atomic.LoadInt64(&n.lastLatencyMeasurement)
+}
+
 func (n *clusterNode) SetGeneration(gen uint32) {
     for {
         v := atomic.LoadUint32(&n.generation)
@@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
     }
 }

+func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
+    for {
+        v := atomic.LoadInt64(&n.lastLatencyMeasurement)
+        if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
+            break
+        }
+    }
+}
+
 //------------------------------------------------------------------------------

 type clusterNodes struct {
@@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
     c.mu.Lock()

     c.activeAddrs = c.activeAddrs[:0]
+    now := time.Now()
     for addr, node := range c.nodes {
         if node.Generation() >= generation {
             c.activeAddrs = append(c.activeAddrs, addr)
-            if c.opt.RouteByLatency {
+            if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
                 go node.updateLatency()
             }
             continue
@@ -1319,7 +1342,7 @@ func (c *ClusterClient) processPipelineNode(
     ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
 ) {
     _ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
-        cn, err := node.Client.getConn(ctx)
+        ctx, cn, err := node.Client.getConn(ctx)
         if err != nil {
             node.MarkAsFailing()
             _ = c.mapCmdsByNode(ctx, failedCmds, cmds)
@@ -1504,7 +1527,7 @@ func (c *ClusterClient) processTxPipelineNode(
 ) {
     cmds = wrapMultiExec(ctx, cmds)
     _ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
-        cn, err := node.Client.getConn(ctx)
+        ctx, cn, err := node.Client.getConn(ctx)
         if err != nil {
             _ = c.mapCmdsByNode(ctx, failedCmds, cmds)
             setCmdsErr(cmds, err)

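The GC hunk above skips a node's latency refresh unless the last measurement is older than minLatencyMeasurementInterval, and SetLastLatencyMeasurement uses a compare-and-swap loop so the recorded timestamp can only move forward. A standalone sketch of that at-most-once-per-interval guard follows; the names (probe, markMeasured, shouldMeasure) are hypothetical, not go-redis internals:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type probe struct {
	lastMeasured int64 // unix nanoseconds, accessed atomically
}

// markMeasured records t, but never moves the timestamp backwards, even when
// several goroutines report measurements concurrently.
func (p *probe) markMeasured(t time.Time) {
	for {
		v := atomic.LoadInt64(&p.lastMeasured)
		if t.UnixNano() < v || atomic.CompareAndSwapInt64(&p.lastMeasured, v, t.UnixNano()) {
			return
		}
	}
}

// shouldMeasure reports whether a new measurement is due at time now.
func (p *probe) shouldMeasure(now time.Time, interval time.Duration) bool {
	return atomic.LoadInt64(&p.lastMeasured) < now.Add(-interval).UnixNano()
}

func main() {
	p := &probe{}
	fmt.Println(p.shouldMeasure(time.Now(), 10*time.Second)) // true: never measured
	p.markMeasured(time.Now())
	fmt.Println(p.shouldMeasure(time.Now(), 10*time.Second)) // false: just measured
}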
View File

@@ -653,6 +653,32 @@ var _ = Describe("ClusterClient", func() {
         Expect(client.Close()).NotTo(HaveOccurred())
     })

+    It("determines hash slots correctly for generic commands", func() {
+        opt := redisClusterOptions()
+        opt.MaxRedirects = -1
+
+        client := cluster.newClusterClient(ctx, opt)
+
+        err := client.Do(ctx, "GET", "A").Err()
+        Expect(err).To(Equal(redis.Nil))
+
+        err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
+        Expect(err).To(Equal(redis.Nil))
+
+        Eventually(func() error {
+            return client.SwapNodes(ctx, "A")
+        }, 30*time.Second).ShouldNot(HaveOccurred())
+
+        err = client.Do(ctx, "GET", "A").Err()
+        Expect(err).To(HaveOccurred())
+        Expect(err.Error()).To(ContainSubstring("MOVED"))
+
+        err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
+        Expect(err).To(HaveOccurred())
+        Expect(err.Error()).To(ContainSubstring("MOVED"))
+
+        Expect(client.Close()).NotTo(HaveOccurred())
+    })
+
     It("follows node redirection immediately", func() {
         // Configure retry backoffs far in excess of the expected duration of redirection
         opt := redisClusterOptions()

View File

@@ -176,8 +176,6 @@ func (hs *hooksMixin) withProcessPipelineHook(
 }

 func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
-    hs.hooksMu.Lock()
-    defer hs.hooksMu.Unlock()
     return hs.current.dial(ctx, network, addr)
 }

@@ -237,23 +235,24 @@ func (c *baseClient) newConn(ctx context.Context) (*pool.Conn, error) {
     return cn, nil
 }

-func (c *baseClient) getConn(ctx context.Context) (*pool.Conn, error) {
+func (c *baseClient) getConn(ctx context.Context) (context.Context, *pool.Conn, error) {
+    var err error
     if c.opt.Limiter != nil {
-        err := c.opt.Limiter.Allow()
+        ctx, err = c.opt.Limiter.Allow(ctx)
         if err != nil {
-            return nil, err
+            return ctx, nil, err
         }
     }

     cn, err := c._getConn(ctx)
     if err != nil {
         if c.opt.Limiter != nil {
-            c.opt.Limiter.ReportResult(err)
+            c.opt.Limiter.ReportResult(ctx, err)
         }
-        return nil, err
+        return ctx, nil, err
     }

-    return cn, nil
+    return ctx, cn, nil
 }

 func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
@@ -365,7 +364,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {

 func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error) {
     if c.opt.Limiter != nil {
-        c.opt.Limiter.ReportResult(err)
+        c.opt.Limiter.ReportResult(ctx, err)
     }

     if isBadConn(err, false, c.opt.Addr) {
@@ -378,7 +377,7 @@ func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error)
 func (c *baseClient) withConn(
     ctx context.Context, fn func(context.Context, *pool.Conn) error,
 ) error {
-    cn, err := c.getConn(ctx)
+    ctx, cn, err := c.getConn(ctx)
     if err != nil {
         return err
     }

View File

@@ -6,6 +6,7 @@ import (
     "errors"
     "fmt"
     "net"
+    "sync"
     "testing"
     "time"
@@ -633,3 +634,67 @@ var _ = Describe("Hook with MinIdleConns", func() {
         }))
     })
 })
+
+var _ = Describe("Dialer connection timeouts", func() {
+    var client *redis.Client
+
+    const dialSimulatedDelay = 1 * time.Second
+
+    BeforeEach(func() {
+        options := redisOptions()
+        options.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
+            // Simulated slow dialer.
+            // Note that the following sleep is deliberately not context-aware.
+            time.Sleep(dialSimulatedDelay)
+            return net.Dial("tcp", options.Addr)
+        }
+        options.MinIdleConns = 1
+        client = redis.NewClient(options)
+    })
+
+    AfterEach(func() {
+        err := client.Close()
+        Expect(err).NotTo(HaveOccurred())
+    })
+
+    It("does not contend on connection dial for concurrent commands", func() {
+        var wg sync.WaitGroup
+
+        const concurrency = 10
+
+        durations := make(chan time.Duration, concurrency)
+        errs := make(chan error, concurrency)
+
+        start := time.Now()
+        wg.Add(concurrency)
+
+        for i := 0; i < concurrency; i++ {
+            go func() {
+                defer wg.Done()
+
+                start := time.Now()
+                err := client.Ping(ctx).Err()
+                durations <- time.Since(start)
+                errs <- err
+            }()
+        }
+
+        wg.Wait()
+        close(durations)
+        close(errs)
+
+        // All commands should eventually succeed, after acquiring a connection.
+        for err := range errs {
+            Expect(err).NotTo(HaveOccurred())
+        }
+
+        // Each individual command should complete within the simulated dial duration bound.
+        for duration := range durations {
+            Expect(duration).To(BeNumerically("<", 2*dialSimulatedDelay))
+        }
+
+        // Due to concurrent execution, the entire test suite should also complete within
+        // the same dial duration bound applied for individual commands.
+        Expect(time.Since(start)).To(BeNumerically("<", 2*dialSimulatedDelay))
+    })
+})