mirror of https://github.com/go-redis/redis.git
Compare commits
9 Commits
275db505bd
...
54e8a0368d
Author | SHA1 | Date |
---|---|---|
Marco | 54e8a0368d | |
dependabot[bot] | e63669e170 | |
LINKIWI | fc32d0a01d | |
Justin | f1ffb55c9a | |
LINKIWI | 080e051124 | |
ofekshenawa | 930d904205 | |
ofekshenawa | 8b1073d2d6 | |
ofekshenawa | d1b4eaed41 | |
Marco | 4c6d248722 |
|
@ -54,6 +54,7 @@ stunnel
|
||||||
SynDump
|
SynDump
|
||||||
TCP
|
TCP
|
||||||
TLS
|
TLS
|
||||||
|
UnstableResp
|
||||||
uri
|
uri
|
||||||
URI
|
URI
|
||||||
url
|
url
|
||||||
|
@ -62,3 +63,5 @@ RedisStack
|
||||||
RedisGears
|
RedisGears
|
||||||
RedisTimeseries
|
RedisTimeseries
|
||||||
RediSearch
|
RediSearch
|
||||||
|
RawResult
|
||||||
|
RawVal
|
|
@ -8,7 +8,7 @@ jobs:
|
||||||
- name: Checkout
|
- name: Checkout
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
- name: Check Spelling
|
- name: Check Spelling
|
||||||
uses: rojopolis/spellcheck-github-actions@0.40.0
|
uses: rojopolis/spellcheck-github-actions@0.45.0
|
||||||
with:
|
with:
|
||||||
config_path: .github/spellcheck-settings.yml
|
config_path: .github/spellcheck-settings.yml
|
||||||
task_name: Markdown
|
task_name: Markdown
|
||||||
|
|
15
README.md
15
README.md
|
@ -186,6 +186,21 @@ rdb := redis.NewClient(&redis.Options{
|
||||||
#### Unstable RESP3 Structures for RediSearch Commands
|
#### Unstable RESP3 Structures for RediSearch Commands
|
||||||
When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when using the search and query capabilities, but we plan to stabilize the RESP3-based API-s in the coming versions. You can find more guidance in the upcoming release notes.
|
When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when using the search and query capabilities, but we plan to stabilize the RESP3-based API-s in the coming versions. You can find more guidance in the upcoming release notes.
|
||||||
|
|
||||||
|
To enable unstable RESP3, set the option in your client configuration:
|
||||||
|
|
||||||
|
```go
|
||||||
|
redis.NewClient(&redis.Options{
|
||||||
|
UnstableResp3: true,
|
||||||
|
})
|
||||||
|
```
|
||||||
|
**Note:** When UnstableResp3 mode is enabled, it's necessary to use RawResult() and RawVal() to retrieve a raw data.
|
||||||
|
Since, raw response is the only option for unstable search commands Val() and Result() calls wouldn't have any affect on them:
|
||||||
|
|
||||||
|
```go
|
||||||
|
res1, err := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawResult()
|
||||||
|
val1 := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawVal()
|
||||||
|
```
|
||||||
|
|
||||||
## Contributing
|
## Contributing
|
||||||
|
|
||||||
Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library!
|
Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library!
|
||||||
|
|
62
command.go
62
command.go
|
@ -167,6 +167,8 @@ func (cmd *baseCmd) stringArg(pos int) string {
|
||||||
switch v := arg.(type) {
|
switch v := arg.(type) {
|
||||||
case string:
|
case string:
|
||||||
return v
|
return v
|
||||||
|
case []byte:
|
||||||
|
return string(v)
|
||||||
default:
|
default:
|
||||||
// TODO: consider using appendArg
|
// TODO: consider using appendArg
|
||||||
return fmt.Sprint(v)
|
return fmt.Sprint(v)
|
||||||
|
@ -1403,27 +1405,63 @@ func (cmd *MapStringSliceInterfaceCmd) Val() map[string][]interface{} {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
|
func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
|
||||||
n, err := rd.ReadMapLen()
|
readType, err := rd.PeekReplyType()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
cmd.val = make(map[string][]interface{}, n)
|
|
||||||
for i := 0; i < n; i++ {
|
cmd.val = make(map[string][]interface{})
|
||||||
k, err := rd.ReadString()
|
|
||||||
|
if readType == proto.RespMap {
|
||||||
|
n, err := rd.ReadMapLen()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
nn, err := rd.ReadArrayLen()
|
for i := 0; i < n; i++ {
|
||||||
if err != nil {
|
k, err := rd.ReadString()
|
||||||
return err
|
|
||||||
}
|
|
||||||
cmd.val[k] = make([]interface{}, nn)
|
|
||||||
for j := 0; j < nn; j++ {
|
|
||||||
value, err := rd.ReadReply()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
cmd.val[k][j] = value
|
nn, err := rd.ReadArrayLen()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cmd.val[k] = make([]interface{}, nn)
|
||||||
|
for j := 0; j < nn; j++ {
|
||||||
|
value, err := rd.ReadReply()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cmd.val[k][j] = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if readType == proto.RespArray {
|
||||||
|
// RESP2 response
|
||||||
|
n, err := rd.ReadArrayLen()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
// Each entry in this array is itself an array with key details
|
||||||
|
itemLen, err := rd.ReadArrayLen()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
key, err := rd.ReadString()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cmd.val[key] = make([]interface{}, 0, itemLen-1)
|
||||||
|
for j := 1; j < itemLen; j++ {
|
||||||
|
// Read the inner array for timestamp-value pairs
|
||||||
|
data, err := rd.ReadReply()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cmd.val[key] = append(cmd.val[key], data)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,10 @@ import (
|
||||||
"github.com/redis/go-redis/v9/internal/rand"
|
"github.com/redis/go-redis/v9/internal/rand"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
minLatencyMeasurementInterval = 10 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
|
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
|
||||||
|
|
||||||
// ClusterOptions are used to configure a cluster client and should be
|
// ClusterOptions are used to configure a cluster client and should be
|
||||||
|
@ -316,6 +320,10 @@ type clusterNode struct {
|
||||||
latency uint32 // atomic
|
latency uint32 // atomic
|
||||||
generation uint32 // atomic
|
generation uint32 // atomic
|
||||||
failing uint32 // atomic
|
failing uint32 // atomic
|
||||||
|
|
||||||
|
// last time the latency measurement was performed for the node, stored in nanoseconds
|
||||||
|
// from epoch
|
||||||
|
lastLatencyMeasurement int64 // atomic
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
|
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
|
||||||
|
@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
|
||||||
latency = float64(dur) / float64(successes)
|
latency = float64(dur) / float64(successes)
|
||||||
}
|
}
|
||||||
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
|
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
|
||||||
|
n.SetLastLatencyMeasurement(time.Now())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *clusterNode) Latency() time.Duration {
|
func (n *clusterNode) Latency() time.Duration {
|
||||||
|
@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
|
||||||
return atomic.LoadUint32(&n.generation)
|
return atomic.LoadUint32(&n.generation)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *clusterNode) LastLatencyMeasurement() int64 {
|
||||||
|
return atomic.LoadInt64(&n.lastLatencyMeasurement)
|
||||||
|
}
|
||||||
|
|
||||||
func (n *clusterNode) SetGeneration(gen uint32) {
|
func (n *clusterNode) SetGeneration(gen uint32) {
|
||||||
for {
|
for {
|
||||||
v := atomic.LoadUint32(&n.generation)
|
v := atomic.LoadUint32(&n.generation)
|
||||||
|
@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
|
||||||
|
for {
|
||||||
|
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
|
||||||
|
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type clusterNodes struct {
|
type clusterNodes struct {
|
||||||
|
@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
|
||||||
c.activeAddrs = c.activeAddrs[:0]
|
c.activeAddrs = c.activeAddrs[:0]
|
||||||
|
now := time.Now()
|
||||||
for addr, node := range c.nodes {
|
for addr, node := range c.nodes {
|
||||||
if node.Generation() >= generation {
|
if node.Generation() >= generation {
|
||||||
c.activeAddrs = append(c.activeAddrs, addr)
|
c.activeAddrs = append(c.activeAddrs, addr)
|
||||||
if c.opt.RouteByLatency {
|
if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
|
||||||
go node.updateLatency()
|
go node.updateLatency()
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -653,6 +653,32 @@ var _ = Describe("ClusterClient", func() {
|
||||||
Expect(client.Close()).NotTo(HaveOccurred())
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("determines hash slots correctly for generic commands", func() {
|
||||||
|
opt := redisClusterOptions()
|
||||||
|
opt.MaxRedirects = -1
|
||||||
|
client := cluster.newClusterClient(ctx, opt)
|
||||||
|
|
||||||
|
err := client.Do(ctx, "GET", "A").Err()
|
||||||
|
Expect(err).To(Equal(redis.Nil))
|
||||||
|
|
||||||
|
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
|
||||||
|
Expect(err).To(Equal(redis.Nil))
|
||||||
|
|
||||||
|
Eventually(func() error {
|
||||||
|
return client.SwapNodes(ctx, "A")
|
||||||
|
}, 30*time.Second).ShouldNot(HaveOccurred())
|
||||||
|
|
||||||
|
err = client.Do(ctx, "GET", "A").Err()
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("MOVED"))
|
||||||
|
|
||||||
|
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("MOVED"))
|
||||||
|
|
||||||
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
It("follows node redirection immediately", func() {
|
It("follows node redirection immediately", func() {
|
||||||
// Configure retry backoffs far in excess of the expected duration of redirection
|
// Configure retry backoffs far in excess of the expected duration of redirection
|
||||||
opt := redisClusterOptions()
|
opt := redisClusterOptions()
|
||||||
|
|
|
@ -319,37 +319,69 @@ func (cmd *BFInfoCmd) Result() (BFInfo, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *BFInfoCmd) readReply(rd *proto.Reader) (err error) {
|
func (cmd *BFInfoCmd) readReply(rd *proto.Reader) (err error) {
|
||||||
n, err := rd.ReadMapLen()
|
result := BFInfo{}
|
||||||
|
|
||||||
|
// Create a mapping from key names to pointers of struct fields
|
||||||
|
respMapping := map[string]*int64{
|
||||||
|
"Capacity": &result.Capacity,
|
||||||
|
"CAPACITY": &result.Capacity,
|
||||||
|
"Size": &result.Size,
|
||||||
|
"SIZE": &result.Size,
|
||||||
|
"Number of filters": &result.Filters,
|
||||||
|
"FILTERS": &result.Filters,
|
||||||
|
"Number of items inserted": &result.ItemsInserted,
|
||||||
|
"ITEMS": &result.ItemsInserted,
|
||||||
|
"Expansion rate": &result.ExpansionRate,
|
||||||
|
"EXPANSION": &result.ExpansionRate,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function to read and assign a value based on the key
|
||||||
|
readAndAssignValue := func(key string) error {
|
||||||
|
fieldPtr, exists := respMapping[key]
|
||||||
|
if !exists {
|
||||||
|
return fmt.Errorf("redis: BLOOM.INFO unexpected key %s", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the integer and assign to the field via pointer dereferencing
|
||||||
|
val, err := rd.ReadInt()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
*fieldPtr = val
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
readType, err := rd.PeekReplyType()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var key string
|
if len(cmd.args) > 2 && readType == proto.RespArray {
|
||||||
var result BFInfo
|
n, err := rd.ReadArrayLen()
|
||||||
for f := 0; f < n; f++ {
|
|
||||||
key, err = rd.ReadString()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if key, ok := cmd.args[2].(string); ok && n == 1 {
|
||||||
switch key {
|
if err := readAndAssignValue(key); err != nil {
|
||||||
case "Capacity":
|
return err
|
||||||
result.Capacity, err = rd.ReadInt()
|
}
|
||||||
case "Size":
|
} else {
|
||||||
result.Size, err = rd.ReadInt()
|
return fmt.Errorf("redis: BLOOM.INFO invalid argument key type")
|
||||||
case "Number of filters":
|
|
||||||
result.Filters, err = rd.ReadInt()
|
|
||||||
case "Number of items inserted":
|
|
||||||
result.ItemsInserted, err = rd.ReadInt()
|
|
||||||
case "Expansion rate":
|
|
||||||
result.ExpansionRate, err = rd.ReadInt()
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("redis: BLOOM.INFO unexpected key %s", key)
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
n, err := rd.ReadMapLen()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
key, err := rd.ReadString()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := readAndAssignValue(key); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd.val = result
|
cmd.val = result
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
14
pubsub.go
14
pubsub.go
|
@ -409,6 +409,20 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
|
||||||
return &Pong{
|
return &Pong{
|
||||||
Payload: reply[1].(string),
|
Payload: reply[1].(string),
|
||||||
}, nil
|
}, nil
|
||||||
|
case "invalidate":
|
||||||
|
switch payload := reply[1].(type) {
|
||||||
|
case []interface{}:
|
||||||
|
s := make([]string, len(payload))
|
||||||
|
for idx := range payload {
|
||||||
|
s[idx] = payload[idx].(string)
|
||||||
|
}
|
||||||
|
return &Message{
|
||||||
|
Channel: "invalidate",
|
||||||
|
PayloadSlice: s,
|
||||||
|
}, nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("redis: unsupported invalidate message payload: %#v", payload)
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
|
return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package redis_test
|
package redis_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -567,4 +569,99 @@ var _ = Describe("PubSub", func() {
|
||||||
Expect(msg.Channel).To(Equal("mychannel"))
|
Expect(msg.Channel).To(Equal("mychannel"))
|
||||||
Expect(msg.Payload).To(Equal(text))
|
Expect(msg.Payload).To(Equal(text))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("supports client-cache invalidation messages", func() {
|
||||||
|
ch := make(chan []string, 2)
|
||||||
|
defer close(ch)
|
||||||
|
client := redis.NewClient(getOptsWithTracking(redisOptions(), func(keys []string) error {
|
||||||
|
ch <- keys
|
||||||
|
return nil
|
||||||
|
}))
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
v1 := client.Get(context.Background(), "foo")
|
||||||
|
Expect(v1.Val()).To(Equal(""))
|
||||||
|
s1 := client.Set(context.Background(), "foo", "bar", time.Duration(time.Minute))
|
||||||
|
Expect(s1.Val()).To(Equal("OK"))
|
||||||
|
v2 := client.Get(context.Background(), "foo")
|
||||||
|
Expect(v2.Val()).To(Equal("bar"))
|
||||||
|
// sleep a little to allow time for the first invalidation message to come through
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
s2 := client.Set(context.Background(), "foo", "foobar", time.Duration(time.Minute))
|
||||||
|
Expect(s2.Val()).To(Equal("OK"))
|
||||||
|
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
select {
|
||||||
|
case keys := <-ch:
|
||||||
|
Expect(keys).ToNot(BeEmpty())
|
||||||
|
Expect(keys[0]).To(Equal("foo"))
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
|
// fail on timeouts
|
||||||
|
Fail("invalidation message wait timed out")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
func getOptsWithTracking(opt *redis.Options, processInvalidKeysFunc func([]string) error) *redis.Options {
|
||||||
|
var mu sync.Mutex
|
||||||
|
invalidateClientID := int64(-1)
|
||||||
|
invalidateOpts := *opt
|
||||||
|
invalidateOpts.OnConnect = func(ctx context.Context, conn *redis.Conn) (err error) {
|
||||||
|
invalidateClientID, err = conn.ClientID(ctx).Result()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
startBackgroundInvalidationSubscription := func(ctx context.Context) int64 {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
|
||||||
|
if invalidateClientID != -1 {
|
||||||
|
return invalidateClientID
|
||||||
|
}
|
||||||
|
|
||||||
|
invalidateClient := redis.NewClient(&invalidateOpts)
|
||||||
|
invalidations := invalidateClient.Subscribe(ctx, "__redis__:invalidate")
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
invalidations.Close()
|
||||||
|
invalidateClient.Close()
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
invalidateClientID = -1
|
||||||
|
mu.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
msg, err := invalidations.ReceiveMessage(context.Background())
|
||||||
|
if err == io.EOF || err == context.Canceled {
|
||||||
|
return
|
||||||
|
} else if err != nil {
|
||||||
|
fmt.Printf("warning: subscription on key invalidations aborted: %s\n", err.Error())
|
||||||
|
// send back empty []string to fail the test
|
||||||
|
processInvalidKeysFunc([]string{})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
processInvalidKeysFunc(msg.PayloadSlice)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return invalidateClientID
|
||||||
|
}
|
||||||
|
|
||||||
|
opt.OnConnect = func(ctx context.Context, conn *redis.Conn) error {
|
||||||
|
invalidateClientID := startBackgroundInvalidationSubscription(ctx)
|
||||||
|
return conn.Process(
|
||||||
|
ctx,
|
||||||
|
redis.NewBoolCmd(
|
||||||
|
ctx,
|
||||||
|
"CLIENT", "TRACKING", "on",
|
||||||
|
"REDIRECT", fmt.Sprintf("%d", invalidateClientID),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
return opt
|
||||||
|
}
|
||||||
|
|
2
redis.go
2
redis.go
|
@ -176,8 +176,6 @@ func (hs *hooksMixin) withProcessPipelineHook(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
|
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||||
hs.hooksMu.Lock()
|
|
||||||
defer hs.hooksMu.Unlock()
|
|
||||||
return hs.current.dial(ctx, network, addr)
|
return hs.current.dial(ctx, network, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -633,3 +634,67 @@ var _ = Describe("Hook with MinIdleConns", func() {
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
var _ = Describe("Dialer connection timeouts", func() {
|
||||||
|
var client *redis.Client
|
||||||
|
|
||||||
|
const dialSimulatedDelay = 1 * time.Second
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
options := redisOptions()
|
||||||
|
options.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||||
|
// Simulated slow dialer.
|
||||||
|
// Note that the following sleep is deliberately not context-aware.
|
||||||
|
time.Sleep(dialSimulatedDelay)
|
||||||
|
return net.Dial("tcp", options.Addr)
|
||||||
|
}
|
||||||
|
options.MinIdleConns = 1
|
||||||
|
client = redis.NewClient(options)
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
err := client.Close()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("does not contend on connection dial for concurrent commands", func() {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
const concurrency = 10
|
||||||
|
|
||||||
|
durations := make(chan time.Duration, concurrency)
|
||||||
|
errs := make(chan error, concurrency)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
wg.Add(concurrency)
|
||||||
|
|
||||||
|
for i := 0; i < concurrency; i++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
err := client.Ping(ctx).Err()
|
||||||
|
durations <- time.Since(start)
|
||||||
|
errs <- err
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
close(durations)
|
||||||
|
close(errs)
|
||||||
|
|
||||||
|
// All commands should eventually succeed, after acquiring a connection.
|
||||||
|
for err := range errs {
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Each individual command should complete within the simulated dial duration bound.
|
||||||
|
for duration := range durations {
|
||||||
|
Expect(duration).To(BeNumerically("<", 2*dialSimulatedDelay))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Due to concurrent execution, the entire test suite should also complete within
|
||||||
|
// the same dial duration bound applied for individual commands.
|
||||||
|
Expect(time.Since(start)).To(BeNumerically("<", 2*dialSimulatedDelay))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue