mirror of https://github.com/go-redis/redis.git
Compare commits
6 Commits
9e095f2bf3
...
af51a55c3f
Author | SHA1 | Date |
---|---|---|
kwen | af51a55c3f | |
dependabot[bot] | e63669e170 | |
LINKIWI | fc32d0a01d | |
Justin | f1ffb55c9a | |
LINKIWI | 080e051124 | |
kwen | 7e63d6519a |
|
@ -8,7 +8,7 @@ jobs:
|
||||||
- name: Checkout
|
- name: Checkout
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
- name: Check Spelling
|
- name: Check Spelling
|
||||||
uses: rojopolis/spellcheck-github-actions@0.40.0
|
uses: rojopolis/spellcheck-github-actions@0.45.0
|
||||||
with:
|
with:
|
||||||
config_path: .github/spellcheck-settings.yml
|
config_path: .github/spellcheck-settings.yml
|
||||||
task_name: Markdown
|
task_name: Markdown
|
||||||
|
|
|
@ -167,6 +167,8 @@ func (cmd *baseCmd) stringArg(pos int) string {
|
||||||
switch v := arg.(type) {
|
switch v := arg.(type) {
|
||||||
case string:
|
case string:
|
||||||
return v
|
return v
|
||||||
|
case []byte:
|
||||||
|
return string(v)
|
||||||
default:
|
default:
|
||||||
// TODO: consider using appendArg
|
// TODO: consider using appendArg
|
||||||
return fmt.Sprint(v)
|
return fmt.Sprint(v)
|
||||||
|
|
|
@ -21,6 +21,10 @@ import (
|
||||||
"github.com/redis/go-redis/v9/internal/rand"
|
"github.com/redis/go-redis/v9/internal/rand"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
minLatencyMeasurementInterval = 10 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
|
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
|
||||||
|
|
||||||
// ClusterOptions are used to configure a cluster client and should be
|
// ClusterOptions are used to configure a cluster client and should be
|
||||||
|
@ -316,6 +320,10 @@ type clusterNode struct {
|
||||||
latency uint32 // atomic
|
latency uint32 // atomic
|
||||||
generation uint32 // atomic
|
generation uint32 // atomic
|
||||||
failing uint32 // atomic
|
failing uint32 // atomic
|
||||||
|
|
||||||
|
// last time the latency measurement was performed for the node, stored in nanoseconds
|
||||||
|
// from epoch
|
||||||
|
lastLatencyMeasurement int64 // atomic
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
|
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
|
||||||
|
@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
|
||||||
latency = float64(dur) / float64(successes)
|
latency = float64(dur) / float64(successes)
|
||||||
}
|
}
|
||||||
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
|
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
|
||||||
|
n.SetLastLatencyMeasurement(time.Now())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *clusterNode) Latency() time.Duration {
|
func (n *clusterNode) Latency() time.Duration {
|
||||||
|
@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
|
||||||
return atomic.LoadUint32(&n.generation)
|
return atomic.LoadUint32(&n.generation)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *clusterNode) LastLatencyMeasurement() int64 {
|
||||||
|
return atomic.LoadInt64(&n.lastLatencyMeasurement)
|
||||||
|
}
|
||||||
|
|
||||||
func (n *clusterNode) SetGeneration(gen uint32) {
|
func (n *clusterNode) SetGeneration(gen uint32) {
|
||||||
for {
|
for {
|
||||||
v := atomic.LoadUint32(&n.generation)
|
v := atomic.LoadUint32(&n.generation)
|
||||||
|
@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
|
||||||
|
for {
|
||||||
|
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
|
||||||
|
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type clusterNodes struct {
|
type clusterNodes struct {
|
||||||
|
@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
|
||||||
c.activeAddrs = c.activeAddrs[:0]
|
c.activeAddrs = c.activeAddrs[:0]
|
||||||
|
now := time.Now()
|
||||||
for addr, node := range c.nodes {
|
for addr, node := range c.nodes {
|
||||||
if node.Generation() >= generation {
|
if node.Generation() >= generation {
|
||||||
c.activeAddrs = append(c.activeAddrs, addr)
|
c.activeAddrs = append(c.activeAddrs, addr)
|
||||||
if c.opt.RouteByLatency {
|
if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
|
||||||
go node.updateLatency()
|
go node.updateLatency()
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -653,6 +653,32 @@ var _ = Describe("ClusterClient", func() {
|
||||||
Expect(client.Close()).NotTo(HaveOccurred())
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("determines hash slots correctly for generic commands", func() {
|
||||||
|
opt := redisClusterOptions()
|
||||||
|
opt.MaxRedirects = -1
|
||||||
|
client := cluster.newClusterClient(ctx, opt)
|
||||||
|
|
||||||
|
err := client.Do(ctx, "GET", "A").Err()
|
||||||
|
Expect(err).To(Equal(redis.Nil))
|
||||||
|
|
||||||
|
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
|
||||||
|
Expect(err).To(Equal(redis.Nil))
|
||||||
|
|
||||||
|
Eventually(func() error {
|
||||||
|
return client.SwapNodes(ctx, "A")
|
||||||
|
}, 30*time.Second).ShouldNot(HaveOccurred())
|
||||||
|
|
||||||
|
err = client.Do(ctx, "GET", "A").Err()
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("MOVED"))
|
||||||
|
|
||||||
|
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("MOVED"))
|
||||||
|
|
||||||
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
It("follows node redirection immediately", func() {
|
It("follows node redirection immediately", func() {
|
||||||
// Configure retry backoffs far in excess of the expected duration of redirection
|
// Configure retry backoffs far in excess of the expected duration of redirection
|
||||||
opt := redisClusterOptions()
|
opt := redisClusterOptions()
|
||||||
|
|
2
redis.go
2
redis.go
|
@ -176,8 +176,6 @@ func (hs *hooksMixin) withProcessPipelineHook(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
|
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||||
hs.hooksMu.Lock()
|
|
||||||
defer hs.hooksMu.Unlock()
|
|
||||||
return hs.current.dial(ctx, network, addr)
|
return hs.current.dial(ctx, network, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -633,3 +634,67 @@ var _ = Describe("Hook with MinIdleConns", func() {
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
var _ = Describe("Dialer connection timeouts", func() {
|
||||||
|
var client *redis.Client
|
||||||
|
|
||||||
|
const dialSimulatedDelay = 1 * time.Second
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
options := redisOptions()
|
||||||
|
options.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||||
|
// Simulated slow dialer.
|
||||||
|
// Note that the following sleep is deliberately not context-aware.
|
||||||
|
time.Sleep(dialSimulatedDelay)
|
||||||
|
return net.Dial("tcp", options.Addr)
|
||||||
|
}
|
||||||
|
options.MinIdleConns = 1
|
||||||
|
client = redis.NewClient(options)
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
err := client.Close()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("does not contend on connection dial for concurrent commands", func() {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
const concurrency = 10
|
||||||
|
|
||||||
|
durations := make(chan time.Duration, concurrency)
|
||||||
|
errs := make(chan error, concurrency)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
wg.Add(concurrency)
|
||||||
|
|
||||||
|
for i := 0; i < concurrency; i++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
err := client.Ping(ctx).Err()
|
||||||
|
durations <- time.Since(start)
|
||||||
|
errs <- err
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
close(durations)
|
||||||
|
close(errs)
|
||||||
|
|
||||||
|
// All commands should eventually succeed, after acquiring a connection.
|
||||||
|
for err := range errs {
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Each individual command should complete within the simulated dial duration bound.
|
||||||
|
for duration := range durations {
|
||||||
|
Expect(duration).To(BeNumerically("<", 2*dialSimulatedDelay))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Due to concurrent execution, the entire test suite should also complete within
|
||||||
|
// the same dial duration bound applied for individual commands.
|
||||||
|
Expect(time.Since(start)).To(BeNumerically("<", 2*dialSimulatedDelay))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
54
sentinel.go
54
sentinel.go
|
@ -549,26 +549,50 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var masterAddr string
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var once sync.Once
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
for i, sentinelAddr := range c.sentinelAddrs {
|
for i, sentinelAddr := range c.sentinelAddrs {
|
||||||
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
|
wg.Add(1)
|
||||||
|
go func(i int, addr string) {
|
||||||
|
defer wg.Done()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
default:
|
||||||
|
sentinelCli := NewSentinelClient(c.opt.sentinelOptions(addr))
|
||||||
|
addrVal, err := sentinelCli.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
|
||||||
|
if err != nil {
|
||||||
|
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName addr=%s, master=%q failed: %s",
|
||||||
|
addr, c.opt.MasterName, err)
|
||||||
|
_ = sentinelCli.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
internal.Logger.Printf(ctx, "get addr %s master res %v", addr, masterAddr)
|
||||||
|
once.Do(func() {
|
||||||
|
// Push working sentinel to the top.
|
||||||
|
masterAddr = net.JoinHostPort(addrVal[0], addrVal[1])
|
||||||
|
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
|
||||||
|
c.setSentinel(ctx, sentinelCli)
|
||||||
|
close(done)
|
||||||
|
|
||||||
|
})
|
||||||
|
|
||||||
masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
|
|
||||||
if err != nil {
|
|
||||||
_ = sentinel.Close()
|
|
||||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
|
||||||
return "", err
|
|
||||||
}
|
}
|
||||||
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
|
|
||||||
c.opt.MasterName, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Push working sentinel to the top.
|
}(i, sentinelAddr)
|
||||||
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
|
|
||||||
c.setSentinel(ctx, sentinel)
|
|
||||||
|
|
||||||
addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
|
}
|
||||||
return addr, nil
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
once.Do(func() {
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
<-done
|
||||||
|
if masterAddr != "" {
|
||||||
|
return masterAddr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return "", errors.New("redis: all sentinels specified in configuration are unreachable")
|
return "", errors.New("redis: all sentinels specified in configuration are unreachable")
|
||||||
|
|
Loading…
Reference in New Issue