Compare commits

...

16 Commits

Author SHA1 Message Date
ofekshenawa c2834b66b5
Merge a5fdfa2ec8 into e63669e170 2024-11-22 00:17:39 -05:00
dependabot[bot] e63669e170
chore(deps): bump rojopolis/spellcheck-github-actions (#3188)
Bumps [rojopolis/spellcheck-github-actions](https://github.com/rojopolis/spellcheck-github-actions) from 0.40.0 to 0.45.0.
- [Release notes](https://github.com/rojopolis/spellcheck-github-actions/releases)
- [Changelog](https://github.com/rojopolis/spellcheck-github-actions/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rojopolis/spellcheck-github-actions/compare/0.40.0...0.45.0)

---
updated-dependencies:
- dependency-name: rojopolis/spellcheck-github-actions
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-21 14:38:38 +02:00
LINKIWI fc32d0a01d
Recognize byte slice for key argument in cluster client hash slot computation (#3049)
Co-authored-by: Vladyslav Vildanov <117659936+vladvildanov@users.noreply.github.com>
Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-21 14:38:11 +02:00
Justin f1ffb55c9a
Only check latencies once every 10 seconds with `routeByLatency` (#2795)
* Only check latencies once every 10 seconds with `routeByLatency`

`routeByLatency` currently checks latencies any time a server returns
a MOVED or READONLY reply. When a shard is down, the ClusterClient
chooses to issue the request to a random server, which returns a MOVED
reply. This causes a state refresh and a latency update on all servers.
This can lead to significant ping load to clusters with a large number
of clients.

This introduces logic to ping only once every 10 seconds, only
performing a latency update on a node during the `GC` function if the
latency was set later than 10 seconds ago.

Fixes https://github.com/redis/go-redis/issues/2782

* use UnixNano instead of Unix for better precision

---------

Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-20 14:36:39 +02:00
LINKIWI 080e051124
Eliminate redundant dial mutex causing unbounded connection queue contention (#3088)
* Eliminate redundant dial mutex causing unbounded connection queue contention

* Dialer connection timeouts unit test

---------

Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-20 13:38:06 +02:00
ofekshenawa 930d904205
Add guidance on unstable RESP3 support for RediSearch commands to README (#3177)
* Add UnstableResp3 to docs

* Add RawVal and RawResult to wordlist

* Explain more about SetVal

* Add UnstableResp to wordlist
2024-11-13 13:20:59 +02:00
ofekshenawa 8b1073d2d6
Support Probabilistic commands with RESP 2 protocol (#3176)
* Support bloom resp 2

* Support Resp2 for BF.Info

* simplify BFInfoCmd field assignment using map-based key-to-field references
2024-11-13 11:15:19 +02:00
ofekshenawa d1b4eaed41
Support TimeSeries commands with RESP 2 protocol (#3184)
* Support Timeseries resp 2

* Change to resp 2

* Support Resp2 for TimeSeries commands
2024-11-13 10:27:00 +02:00
andy-stark-redis 80c9f5bb77
DOC-4345 added JSON samples for home page (#3183) 2024-11-06 17:25:46 +02:00
ofekshenawa a5fdfa2ec8 Change getInterleavedArguments function 2024-09-23 01:37:41 +03:00
ofekshenawa be805cbe10 Remove extractKeys function 2024-09-23 01:34:38 +03:00
ofekshenawa 8de72d6d3c Extract keys from string commands 2024-09-23 01:32:35 +03:00
ofekshenawa c4cce2333a Add keys method to Cmder 2024-09-23 01:04:53 +03:00
ofekshenawa 5b30e4ecb8 Test extractKeys 2024-09-20 18:44:49 +03:00
ofekshenawa 249510cbb5 Create extractKeys function 2024-09-20 17:28:54 +03:00
ofekshenawa 711143687c Initial testing 2024-09-20 16:13:13 +03:00
14 changed files with 2494 additions and 1811 deletions

View File

@ -54,6 +54,7 @@ stunnel
SynDump SynDump
TCP TCP
TLS TLS
UnstableResp
uri uri
URI URI
url url
@ -62,3 +63,5 @@ RedisStack
RedisGears RedisGears
RedisTimeseries RedisTimeseries
RediSearch RediSearch
RawResult
RawVal

View File

@ -8,7 +8,7 @@ jobs:
- name: Checkout - name: Checkout
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: Check Spelling - name: Check Spelling
uses: rojopolis/spellcheck-github-actions@0.40.0 uses: rojopolis/spellcheck-github-actions@0.45.0
with: with:
config_path: .github/spellcheck-settings.yml config_path: .github/spellcheck-settings.yml
task_name: Markdown task_name: Markdown

View File

@ -186,6 +186,21 @@ rdb := redis.NewClient(&redis.Options{
#### Unstable RESP3 Structures for RediSearch Commands #### Unstable RESP3 Structures for RediSearch Commands
When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when using the search and query capabilities, but we plan to stabilize the RESP3-based API-s in the coming versions. You can find more guidance in the upcoming release notes. When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when using the search and query capabilities, but we plan to stabilize the RESP3-based API-s in the coming versions. You can find more guidance in the upcoming release notes.
To enable unstable RESP3, set the option in your client configuration:
```go
redis.NewClient(&redis.Options{
UnstableResp3: true,
})
```
**Note:** When UnstableResp3 mode is enabled, it's necessary to use RawResult() and RawVal() to retrieve a raw data.
Since, raw response is the only option for unstable search commands Val() and Result() calls wouldn't have any affect on them:
```go
res1, err := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawResult()
val1 := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawVal()
```
## Contributing ## Contributing
Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library! Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library!

View File

@ -30,6 +30,9 @@ type Cmder interface {
// e.g. "set k v ex 10" -> "[set k v ex 10]". // e.g. "set k v ex 10" -> "[set k v ex 10]".
Args() []interface{} Args() []interface{}
//all keys in command.
Keys() []string
// format request and response string. // format request and response string.
// e.g. "set k v ex 10" -> "set k v ex 10: OK", "get k" -> "get k: v". // e.g. "set k v ex 10" -> "set k v ex 10: OK", "get k" -> "get k: v".
String() string String() string
@ -126,6 +129,7 @@ type baseCmd struct {
args []interface{} args []interface{}
err error err error
keyPos int8 keyPos int8
keys []string
rawVal interface{} rawVal interface{}
_readTimeout *time.Duration _readTimeout *time.Duration
} }
@ -159,6 +163,10 @@ func (cmd *baseCmd) Args() []interface{} {
return cmd.args return cmd.args
} }
func (cmd *baseCmd) Keys() []string {
return cmd.keys
}
func (cmd *baseCmd) stringArg(pos int) string { func (cmd *baseCmd) stringArg(pos int) string {
if pos < 0 || pos >= len(cmd.args) { if pos < 0 || pos >= len(cmd.args) {
return "" return ""
@ -167,6 +175,8 @@ func (cmd *baseCmd) stringArg(pos int) string {
switch v := arg.(type) { switch v := arg.(type) {
case string: case string:
return v return v
case []byte:
return string(v)
default: default:
// TODO: consider using appendArg // TODO: consider using appendArg
return fmt.Sprint(v) return fmt.Sprint(v)
@ -202,6 +212,22 @@ func (cmd *baseCmd) readRawReply(rd *proto.Reader) (err error) {
return err return err
} }
// getInterleavedArguments returns arguments at even indices starting from index 1.
func (cmd *baseCmd) getInterleavedArguments() []string {
return cmd.getInterleavedArgumentsWithOffset(1)
}
// getInterleavedArgumentsWithOffset returns arguments at even indices starting from the specified offset.
func (cmd *baseCmd) getInterleavedArgumentsWithOffset(offset int) []string {
var matchingArguments []string
for i := offset; i < len(cmd.args); i += 2 {
if arg, ok := cmd.args[i].(string); ok {
matchingArguments = append(matchingArguments, arg)
}
}
return matchingArguments
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type Cmd struct { type Cmd struct {
@ -1403,27 +1429,63 @@ func (cmd *MapStringSliceInterfaceCmd) Val() map[string][]interface{} {
} }
func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) { func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
n, err := rd.ReadMapLen() readType, err := rd.PeekReplyType()
if err != nil { if err != nil {
return err return err
} }
cmd.val = make(map[string][]interface{}, n)
for i := 0; i < n; i++ { cmd.val = make(map[string][]interface{})
k, err := rd.ReadString()
if readType == proto.RespMap {
n, err := rd.ReadMapLen()
if err != nil { if err != nil {
return err return err
} }
nn, err := rd.ReadArrayLen() for i := 0; i < n; i++ {
if err != nil { k, err := rd.ReadString()
return err
}
cmd.val[k] = make([]interface{}, nn)
for j := 0; j < nn; j++ {
value, err := rd.ReadReply()
if err != nil { if err != nil {
return err return err
} }
cmd.val[k][j] = value nn, err := rd.ReadArrayLen()
if err != nil {
return err
}
cmd.val[k] = make([]interface{}, nn)
for j := 0; j < nn; j++ {
value, err := rd.ReadReply()
if err != nil {
return err
}
cmd.val[k][j] = value
}
}
} else if readType == proto.RespArray {
// RESP2 response
n, err := rd.ReadArrayLen()
if err != nil {
return err
}
for i := 0; i < n; i++ {
// Each entry in this array is itself an array with key details
itemLen, err := rd.ReadArrayLen()
if err != nil {
return err
}
key, err := rd.ReadString()
if err != nil {
return err
}
cmd.val[key] = make([]interface{}, 0, itemLen-1)
for j := 1; j < itemLen; j++ {
// Read the inner array for timestamp-value pairs
data, err := rd.ReadReply()
if err != nil {
return err
}
cmd.val[key] = append(cmd.val[key], data)
}
} }
} }

View File

@ -7279,6 +7279,67 @@ var _ = Describe("Commands", func() {
}) })
}) })
var _ = Describe("Keys Extraction Tests", func() {
var client *redis.Client
var ctx = context.TODO()
BeforeEach(func() {
client = redis.NewClient(redisOptions())
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
// STRING COMMANDS
It("should test Append command", func() {
cmd := client.Append(ctx, "key1", "value")
Expect(cmd.Keys()).To(Equal([]string{"key1"}))
})
It("should test Decr command", func() {
cmd := client.Decr(ctx, "key1")
Expect(cmd.Keys()).To(Equal([]string{"key1"}))
})
It("should test Get command", func() {
cmd := client.Get(ctx, "key1")
Expect(cmd.Keys()).To(Equal([]string{"key1"}))
})
It("should test MGet command", func() {
cmd := client.MGet(ctx, "key1", "key2", "key3")
Expect(cmd.Keys()).To(Equal([]string{"key1", "key2", "key3"}))
})
It("should test Set command", func() {
cmd := client.Set(ctx, "key1", "value", time.Second)
Expect(cmd.Keys()).To(Equal([]string{"key1"}))
})
It("should test MSet command", func() {
cmd := client.MSet(ctx, "key1", "value1", "key2", "value2")
Expect(cmd.Keys()).To(Equal([]string{"key1", "key2"}))
})
It("should test IncrBy command", func() {
cmd := client.IncrBy(ctx, "key1", 10)
Expect(cmd.Keys()).To(Equal([]string{"key1"}))
})
It("should test SetNX command", func() {
cmd := client.SetNX(ctx, "key1", "value", time.Second)
Expect(cmd.Keys()).To(Equal([]string{"key1"}))
})
It("should test GetDel command", func() {
cmd := client.GetDel(ctx, "key1")
Expect(cmd.Keys()).To(Equal([]string{"key1"}))
})
})
type numberStruct struct { type numberStruct struct {
Number int Number int
} }

View File

@ -0,0 +1,199 @@
// EXAMPLE: go_home_json
// HIDE_START
package example_commands_test
// HIDE_END
// STEP_START import
import (
"context"
"fmt"
"sort"
"github.com/redis/go-redis/v9"
)
// STEP_END
func ExampleClient_search_json() {
// STEP_START connect
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password docs
DB: 0, // use default DB
Protocol: 2,
})
// STEP_END
// REMOVE_START
rdb.Del(ctx, "user:1", "user:2", "user:3")
rdb.FTDropIndex(ctx, "idx:users")
// REMOVE_END
// STEP_START create_data
user1 := map[string]interface{}{
"name": "Paul John",
"email": "paul.john@example.com",
"age": 42,
"city": "London",
}
user2 := map[string]interface{}{
"name": "Eden Zamir",
"email": "eden.zamir@example.com",
"age": 29,
"city": "Tel Aviv",
}
user3 := map[string]interface{}{
"name": "Paul Zamir",
"email": "paul.zamir@example.com",
"age": 35,
"city": "Tel Aviv",
}
// STEP_END
// STEP_START make_index
_, err := rdb.FTCreate(
ctx,
"idx:users",
// Options:
&redis.FTCreateOptions{
OnJSON: true,
Prefix: []interface{}{"user:"},
},
// Index schema fields:
&redis.FieldSchema{
FieldName: "$.name",
As: "name",
FieldType: redis.SearchFieldTypeText,
},
&redis.FieldSchema{
FieldName: "$.city",
As: "city",
FieldType: redis.SearchFieldTypeTag,
},
&redis.FieldSchema{
FieldName: "$.age",
As: "age",
FieldType: redis.SearchFieldTypeNumeric,
},
).Result()
if err != nil {
panic(err)
}
// STEP_END
// STEP_START add_data
_, err = rdb.JSONSet(ctx, "user:1", "$", user1).Result()
if err != nil {
panic(err)
}
_, err = rdb.JSONSet(ctx, "user:2", "$", user2).Result()
if err != nil {
panic(err)
}
_, err = rdb.JSONSet(ctx, "user:3", "$", user3).Result()
if err != nil {
panic(err)
}
// STEP_END
// STEP_START query1
findPaulResult, err := rdb.FTSearch(
ctx,
"idx:users",
"Paul @age:[30 40]",
).Result()
if err != nil {
panic(err)
}
fmt.Println(findPaulResult)
// >>> {1 [{user:3 <nil> <nil> <nil> map[$:{"age":35,"city":"Tel Aviv"...
// STEP_END
// STEP_START query2
citiesResult, err := rdb.FTSearchWithArgs(
ctx,
"idx:users",
"Paul",
&redis.FTSearchOptions{
Return: []redis.FTSearchReturn{
{
FieldName: "$.city",
As: "city",
},
},
},
).Result()
if err != nil {
panic(err)
}
sort.Slice(citiesResult.Docs, func(i, j int) bool {
return citiesResult.Docs[i].Fields["city"] < citiesResult.Docs[j].Fields["city"]
})
for _, result := range citiesResult.Docs {
fmt.Println(result.Fields["city"])
}
// >>> London
// >>> Tel Aviv
// STEP_END
// STEP_START query3
aggOptions := redis.FTAggregateOptions{
GroupBy: []redis.FTAggregateGroupBy{
{
Fields: []interface{}{"@city"},
Reduce: []redis.FTAggregateReducer{
{
Reducer: redis.SearchCount,
As: "count",
},
},
},
},
}
aggResult, err := rdb.FTAggregateWithArgs(
ctx,
"idx:users",
"*",
&aggOptions,
).Result()
if err != nil {
panic(err)
}
sort.Slice(aggResult.Rows, func(i, j int) bool {
return aggResult.Rows[i].Fields["city"].(string) <
aggResult.Rows[j].Fields["city"].(string)
})
for _, row := range aggResult.Rows {
fmt.Printf("%v - %v\n",
row.Fields["city"], row.Fields["count"],
)
}
// >>> City: London - 1
// >>> City: Tel Aviv - 2
// STEP_END
// Output:
// {1 [{user:3 <nil> <nil> <nil> map[$:{"age":35,"city":"Tel Aviv","email":"paul.zamir@example.com","name":"Paul Zamir"}]}]}
// London
// Tel Aviv
// London - 1
// Tel Aviv - 2
}

View File

@ -21,6 +21,10 @@ import (
"github.com/redis/go-redis/v9/internal/rand" "github.com/redis/go-redis/v9/internal/rand"
) )
const (
minLatencyMeasurementInterval = 10 * time.Second
)
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
// ClusterOptions are used to configure a cluster client and should be // ClusterOptions are used to configure a cluster client and should be
@ -316,6 +320,10 @@ type clusterNode struct {
latency uint32 // atomic latency uint32 // atomic
generation uint32 // atomic generation uint32 // atomic
failing uint32 // atomic failing uint32 // atomic
// last time the latency measurement was performed for the node, stored in nanoseconds
// from epoch
lastLatencyMeasurement int64 // atomic
} }
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
latency = float64(dur) / float64(successes) latency = float64(dur) / float64(successes)
} }
atomic.StoreUint32(&n.latency, uint32(latency+0.5)) atomic.StoreUint32(&n.latency, uint32(latency+0.5))
n.SetLastLatencyMeasurement(time.Now())
} }
func (n *clusterNode) Latency() time.Duration { func (n *clusterNode) Latency() time.Duration {
@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
return atomic.LoadUint32(&n.generation) return atomic.LoadUint32(&n.generation)
} }
func (n *clusterNode) LastLatencyMeasurement() int64 {
return atomic.LoadInt64(&n.lastLatencyMeasurement)
}
func (n *clusterNode) SetGeneration(gen uint32) { func (n *clusterNode) SetGeneration(gen uint32) {
for { for {
v := atomic.LoadUint32(&n.generation) v := atomic.LoadUint32(&n.generation)
@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
} }
} }
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
for {
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
break
}
}
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type clusterNodes struct { type clusterNodes struct {
@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
c.mu.Lock() c.mu.Lock()
c.activeAddrs = c.activeAddrs[:0] c.activeAddrs = c.activeAddrs[:0]
now := time.Now()
for addr, node := range c.nodes { for addr, node := range c.nodes {
if node.Generation() >= generation { if node.Generation() >= generation {
c.activeAddrs = append(c.activeAddrs, addr) c.activeAddrs = append(c.activeAddrs, addr)
if c.opt.RouteByLatency { if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
go node.updateLatency() go node.updateLatency()
} }
continue continue

View File

@ -653,6 +653,32 @@ var _ = Describe("ClusterClient", func() {
Expect(client.Close()).NotTo(HaveOccurred()) Expect(client.Close()).NotTo(HaveOccurred())
}) })
It("determines hash slots correctly for generic commands", func() {
opt := redisClusterOptions()
opt.MaxRedirects = -1
client := cluster.newClusterClient(ctx, opt)
err := client.Do(ctx, "GET", "A").Err()
Expect(err).To(Equal(redis.Nil))
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
Expect(err).To(Equal(redis.Nil))
Eventually(func() error {
return client.SwapNodes(ctx, "A")
}, 30*time.Second).ShouldNot(HaveOccurred())
err = client.Do(ctx, "GET", "A").Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("MOVED"))
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("MOVED"))
Expect(client.Close()).NotTo(HaveOccurred())
})
It("follows node redirection immediately", func() { It("follows node redirection immediately", func() {
// Configure retry backoffs far in excess of the expected duration of redirection // Configure retry backoffs far in excess of the expected duration of redirection
opt := redisClusterOptions() opt := redisClusterOptions()

View File

@ -319,37 +319,69 @@ func (cmd *BFInfoCmd) Result() (BFInfo, error) {
} }
func (cmd *BFInfoCmd) readReply(rd *proto.Reader) (err error) { func (cmd *BFInfoCmd) readReply(rd *proto.Reader) (err error) {
n, err := rd.ReadMapLen() result := BFInfo{}
// Create a mapping from key names to pointers of struct fields
respMapping := map[string]*int64{
"Capacity": &result.Capacity,
"CAPACITY": &result.Capacity,
"Size": &result.Size,
"SIZE": &result.Size,
"Number of filters": &result.Filters,
"FILTERS": &result.Filters,
"Number of items inserted": &result.ItemsInserted,
"ITEMS": &result.ItemsInserted,
"Expansion rate": &result.ExpansionRate,
"EXPANSION": &result.ExpansionRate,
}
// Helper function to read and assign a value based on the key
readAndAssignValue := func(key string) error {
fieldPtr, exists := respMapping[key]
if !exists {
return fmt.Errorf("redis: BLOOM.INFO unexpected key %s", key)
}
// Read the integer and assign to the field via pointer dereferencing
val, err := rd.ReadInt()
if err != nil {
return err
}
*fieldPtr = val
return nil
}
readType, err := rd.PeekReplyType()
if err != nil { if err != nil {
return err return err
} }
var key string if len(cmd.args) > 2 && readType == proto.RespArray {
var result BFInfo n, err := rd.ReadArrayLen()
for f := 0; f < n; f++ {
key, err = rd.ReadString()
if err != nil { if err != nil {
return err return err
} }
if key, ok := cmd.args[2].(string); ok && n == 1 {
switch key { if err := readAndAssignValue(key); err != nil {
case "Capacity": return err
result.Capacity, err = rd.ReadInt() }
case "Size": } else {
result.Size, err = rd.ReadInt() return fmt.Errorf("redis: BLOOM.INFO invalid argument key type")
case "Number of filters":
result.Filters, err = rd.ReadInt()
case "Number of items inserted":
result.ItemsInserted, err = rd.ReadInt()
case "Expansion rate":
result.ExpansionRate, err = rd.ReadInt()
default:
return fmt.Errorf("redis: BLOOM.INFO unexpected key %s", key)
} }
} else {
n, err := rd.ReadMapLen()
if err != nil { if err != nil {
return err return err
} }
for i := 0; i < n; i++ {
key, err := rd.ReadString()
if err != nil {
return err
}
if err := readAndAssignValue(key); err != nil {
return err
}
}
} }
cmd.val = result cmd.val = result

File diff suppressed because it is too large Load Diff

View File

@ -176,8 +176,6 @@ func (hs *hooksMixin) withProcessPipelineHook(
} }
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) { func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
hs.hooksMu.Lock()
defer hs.hooksMu.Unlock()
return hs.current.dial(ctx, network, addr) return hs.current.dial(ctx, network, addr)
} }

View File

@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"sync"
"testing" "testing"
"time" "time"
@ -633,3 +634,67 @@ var _ = Describe("Hook with MinIdleConns", func() {
})) }))
}) })
}) })
var _ = Describe("Dialer connection timeouts", func() {
var client *redis.Client
const dialSimulatedDelay = 1 * time.Second
BeforeEach(func() {
options := redisOptions()
options.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
// Simulated slow dialer.
// Note that the following sleep is deliberately not context-aware.
time.Sleep(dialSimulatedDelay)
return net.Dial("tcp", options.Addr)
}
options.MinIdleConns = 1
client = redis.NewClient(options)
})
AfterEach(func() {
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
It("does not contend on connection dial for concurrent commands", func() {
var wg sync.WaitGroup
const concurrency = 10
durations := make(chan time.Duration, concurrency)
errs := make(chan error, concurrency)
start := time.Now()
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
start := time.Now()
err := client.Ping(ctx).Err()
durations <- time.Since(start)
errs <- err
}()
}
wg.Wait()
close(durations)
close(errs)
// All commands should eventually succeed, after acquiring a connection.
for err := range errs {
Expect(err).NotTo(HaveOccurred())
}
// Each individual command should complete within the simulated dial duration bound.
for duration := range durations {
Expect(duration).To(BeNumerically("<", 2*dialSimulatedDelay))
}
// Due to concurrent execution, the entire test suite should also complete within
// the same dial duration bound applied for individual commands.
Expect(time.Since(start)).To(BeNumerically("<", 2*dialSimulatedDelay))
})
})

View File

@ -32,18 +32,21 @@ type StringCmdable interface {
func (c cmdable) Append(ctx context.Context, key, value string) *IntCmd { func (c cmdable) Append(ctx context.Context, key, value string) *IntCmd {
cmd := NewIntCmd(ctx, "append", key, value) cmd := NewIntCmd(ctx, "append", key, value)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
func (c cmdable) Decr(ctx context.Context, key string) *IntCmd { func (c cmdable) Decr(ctx context.Context, key string) *IntCmd {
cmd := NewIntCmd(ctx, "decr", key) cmd := NewIntCmd(ctx, "decr", key)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
func (c cmdable) DecrBy(ctx context.Context, key string, decrement int64) *IntCmd { func (c cmdable) DecrBy(ctx context.Context, key string, decrement int64) *IntCmd {
cmd := NewIntCmd(ctx, "decrby", key, decrement) cmd := NewIntCmd(ctx, "decrby", key, decrement)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
@ -51,18 +54,21 @@ func (c cmdable) DecrBy(ctx context.Context, key string, decrement int64) *IntCm
// Get Redis `GET key` command. It returns redis.Nil error when key does not exist. // Get Redis `GET key` command. It returns redis.Nil error when key does not exist.
func (c cmdable) Get(ctx context.Context, key string) *StringCmd { func (c cmdable) Get(ctx context.Context, key string) *StringCmd {
cmd := NewStringCmd(ctx, "get", key) cmd := NewStringCmd(ctx, "get", key)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
func (c cmdable) GetRange(ctx context.Context, key string, start, end int64) *StringCmd { func (c cmdable) GetRange(ctx context.Context, key string, start, end int64) *StringCmd {
cmd := NewStringCmd(ctx, "getrange", key, start, end) cmd := NewStringCmd(ctx, "getrange", key, start, end)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
func (c cmdable) GetSet(ctx context.Context, key string, value interface{}) *StringCmd { func (c cmdable) GetSet(ctx context.Context, key string, value interface{}) *StringCmd {
cmd := NewStringCmd(ctx, "getset", key, value) cmd := NewStringCmd(ctx, "getset", key, value)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
@ -83,6 +89,7 @@ func (c cmdable) GetEx(ctx context.Context, key string, expiration time.Duration
} }
cmd := NewStringCmd(ctx, args...) cmd := NewStringCmd(ctx, args...)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
@ -90,24 +97,28 @@ func (c cmdable) GetEx(ctx context.Context, key string, expiration time.Duration
// GetDel redis-server version >= 6.2.0. // GetDel redis-server version >= 6.2.0.
func (c cmdable) GetDel(ctx context.Context, key string) *StringCmd { func (c cmdable) GetDel(ctx context.Context, key string) *StringCmd {
cmd := NewStringCmd(ctx, "getdel", key) cmd := NewStringCmd(ctx, "getdel", key)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
func (c cmdable) Incr(ctx context.Context, key string) *IntCmd { func (c cmdable) Incr(ctx context.Context, key string) *IntCmd {
cmd := NewIntCmd(ctx, "incr", key) cmd := NewIntCmd(ctx, "incr", key)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
func (c cmdable) IncrBy(ctx context.Context, key string, value int64) *IntCmd { func (c cmdable) IncrBy(ctx context.Context, key string, value int64) *IntCmd {
cmd := NewIntCmd(ctx, "incrby", key, value) cmd := NewIntCmd(ctx, "incrby", key, value)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
func (c cmdable) IncrByFloat(ctx context.Context, key string, value float64) *FloatCmd { func (c cmdable) IncrByFloat(ctx context.Context, key string, value float64) *FloatCmd {
cmd := NewFloatCmd(ctx, "incrbyfloat", key, value) cmd := NewFloatCmd(ctx, "incrbyfloat", key, value)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
@ -125,6 +136,7 @@ func (c cmdable) MGet(ctx context.Context, keys ...string) *SliceCmd {
args[1+i] = key args[1+i] = key
} }
cmd := NewSliceCmd(ctx, args...) cmd := NewSliceCmd(ctx, args...)
cmd.keys = append(cmd.keys, keys...)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
@ -139,6 +151,7 @@ func (c cmdable) MSet(ctx context.Context, values ...interface{}) *StatusCmd {
args[0] = "mset" args[0] = "mset"
args = appendArgs(args, values) args = appendArgs(args, values)
cmd := NewStatusCmd(ctx, args...) cmd := NewStatusCmd(ctx, args...)
cmd.keys = append(cmd.keys, cmd.getInterleavedArguments()...)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
@ -153,6 +166,7 @@ func (c cmdable) MSetNX(ctx context.Context, values ...interface{}) *BoolCmd {
args[0] = "msetnx" args[0] = "msetnx"
args = appendArgs(args, values) args = appendArgs(args, values)
cmd := NewBoolCmd(ctx, args...) cmd := NewBoolCmd(ctx, args...)
cmd.keys = append(cmd.keys, cmd.getInterleavedArguments()...)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
@ -179,6 +193,7 @@ func (c cmdable) Set(ctx context.Context, key string, value interface{}, expirat
} }
cmd := NewStatusCmd(ctx, args...) cmd := NewStatusCmd(ctx, args...)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
@ -230,6 +245,7 @@ func (c cmdable) SetArgs(ctx context.Context, key string, value interface{}, a S
} }
cmd := NewStatusCmd(ctx, args...) cmd := NewStatusCmd(ctx, args...)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
@ -237,6 +253,7 @@ func (c cmdable) SetArgs(ctx context.Context, key string, value interface{}, a S
// SetEx Redis `SETEx key expiration value` command. // SetEx Redis `SETEx key expiration value` command.
func (c cmdable) SetEx(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd { func (c cmdable) SetEx(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd {
cmd := NewStatusCmd(ctx, "setex", key, formatSec(ctx, expiration), value) cmd := NewStatusCmd(ctx, "setex", key, formatSec(ctx, expiration), value)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
@ -261,7 +278,7 @@ func (c cmdable) SetNX(ctx context.Context, key string, value interface{}, expir
cmd = NewBoolCmd(ctx, "set", key, value, "ex", formatSec(ctx, expiration), "nx") cmd = NewBoolCmd(ctx, "set", key, value, "ex", formatSec(ctx, expiration), "nx")
} }
} }
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
@ -285,19 +302,21 @@ func (c cmdable) SetXX(ctx context.Context, key string, value interface{}, expir
cmd = NewBoolCmd(ctx, "set", key, value, "ex", formatSec(ctx, expiration), "xx") cmd = NewBoolCmd(ctx, "set", key, value, "ex", formatSec(ctx, expiration), "xx")
} }
} }
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
func (c cmdable) SetRange(ctx context.Context, key string, offset int64, value string) *IntCmd { func (c cmdable) SetRange(ctx context.Context, key string, offset int64, value string) *IntCmd {
cmd := NewIntCmd(ctx, "setrange", key, offset, value) cmd := NewIntCmd(ctx, "setrange", key, offset, value)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
func (c cmdable) StrLen(ctx context.Context, key string) *IntCmd { func (c cmdable) StrLen(ctx context.Context, key string) *IntCmd {
cmd := NewIntCmd(ctx, "strlen", key) cmd := NewIntCmd(ctx, "strlen", key)
cmd.keys = append(cmd.keys, key)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }

File diff suppressed because it is too large Load Diff