Compare commits

..

9 Commits

Author SHA1 Message Date
LINKIWI 605198a5f9
Merge fb59c1a2a1 into f1ffb55c9a 2024-11-20 14:37:11 +02:00
ofekshenawa fb59c1a2a1
Merge branch 'master' into command-key-heuristic-bytes 2024-11-20 14:37:07 +02:00
Justin f1ffb55c9a
Only check latencies once every 10 seconds with `routeByLatency` (#2795)
* Only check latencies once every 10 seconds with `routeByLatency`

`routeByLatency` currently checks latencies any time a server returns
a MOVED or READONLY reply. When a shard is down, the ClusterClient
chooses to issue the request to a random server, which returns a MOVED
reply. This causes a state refresh and a latency update on all servers.
This can lead to significant ping load to clusters with a large number
of clients.

This introduces logic to ping only once every 10 seconds, only
performing a latency update on a node during the `GC` function if the
latency was set later than 10 seconds ago.

Fixes https://github.com/redis/go-redis/issues/2782

* use UnixNano instead of Unix for better precision

---------

Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-20 14:36:39 +02:00
ofekshenawa df12957bd7
Merge branch 'master' into command-key-heuristic-bytes 2024-11-20 13:42:43 +02:00
LINKIWI 080e051124
Eliminate redundant dial mutex causing unbounded connection queue contention (#3088)
* Eliminate redundant dial mutex causing unbounded connection queue contention

* Dialer connection timeouts unit test

---------

Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-20 13:38:06 +02:00
ofekshenawa 930d904205
Add guidance on unstable RESP3 support for RediSearch commands to README (#3177)
* Add UnstableResp3 to docs

* Add RawVal and RawResult to wordlist

* Explain more about SetVal

* Add UnstableResp to wordlist
2024-11-13 13:20:59 +02:00
ofekshenawa 8b1073d2d6
Support Probabilistic commands with RESP 2 protocol (#3176)
* Support bloom resp 2

* Support Resp2 for BF.Info

* simplify BFInfoCmd field assignment using map-based key-to-field references
2024-11-13 11:15:19 +02:00
ofekshenawa d1b4eaed41
Support TimeSeries commands with RESP 2 protocol (#3184)
* Support Timeseries resp 2

* Change to resp 2

* Support Resp2 for TimeSeries commands
2024-11-13 10:27:00 +02:00
andy-stark-redis 80c9f5bb77
DOC-4345 added JSON samples for home page (#3183) 2024-11-06 17:25:46 +02:00
10 changed files with 2359 additions and 1808 deletions

View File

@ -54,6 +54,7 @@ stunnel
SynDump SynDump
TCP TCP
TLS TLS
UnstableResp
uri uri
URI URI
url url
@ -62,3 +63,5 @@ RedisStack
RedisGears RedisGears
RedisTimeseries RedisTimeseries
RediSearch RediSearch
RawResult
RawVal

View File

@ -186,6 +186,21 @@ rdb := redis.NewClient(&redis.Options{
#### Unstable RESP3 Structures for RediSearch Commands #### Unstable RESP3 Structures for RediSearch Commands
When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when using the search and query capabilities, but we plan to stabilize the RESP3-based API-s in the coming versions. You can find more guidance in the upcoming release notes. When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when using the search and query capabilities, but we plan to stabilize the RESP3-based API-s in the coming versions. You can find more guidance in the upcoming release notes.
To enable unstable RESP3, set the option in your client configuration:
```go
redis.NewClient(&redis.Options{
UnstableResp3: true,
})
```
**Note:** When UnstableResp3 mode is enabled, it's necessary to use RawResult() and RawVal() to retrieve a raw data.
Since, raw response is the only option for unstable search commands Val() and Result() calls wouldn't have any affect on them:
```go
res1, err := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawResult()
val1 := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawVal()
```
## Contributing ## Contributing
Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library! Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library!

View File

@ -1405,27 +1405,63 @@ func (cmd *MapStringSliceInterfaceCmd) Val() map[string][]interface{} {
} }
func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) { func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
n, err := rd.ReadMapLen() readType, err := rd.PeekReplyType()
if err != nil { if err != nil {
return err return err
} }
cmd.val = make(map[string][]interface{}, n)
for i := 0; i < n; i++ { cmd.val = make(map[string][]interface{})
k, err := rd.ReadString()
if readType == proto.RespMap {
n, err := rd.ReadMapLen()
if err != nil { if err != nil {
return err return err
} }
nn, err := rd.ReadArrayLen() for i := 0; i < n; i++ {
if err != nil { k, err := rd.ReadString()
return err
}
cmd.val[k] = make([]interface{}, nn)
for j := 0; j < nn; j++ {
value, err := rd.ReadReply()
if err != nil { if err != nil {
return err return err
} }
cmd.val[k][j] = value nn, err := rd.ReadArrayLen()
if err != nil {
return err
}
cmd.val[k] = make([]interface{}, nn)
for j := 0; j < nn; j++ {
value, err := rd.ReadReply()
if err != nil {
return err
}
cmd.val[k][j] = value
}
}
} else if readType == proto.RespArray {
// RESP2 response
n, err := rd.ReadArrayLen()
if err != nil {
return err
}
for i := 0; i < n; i++ {
// Each entry in this array is itself an array with key details
itemLen, err := rd.ReadArrayLen()
if err != nil {
return err
}
key, err := rd.ReadString()
if err != nil {
return err
}
cmd.val[key] = make([]interface{}, 0, itemLen-1)
for j := 1; j < itemLen; j++ {
// Read the inner array for timestamp-value pairs
data, err := rd.ReadReply()
if err != nil {
return err
}
cmd.val[key] = append(cmd.val[key], data)
}
} }
} }

View File

@ -0,0 +1,199 @@
// EXAMPLE: go_home_json
// HIDE_START
package example_commands_test
// HIDE_END
// STEP_START import
import (
"context"
"fmt"
"sort"
"github.com/redis/go-redis/v9"
)
// STEP_END
func ExampleClient_search_json() {
// STEP_START connect
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password docs
DB: 0, // use default DB
Protocol: 2,
})
// STEP_END
// REMOVE_START
rdb.Del(ctx, "user:1", "user:2", "user:3")
rdb.FTDropIndex(ctx, "idx:users")
// REMOVE_END
// STEP_START create_data
user1 := map[string]interface{}{
"name": "Paul John",
"email": "paul.john@example.com",
"age": 42,
"city": "London",
}
user2 := map[string]interface{}{
"name": "Eden Zamir",
"email": "eden.zamir@example.com",
"age": 29,
"city": "Tel Aviv",
}
user3 := map[string]interface{}{
"name": "Paul Zamir",
"email": "paul.zamir@example.com",
"age": 35,
"city": "Tel Aviv",
}
// STEP_END
// STEP_START make_index
_, err := rdb.FTCreate(
ctx,
"idx:users",
// Options:
&redis.FTCreateOptions{
OnJSON: true,
Prefix: []interface{}{"user:"},
},
// Index schema fields:
&redis.FieldSchema{
FieldName: "$.name",
As: "name",
FieldType: redis.SearchFieldTypeText,
},
&redis.FieldSchema{
FieldName: "$.city",
As: "city",
FieldType: redis.SearchFieldTypeTag,
},
&redis.FieldSchema{
FieldName: "$.age",
As: "age",
FieldType: redis.SearchFieldTypeNumeric,
},
).Result()
if err != nil {
panic(err)
}
// STEP_END
// STEP_START add_data
_, err = rdb.JSONSet(ctx, "user:1", "$", user1).Result()
if err != nil {
panic(err)
}
_, err = rdb.JSONSet(ctx, "user:2", "$", user2).Result()
if err != nil {
panic(err)
}
_, err = rdb.JSONSet(ctx, "user:3", "$", user3).Result()
if err != nil {
panic(err)
}
// STEP_END
// STEP_START query1
findPaulResult, err := rdb.FTSearch(
ctx,
"idx:users",
"Paul @age:[30 40]",
).Result()
if err != nil {
panic(err)
}
fmt.Println(findPaulResult)
// >>> {1 [{user:3 <nil> <nil> <nil> map[$:{"age":35,"city":"Tel Aviv"...
// STEP_END
// STEP_START query2
citiesResult, err := rdb.FTSearchWithArgs(
ctx,
"idx:users",
"Paul",
&redis.FTSearchOptions{
Return: []redis.FTSearchReturn{
{
FieldName: "$.city",
As: "city",
},
},
},
).Result()
if err != nil {
panic(err)
}
sort.Slice(citiesResult.Docs, func(i, j int) bool {
return citiesResult.Docs[i].Fields["city"] < citiesResult.Docs[j].Fields["city"]
})
for _, result := range citiesResult.Docs {
fmt.Println(result.Fields["city"])
}
// >>> London
// >>> Tel Aviv
// STEP_END
// STEP_START query3
aggOptions := redis.FTAggregateOptions{
GroupBy: []redis.FTAggregateGroupBy{
{
Fields: []interface{}{"@city"},
Reduce: []redis.FTAggregateReducer{
{
Reducer: redis.SearchCount,
As: "count",
},
},
},
},
}
aggResult, err := rdb.FTAggregateWithArgs(
ctx,
"idx:users",
"*",
&aggOptions,
).Result()
if err != nil {
panic(err)
}
sort.Slice(aggResult.Rows, func(i, j int) bool {
return aggResult.Rows[i].Fields["city"].(string) <
aggResult.Rows[j].Fields["city"].(string)
})
for _, row := range aggResult.Rows {
fmt.Printf("%v - %v\n",
row.Fields["city"], row.Fields["count"],
)
}
// >>> City: London - 1
// >>> City: Tel Aviv - 2
// STEP_END
// Output:
// {1 [{user:3 <nil> <nil> <nil> map[$:{"age":35,"city":"Tel Aviv","email":"paul.zamir@example.com","name":"Paul Zamir"}]}]}
// London
// Tel Aviv
// London - 1
// Tel Aviv - 2
}

View File

@ -21,6 +21,10 @@ import (
"github.com/redis/go-redis/v9/internal/rand" "github.com/redis/go-redis/v9/internal/rand"
) )
const (
minLatencyMeasurementInterval = 10 * time.Second
)
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
// ClusterOptions are used to configure a cluster client and should be // ClusterOptions are used to configure a cluster client and should be
@ -316,6 +320,10 @@ type clusterNode struct {
latency uint32 // atomic latency uint32 // atomic
generation uint32 // atomic generation uint32 // atomic
failing uint32 // atomic failing uint32 // atomic
// last time the latency measurement was performed for the node, stored in nanoseconds
// from epoch
lastLatencyMeasurement int64 // atomic
} }
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
latency = float64(dur) / float64(successes) latency = float64(dur) / float64(successes)
} }
atomic.StoreUint32(&n.latency, uint32(latency+0.5)) atomic.StoreUint32(&n.latency, uint32(latency+0.5))
n.SetLastLatencyMeasurement(time.Now())
} }
func (n *clusterNode) Latency() time.Duration { func (n *clusterNode) Latency() time.Duration {
@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
return atomic.LoadUint32(&n.generation) return atomic.LoadUint32(&n.generation)
} }
func (n *clusterNode) LastLatencyMeasurement() int64 {
return atomic.LoadInt64(&n.lastLatencyMeasurement)
}
func (n *clusterNode) SetGeneration(gen uint32) { func (n *clusterNode) SetGeneration(gen uint32) {
for { for {
v := atomic.LoadUint32(&n.generation) v := atomic.LoadUint32(&n.generation)
@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
} }
} }
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
for {
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
break
}
}
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type clusterNodes struct { type clusterNodes struct {
@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
c.mu.Lock() c.mu.Lock()
c.activeAddrs = c.activeAddrs[:0] c.activeAddrs = c.activeAddrs[:0]
now := time.Now()
for addr, node := range c.nodes { for addr, node := range c.nodes {
if node.Generation() >= generation { if node.Generation() >= generation {
c.activeAddrs = append(c.activeAddrs, addr) c.activeAddrs = append(c.activeAddrs, addr)
if c.opt.RouteByLatency { if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
go node.updateLatency() go node.updateLatency()
} }
continue continue

View File

@ -319,37 +319,69 @@ func (cmd *BFInfoCmd) Result() (BFInfo, error) {
} }
func (cmd *BFInfoCmd) readReply(rd *proto.Reader) (err error) { func (cmd *BFInfoCmd) readReply(rd *proto.Reader) (err error) {
n, err := rd.ReadMapLen() result := BFInfo{}
// Create a mapping from key names to pointers of struct fields
respMapping := map[string]*int64{
"Capacity": &result.Capacity,
"CAPACITY": &result.Capacity,
"Size": &result.Size,
"SIZE": &result.Size,
"Number of filters": &result.Filters,
"FILTERS": &result.Filters,
"Number of items inserted": &result.ItemsInserted,
"ITEMS": &result.ItemsInserted,
"Expansion rate": &result.ExpansionRate,
"EXPANSION": &result.ExpansionRate,
}
// Helper function to read and assign a value based on the key
readAndAssignValue := func(key string) error {
fieldPtr, exists := respMapping[key]
if !exists {
return fmt.Errorf("redis: BLOOM.INFO unexpected key %s", key)
}
// Read the integer and assign to the field via pointer dereferencing
val, err := rd.ReadInt()
if err != nil {
return err
}
*fieldPtr = val
return nil
}
readType, err := rd.PeekReplyType()
if err != nil { if err != nil {
return err return err
} }
var key string if len(cmd.args) > 2 && readType == proto.RespArray {
var result BFInfo n, err := rd.ReadArrayLen()
for f := 0; f < n; f++ {
key, err = rd.ReadString()
if err != nil { if err != nil {
return err return err
} }
if key, ok := cmd.args[2].(string); ok && n == 1 {
switch key { if err := readAndAssignValue(key); err != nil {
case "Capacity": return err
result.Capacity, err = rd.ReadInt() }
case "Size": } else {
result.Size, err = rd.ReadInt() return fmt.Errorf("redis: BLOOM.INFO invalid argument key type")
case "Number of filters":
result.Filters, err = rd.ReadInt()
case "Number of items inserted":
result.ItemsInserted, err = rd.ReadInt()
case "Expansion rate":
result.ExpansionRate, err = rd.ReadInt()
default:
return fmt.Errorf("redis: BLOOM.INFO unexpected key %s", key)
} }
} else {
n, err := rd.ReadMapLen()
if err != nil { if err != nil {
return err return err
} }
for i := 0; i < n; i++ {
key, err := rd.ReadString()
if err != nil {
return err
}
if err := readAndAssignValue(key); err != nil {
return err
}
}
} }
cmd.val = result cmd.val = result

File diff suppressed because it is too large Load Diff

View File

@ -176,8 +176,6 @@ func (hs *hooksMixin) withProcessPipelineHook(
} }
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) { func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
hs.hooksMu.Lock()
defer hs.hooksMu.Unlock()
return hs.current.dial(ctx, network, addr) return hs.current.dial(ctx, network, addr)
} }

View File

@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"sync"
"testing" "testing"
"time" "time"
@ -633,3 +634,67 @@ var _ = Describe("Hook with MinIdleConns", func() {
})) }))
}) })
}) })
var _ = Describe("Dialer connection timeouts", func() {
var client *redis.Client
const dialSimulatedDelay = 1 * time.Second
BeforeEach(func() {
options := redisOptions()
options.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
// Simulated slow dialer.
// Note that the following sleep is deliberately not context-aware.
time.Sleep(dialSimulatedDelay)
return net.Dial("tcp", options.Addr)
}
options.MinIdleConns = 1
client = redis.NewClient(options)
})
AfterEach(func() {
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
It("does not contend on connection dial for concurrent commands", func() {
var wg sync.WaitGroup
const concurrency = 10
durations := make(chan time.Duration, concurrency)
errs := make(chan error, concurrency)
start := time.Now()
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
start := time.Now()
err := client.Ping(ctx).Err()
durations <- time.Since(start)
errs <- err
}()
}
wg.Wait()
close(durations)
close(errs)
// All commands should eventually succeed, after acquiring a connection.
for err := range errs {
Expect(err).NotTo(HaveOccurred())
}
// Each individual command should complete within the simulated dial duration bound.
for duration := range durations {
Expect(duration).To(BeNumerically("<", 2*dialSimulatedDelay))
}
// Due to concurrent execution, the entire test suite should also complete within
// the same dial duration bound applied for individual commands.
Expect(time.Since(start)).To(BeNumerically("<", 2*dialSimulatedDelay))
})
})

File diff suppressed because it is too large Load Diff