Compare commits

...

5 Commits

Author SHA1 Message Date
Oleglacto 596c06ca98
Merge db2321cb57 into e63669e170 2024-11-22 00:17:39 -05:00
dependabot[bot] e63669e170
chore(deps): bump rojopolis/spellcheck-github-actions (#3188)
Bumps [rojopolis/spellcheck-github-actions](https://github.com/rojopolis/spellcheck-github-actions) from 0.40.0 to 0.45.0.
- [Release notes](https://github.com/rojopolis/spellcheck-github-actions/releases)
- [Changelog](https://github.com/rojopolis/spellcheck-github-actions/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rojopolis/spellcheck-github-actions/compare/0.40.0...0.45.0)

---
updated-dependencies:
- dependency-name: rojopolis/spellcheck-github-actions
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-21 14:38:38 +02:00
LINKIWI fc32d0a01d
Recognize byte slice for key argument in cluster client hash slot computation (#3049)
Co-authored-by: Vladyslav Vildanov <117659936+vladvildanov@users.noreply.github.com>
Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-21 14:38:11 +02:00
Justin f1ffb55c9a
Only check latencies once every 10 seconds with `routeByLatency` (#2795)
* Only check latencies once every 10 seconds with `routeByLatency`

`routeByLatency` currently checks latencies any time a server returns
a MOVED or READONLY reply. When a shard is down, the ClusterClient
chooses to issue the request to a random server, which returns a MOVED
reply. This causes a state refresh and a latency update on all servers.
This can lead to significant ping load to clusters with a large number
of clients.

This introduces logic to ping only once every 10 seconds, only
performing a latency update on a node during the `GC` function if the
latency was set later than 10 seconds ago.

Fixes https://github.com/redis/go-redis/issues/2782

* use UnixNano instead of Unix for better precision

---------

Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-20 14:36:39 +02:00
Oleg Laktyushkin db2321cb57 added `Do` method for raw query by single conn from `pool.Conn()` 2024-11-04 18:08:12 +03:00
5 changed files with 59 additions and 2 deletions

View File

@ -8,7 +8,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Check Spelling
uses: rojopolis/spellcheck-github-actions@0.40.0
uses: rojopolis/spellcheck-github-actions@0.45.0
with:
config_path: .github/spellcheck-settings.yml
task_name: Markdown

View File

@ -167,6 +167,8 @@ func (cmd *baseCmd) stringArg(pos int) string {
switch v := arg.(type) {
case string:
return v
case []byte:
return string(v)
default:
// TODO: consider using appendArg
return fmt.Sprint(v)

View File

@ -21,6 +21,10 @@ import (
"github.com/redis/go-redis/v9/internal/rand"
)
const (
minLatencyMeasurementInterval = 10 * time.Second
)
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
// ClusterOptions are used to configure a cluster client and should be
@ -316,6 +320,10 @@ type clusterNode struct {
latency uint32 // atomic
generation uint32 // atomic
failing uint32 // atomic
// last time the latency measurement was performed for the node, stored in nanoseconds
// from epoch
lastLatencyMeasurement int64 // atomic
}
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
latency = float64(dur) / float64(successes)
}
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
n.SetLastLatencyMeasurement(time.Now())
}
func (n *clusterNode) Latency() time.Duration {
@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
return atomic.LoadUint32(&n.generation)
}
func (n *clusterNode) LastLatencyMeasurement() int64 {
return atomic.LoadInt64(&n.lastLatencyMeasurement)
}
func (n *clusterNode) SetGeneration(gen uint32) {
for {
v := atomic.LoadUint32(&n.generation)
@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
}
}
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
for {
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
break
}
}
}
//------------------------------------------------------------------------------
type clusterNodes struct {
@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
c.mu.Lock()
c.activeAddrs = c.activeAddrs[:0]
now := time.Now()
for addr, node := range c.nodes {
if node.Generation() >= generation {
c.activeAddrs = append(c.activeAddrs, addr)
if c.opt.RouteByLatency {
if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
go node.updateLatency()
}
continue

View File

@ -653,6 +653,32 @@ var _ = Describe("ClusterClient", func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("determines hash slots correctly for generic commands", func() {
opt := redisClusterOptions()
opt.MaxRedirects = -1
client := cluster.newClusterClient(ctx, opt)
err := client.Do(ctx, "GET", "A").Err()
Expect(err).To(Equal(redis.Nil))
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
Expect(err).To(Equal(redis.Nil))
Eventually(func() error {
return client.SwapNodes(ctx, "A")
}, 30*time.Second).ShouldNot(HaveOccurred())
err = client.Do(ctx, "GET", "A").Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("MOVED"))
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("MOVED"))
Expect(client.Close()).NotTo(HaveOccurred())
})
It("follows node redirection immediately", func() {
// Configure retry backoffs far in excess of the expected duration of redirection
opt := redisClusterOptions()

View File

@ -228,6 +228,12 @@ type UniversalClient interface {
PoolStats() *PoolStats
}
func (c cmdable) Do(ctx context.Context, args ...interface{}) *Cmd {
cmd := NewCmd(ctx, args)
_ = c(ctx, cmd)
return cmd
}
var (
_ UniversalClient = (*Client)(nil)
_ UniversalClient = (*ClusterClient)(nil)