Compare commits

...

8 Commits

Author SHA1 Message Date
ofekshenawa 2df7ee73f8
Merge branch 'master' into dependabot/github_actions/rojopolis/spellcheck-github-actions-0.45.0 2024-11-20 14:36:59 +02:00
Justin f1ffb55c9a
Only check latencies once every 10 seconds with `routeByLatency` (#2795)
* Only check latencies once every 10 seconds with `routeByLatency`

`routeByLatency` currently checks latencies any time a server returns
a MOVED or READONLY reply. When a shard is down, the ClusterClient
chooses to issue the request to a random server, which returns a MOVED
reply. This causes a state refresh and a latency update on all servers.
This can lead to significant ping load to clusters with a large number
of clients.

This introduces logic to ping only once every 10 seconds, only
performing a latency update on a node during the `GC` function if the
latency was set later than 10 seconds ago.

Fixes https://github.com/redis/go-redis/issues/2782

* use UnixNano instead of Unix for better precision

---------

Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-20 14:36:39 +02:00
ofekshenawa 7ba1f6ace6
Merge branch 'master' into dependabot/github_actions/rojopolis/spellcheck-github-actions-0.45.0 2024-11-20 13:44:49 +02:00
LINKIWI 080e051124
Eliminate redundant dial mutex causing unbounded connection queue contention (#3088)
* Eliminate redundant dial mutex causing unbounded connection queue contention

* Dialer connection timeouts unit test

---------

Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-20 13:38:06 +02:00
ofekshenawa 96fe05849a
Merge branch 'master' into dependabot/github_actions/rojopolis/spellcheck-github-actions-0.45.0 2024-11-20 13:36:30 +02:00
ofekshenawa 930d904205
Add guidance on unstable RESP3 support for RediSearch commands to README (#3177)
* Add UnstableResp3 to docs

* Add RawVal and RawResult to wordlist

* Explain more about SetVal

* Add UnstableResp to wordlist
2024-11-13 13:20:59 +02:00
ofekshenawa 8b1073d2d6
Support Probabilistic commands with RESP 2 protocol (#3176)
* Support bloom resp 2

* Support Resp2 for BF.Info

* simplify BFInfoCmd field assignment using map-based key-to-field references
2024-11-13 11:15:19 +02:00
ofekshenawa d1b4eaed41
Support TimeSeries commands with RESP 2 protocol (#3184)
* Support Timeseries resp 2

* Change to resp 2

* Support Resp2 for TimeSeries commands
2024-11-13 10:27:00 +02:00
9 changed files with 2160 additions and 1808 deletions

View File

@ -54,6 +54,7 @@ stunnel
SynDump
TCP
TLS
UnstableResp
uri
URI
url
@ -62,3 +63,5 @@ RedisStack
RedisGears
RedisTimeseries
RediSearch
RawResult
RawVal

View File

@ -186,6 +186,21 @@ rdb := redis.NewClient(&redis.Options{
#### Unstable RESP3 Structures for RediSearch Commands
When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when using the search and query capabilities, but we plan to stabilize the RESP3-based API-s in the coming versions. You can find more guidance in the upcoming release notes.
To enable unstable RESP3, set the option in your client configuration:
```go
redis.NewClient(&redis.Options{
UnstableResp3: true,
})
```
**Note:** When UnstableResp3 mode is enabled, it's necessary to use RawResult() and RawVal() to retrieve a raw data.
Since, raw response is the only option for unstable search commands Val() and Result() calls wouldn't have any affect on them:
```go
res1, err := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawResult()
val1 := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawVal()
```
## Contributing
Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library!

View File

@ -1403,27 +1403,63 @@ func (cmd *MapStringSliceInterfaceCmd) Val() map[string][]interface{} {
}
func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
n, err := rd.ReadMapLen()
readType, err := rd.PeekReplyType()
if err != nil {
return err
}
cmd.val = make(map[string][]interface{}, n)
for i := 0; i < n; i++ {
k, err := rd.ReadString()
cmd.val = make(map[string][]interface{})
if readType == proto.RespMap {
n, err := rd.ReadMapLen()
if err != nil {
return err
}
nn, err := rd.ReadArrayLen()
if err != nil {
return err
}
cmd.val[k] = make([]interface{}, nn)
for j := 0; j < nn; j++ {
value, err := rd.ReadReply()
for i := 0; i < n; i++ {
k, err := rd.ReadString()
if err != nil {
return err
}
cmd.val[k][j] = value
nn, err := rd.ReadArrayLen()
if err != nil {
return err
}
cmd.val[k] = make([]interface{}, nn)
for j := 0; j < nn; j++ {
value, err := rd.ReadReply()
if err != nil {
return err
}
cmd.val[k][j] = value
}
}
} else if readType == proto.RespArray {
// RESP2 response
n, err := rd.ReadArrayLen()
if err != nil {
return err
}
for i := 0; i < n; i++ {
// Each entry in this array is itself an array with key details
itemLen, err := rd.ReadArrayLen()
if err != nil {
return err
}
key, err := rd.ReadString()
if err != nil {
return err
}
cmd.val[key] = make([]interface{}, 0, itemLen-1)
for j := 1; j < itemLen; j++ {
// Read the inner array for timestamp-value pairs
data, err := rd.ReadReply()
if err != nil {
return err
}
cmd.val[key] = append(cmd.val[key], data)
}
}
}

View File

@ -21,6 +21,10 @@ import (
"github.com/redis/go-redis/v9/internal/rand"
)
const (
minLatencyMeasurementInterval = 10 * time.Second
)
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
// ClusterOptions are used to configure a cluster client and should be
@ -316,6 +320,10 @@ type clusterNode struct {
latency uint32 // atomic
generation uint32 // atomic
failing uint32 // atomic
// last time the latency measurement was performed for the node, stored in nanoseconds
// from epoch
lastLatencyMeasurement int64 // atomic
}
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
latency = float64(dur) / float64(successes)
}
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
n.SetLastLatencyMeasurement(time.Now())
}
func (n *clusterNode) Latency() time.Duration {
@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
return atomic.LoadUint32(&n.generation)
}
func (n *clusterNode) LastLatencyMeasurement() int64 {
return atomic.LoadInt64(&n.lastLatencyMeasurement)
}
func (n *clusterNode) SetGeneration(gen uint32) {
for {
v := atomic.LoadUint32(&n.generation)
@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
}
}
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
for {
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
break
}
}
}
//------------------------------------------------------------------------------
type clusterNodes struct {
@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
c.mu.Lock()
c.activeAddrs = c.activeAddrs[:0]
now := time.Now()
for addr, node := range c.nodes {
if node.Generation() >= generation {
c.activeAddrs = append(c.activeAddrs, addr)
if c.opt.RouteByLatency {
if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
go node.updateLatency()
}
continue

View File

@ -319,37 +319,69 @@ func (cmd *BFInfoCmd) Result() (BFInfo, error) {
}
func (cmd *BFInfoCmd) readReply(rd *proto.Reader) (err error) {
n, err := rd.ReadMapLen()
result := BFInfo{}
// Create a mapping from key names to pointers of struct fields
respMapping := map[string]*int64{
"Capacity": &result.Capacity,
"CAPACITY": &result.Capacity,
"Size": &result.Size,
"SIZE": &result.Size,
"Number of filters": &result.Filters,
"FILTERS": &result.Filters,
"Number of items inserted": &result.ItemsInserted,
"ITEMS": &result.ItemsInserted,
"Expansion rate": &result.ExpansionRate,
"EXPANSION": &result.ExpansionRate,
}
// Helper function to read and assign a value based on the key
readAndAssignValue := func(key string) error {
fieldPtr, exists := respMapping[key]
if !exists {
return fmt.Errorf("redis: BLOOM.INFO unexpected key %s", key)
}
// Read the integer and assign to the field via pointer dereferencing
val, err := rd.ReadInt()
if err != nil {
return err
}
*fieldPtr = val
return nil
}
readType, err := rd.PeekReplyType()
if err != nil {
return err
}
var key string
var result BFInfo
for f := 0; f < n; f++ {
key, err = rd.ReadString()
if len(cmd.args) > 2 && readType == proto.RespArray {
n, err := rd.ReadArrayLen()
if err != nil {
return err
}
switch key {
case "Capacity":
result.Capacity, err = rd.ReadInt()
case "Size":
result.Size, err = rd.ReadInt()
case "Number of filters":
result.Filters, err = rd.ReadInt()
case "Number of items inserted":
result.ItemsInserted, err = rd.ReadInt()
case "Expansion rate":
result.ExpansionRate, err = rd.ReadInt()
default:
return fmt.Errorf("redis: BLOOM.INFO unexpected key %s", key)
if key, ok := cmd.args[2].(string); ok && n == 1 {
if err := readAndAssignValue(key); err != nil {
return err
}
} else {
return fmt.Errorf("redis: BLOOM.INFO invalid argument key type")
}
} else {
n, err := rd.ReadMapLen()
if err != nil {
return err
}
for i := 0; i < n; i++ {
key, err := rd.ReadString()
if err != nil {
return err
}
if err := readAndAssignValue(key); err != nil {
return err
}
}
}
cmd.val = result

File diff suppressed because it is too large Load Diff

View File

@ -176,8 +176,6 @@ func (hs *hooksMixin) withProcessPipelineHook(
}
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
hs.hooksMu.Lock()
defer hs.hooksMu.Unlock()
return hs.current.dial(ctx, network, addr)
}

View File

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net"
"sync"
"testing"
"time"
@ -633,3 +634,67 @@ var _ = Describe("Hook with MinIdleConns", func() {
}))
})
})
var _ = Describe("Dialer connection timeouts", func() {
var client *redis.Client
const dialSimulatedDelay = 1 * time.Second
BeforeEach(func() {
options := redisOptions()
options.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
// Simulated slow dialer.
// Note that the following sleep is deliberately not context-aware.
time.Sleep(dialSimulatedDelay)
return net.Dial("tcp", options.Addr)
}
options.MinIdleConns = 1
client = redis.NewClient(options)
})
AfterEach(func() {
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
It("does not contend on connection dial for concurrent commands", func() {
var wg sync.WaitGroup
const concurrency = 10
durations := make(chan time.Duration, concurrency)
errs := make(chan error, concurrency)
start := time.Now()
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
start := time.Now()
err := client.Ping(ctx).Err()
durations <- time.Since(start)
errs <- err
}()
}
wg.Wait()
close(durations)
close(errs)
// All commands should eventually succeed, after acquiring a connection.
for err := range errs {
Expect(err).NotTo(HaveOccurred())
}
// Each individual command should complete within the simulated dial duration bound.
for duration := range durations {
Expect(duration).To(BeNumerically("<", 2*dialSimulatedDelay))
}
// Due to concurrent execution, the entire test suite should also complete within
// the same dial duration bound applied for individual commands.
Expect(time.Since(start)).To(BeNumerically("<", 2*dialSimulatedDelay))
})
})

File diff suppressed because it is too large Load Diff