mirror of https://github.com/go-redis/redis.git
Compare commits
4 Commits
207300ccfa
...
4f739de1e2
Author | SHA1 | Date |
---|---|---|
LINKIWI | 4f739de1e2 | |
Justin | f1ffb55c9a | |
LINKIWI | 080e051124 | |
Kevin Lin | 49c7902daf |
|
@ -33,6 +33,7 @@ func BenchmarkPoolGetPut(b *testing.B) {
|
|||
Dialer: dummyDialer,
|
||||
PoolSize: bm.poolSize,
|
||||
PoolTimeout: time.Second,
|
||||
DialTimeout: 1 * time.Second,
|
||||
ConnMaxIdleTime: time.Hour,
|
||||
})
|
||||
|
||||
|
@ -76,6 +77,7 @@ func BenchmarkPoolGetRemove(b *testing.B) {
|
|||
Dialer: dummyDialer,
|
||||
PoolSize: bm.poolSize,
|
||||
PoolTimeout: time.Second,
|
||||
DialTimeout: 1 * time.Second,
|
||||
ConnMaxIdleTime: time.Hour,
|
||||
})
|
||||
|
||||
|
|
|
@ -62,6 +62,7 @@ type Options struct {
|
|||
|
||||
PoolFIFO bool
|
||||
PoolSize int
|
||||
DialTimeout time.Duration
|
||||
PoolTimeout time.Duration
|
||||
MinIdleConns int
|
||||
MaxIdleConns int
|
||||
|
@ -140,7 +141,10 @@ func (p *ConnPool) checkMinIdleConns() {
|
|||
}
|
||||
|
||||
func (p *ConnPool) addIdleConn() error {
|
||||
cn, err := p.dialConn(context.TODO(), true)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), p.cfg.DialTimeout)
|
||||
defer cancel()
|
||||
|
||||
cn, err := p.dialConn(ctx, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -230,15 +234,19 @@ func (p *ConnPool) tryDial() {
|
|||
return
|
||||
}
|
||||
|
||||
conn, err := p.cfg.Dialer(context.Background())
|
||||
ctx, cancel := context.WithTimeout(context.Background(), p.cfg.DialTimeout)
|
||||
|
||||
conn, err := p.cfg.Dialer(ctx)
|
||||
if err != nil {
|
||||
p.setLastDialError(err)
|
||||
time.Sleep(time.Second)
|
||||
cancel()
|
||||
continue
|
||||
}
|
||||
|
||||
atomic.StoreUint32(&p.dialErrorsNum, 0)
|
||||
_ = conn.Close()
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ var _ = Describe("ConnPool", func() {
|
|||
Dialer: dummyDialer,
|
||||
PoolSize: 10,
|
||||
PoolTimeout: time.Hour,
|
||||
DialTimeout: 1 * time.Second,
|
||||
ConnMaxIdleTime: time.Millisecond,
|
||||
})
|
||||
})
|
||||
|
@ -46,6 +47,7 @@ var _ = Describe("ConnPool", func() {
|
|||
},
|
||||
PoolSize: 10,
|
||||
PoolTimeout: time.Hour,
|
||||
DialTimeout: 1 * time.Second,
|
||||
ConnMaxIdleTime: time.Millisecond,
|
||||
MinIdleConns: minIdleConns,
|
||||
})
|
||||
|
@ -129,6 +131,7 @@ var _ = Describe("MinIdleConns", func() {
|
|||
PoolSize: poolSize,
|
||||
MinIdleConns: minIdleConns,
|
||||
PoolTimeout: 100 * time.Millisecond,
|
||||
DialTimeout: 1 * time.Second,
|
||||
ConnMaxIdleTime: -1,
|
||||
})
|
||||
Eventually(func() int {
|
||||
|
@ -306,6 +309,7 @@ var _ = Describe("race", func() {
|
|||
Dialer: dummyDialer,
|
||||
PoolSize: 10,
|
||||
PoolTimeout: time.Minute,
|
||||
DialTimeout: 1 * time.Second,
|
||||
ConnMaxIdleTime: time.Millisecond,
|
||||
})
|
||||
|
||||
|
@ -336,6 +340,7 @@ var _ = Describe("race", func() {
|
|||
PoolSize: 1000,
|
||||
MinIdleConns: 50,
|
||||
PoolTimeout: 3 * time.Second,
|
||||
DialTimeout: 1 * time.Second,
|
||||
}
|
||||
p := pool.NewConnPool(opt)
|
||||
|
||||
|
|
|
@ -518,6 +518,7 @@ func newConnPool(
|
|||
PoolFIFO: opt.PoolFIFO,
|
||||
PoolSize: opt.PoolSize,
|
||||
PoolTimeout: opt.PoolTimeout,
|
||||
DialTimeout: opt.DialTimeout,
|
||||
MinIdleConns: opt.MinIdleConns,
|
||||
MaxIdleConns: opt.MaxIdleConns,
|
||||
MaxActiveConns: opt.MaxActiveConns,
|
||||
|
|
|
@ -21,6 +21,10 @@ import (
|
|||
"github.com/redis/go-redis/v9/internal/rand"
|
||||
)
|
||||
|
||||
const (
|
||||
minLatencyMeasurementInterval = 10 * time.Second
|
||||
)
|
||||
|
||||
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
|
||||
|
||||
// ClusterOptions are used to configure a cluster client and should be
|
||||
|
@ -316,6 +320,10 @@ type clusterNode struct {
|
|||
latency uint32 // atomic
|
||||
generation uint32 // atomic
|
||||
failing uint32 // atomic
|
||||
|
||||
// last time the latency measurement was performed for the node, stored in nanoseconds
|
||||
// from epoch
|
||||
lastLatencyMeasurement int64 // atomic
|
||||
}
|
||||
|
||||
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
|
||||
|
@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
|
|||
latency = float64(dur) / float64(successes)
|
||||
}
|
||||
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
|
||||
n.SetLastLatencyMeasurement(time.Now())
|
||||
}
|
||||
|
||||
func (n *clusterNode) Latency() time.Duration {
|
||||
|
@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
|
|||
return atomic.LoadUint32(&n.generation)
|
||||
}
|
||||
|
||||
func (n *clusterNode) LastLatencyMeasurement() int64 {
|
||||
return atomic.LoadInt64(&n.lastLatencyMeasurement)
|
||||
}
|
||||
|
||||
func (n *clusterNode) SetGeneration(gen uint32) {
|
||||
for {
|
||||
v := atomic.LoadUint32(&n.generation)
|
||||
|
@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
|
|||
}
|
||||
}
|
||||
|
||||
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
|
||||
for {
|
||||
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
|
||||
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
type clusterNodes struct {
|
||||
|
@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
|
|||
c.mu.Lock()
|
||||
|
||||
c.activeAddrs = c.activeAddrs[:0]
|
||||
now := time.Now()
|
||||
for addr, node := range c.nodes {
|
||||
if node.Generation() >= generation {
|
||||
c.activeAddrs = append(c.activeAddrs, addr)
|
||||
if c.opt.RouteByLatency {
|
||||
if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
|
||||
go node.updateLatency()
|
||||
}
|
||||
continue
|
||||
|
|
2
redis.go
2
redis.go
|
@ -176,8 +176,6 @@ func (hs *hooksMixin) withProcessPipelineHook(
|
|||
}
|
||||
|
||||
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
hs.hooksMu.Lock()
|
||||
defer hs.hooksMu.Unlock()
|
||||
return hs.current.dial(ctx, network, addr)
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -633,3 +634,67 @@ var _ = Describe("Hook with MinIdleConns", func() {
|
|||
}))
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("Dialer connection timeouts", func() {
|
||||
var client *redis.Client
|
||||
|
||||
const dialSimulatedDelay = 1 * time.Second
|
||||
|
||||
BeforeEach(func() {
|
||||
options := redisOptions()
|
||||
options.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
// Simulated slow dialer.
|
||||
// Note that the following sleep is deliberately not context-aware.
|
||||
time.Sleep(dialSimulatedDelay)
|
||||
return net.Dial("tcp", options.Addr)
|
||||
}
|
||||
options.MinIdleConns = 1
|
||||
client = redis.NewClient(options)
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
err := client.Close()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("does not contend on connection dial for concurrent commands", func() {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
const concurrency = 10
|
||||
|
||||
durations := make(chan time.Duration, concurrency)
|
||||
errs := make(chan error, concurrency)
|
||||
|
||||
start := time.Now()
|
||||
wg.Add(concurrency)
|
||||
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
start := time.Now()
|
||||
err := client.Ping(ctx).Err()
|
||||
durations <- time.Since(start)
|
||||
errs <- err
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(durations)
|
||||
close(errs)
|
||||
|
||||
// All commands should eventually succeed, after acquiring a connection.
|
||||
for err := range errs {
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
// Each individual command should complete within the simulated dial duration bound.
|
||||
for duration := range durations {
|
||||
Expect(duration).To(BeNumerically("<", 2*dialSimulatedDelay))
|
||||
}
|
||||
|
||||
// Due to concurrent execution, the entire test suite should also complete within
|
||||
// the same dial duration bound applied for individual commands.
|
||||
Expect(time.Since(start)).To(BeNumerically("<", 2*dialSimulatedDelay))
|
||||
})
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue