Compare commits

...

17 Commits

Author SHA1 Message Date
Vladyslav Vildanov 0942cec0ab
Merge 9c0b623d20 into e63669e170 2024-11-22 10:14:37 +09:00
dependabot[bot] e63669e170
chore(deps): bump rojopolis/spellcheck-github-actions (#3188)
Bumps [rojopolis/spellcheck-github-actions](https://github.com/rojopolis/spellcheck-github-actions) from 0.40.0 to 0.45.0.
- [Release notes](https://github.com/rojopolis/spellcheck-github-actions/releases)
- [Changelog](https://github.com/rojopolis/spellcheck-github-actions/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rojopolis/spellcheck-github-actions/compare/0.40.0...0.45.0)

---
updated-dependencies:
- dependency-name: rojopolis/spellcheck-github-actions
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-21 14:38:38 +02:00
LINKIWI fc32d0a01d
Recognize byte slice for key argument in cluster client hash slot computation (#3049)
Co-authored-by: Vladyslav Vildanov <117659936+vladvildanov@users.noreply.github.com>
Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-21 14:38:11 +02:00
Justin f1ffb55c9a
Only check latencies once every 10 seconds with `routeByLatency` (#2795)
* Only check latencies once every 10 seconds with `routeByLatency`

`routeByLatency` currently checks latencies any time a server returns
a MOVED or READONLY reply. When a shard is down, the ClusterClient
chooses to issue the request to a random server, which returns a MOVED
reply. This causes a state refresh and a latency update on all servers.
This can lead to significant ping load to clusters with a large number
of clients.

This introduces logic to ping only once every 10 seconds, only
performing a latency update on a node during the `GC` function if the
latency was set later than 10 seconds ago.

Fixes https://github.com/redis/go-redis/issues/2782

* use UnixNano instead of Unix for better precision

---------

Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-20 14:36:39 +02:00
LINKIWI 080e051124
Eliminate redundant dial mutex causing unbounded connection queue contention (#3088)
* Eliminate redundant dial mutex causing unbounded connection queue contention

* Dialer connection timeouts unit test

---------

Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-11-20 13:38:06 +02:00
ofekshenawa 930d904205
Add guidance on unstable RESP3 support for RediSearch commands to README (#3177)
* Add UnstableResp3 to docs

* Add RawVal and RawResult to wordlist

* Explain more about SetVal

* Add UnstableResp to wordlist
2024-11-13 13:20:59 +02:00
ofekshenawa 8b1073d2d6
Support Probabilistic commands with RESP 2 protocol (#3176)
* Support bloom resp 2

* Support Resp2 for BF.Info

* simplify BFInfoCmd field assignment using map-based key-to-field references
2024-11-13 11:15:19 +02:00
ofekshenawa d1b4eaed41
Support TimeSeries commands with RESP 2 protocol (#3184)
* Support Timeseries resp 2

* Change to resp 2

* Support Resp2 for TimeSeries commands
2024-11-13 10:27:00 +02:00
andy-stark-redis 80c9f5bb77
DOC-4345 added JSON samples for home page (#3183) 2024-11-06 17:25:46 +02:00
vladvildanov 9c0b623d20 Proper retract 2024-10-17 13:35:34 +03:00
vladvildanov 76d12b4cc4 Removed branch setup 2024-10-17 13:33:18 +03:00
vladvildanov 1e2c9ef900 Merge branch 'master' of github.com:redis/go-redis into vv-9.7-master-sync 2024-10-17 13:15:41 +03:00
Vladyslav Vildanov ed37c33a90
Updated package version [9.7] (#3159) 2024-10-16 10:01:32 +03:00
Vladyslav Vildanov 135f8e3b12
Fix field name spellings (#3132) (#3156)
Co-authored-by: andy-stark-redis <164213578+andy-stark-redis@users.noreply.github.com>
2024-10-14 17:13:57 +03:00
Vladyslav Vildanov ac2e91d9d9
Support Json with Resp 2 (#3146) (#3155)
* Support ReJSON resp 2 && Test ReJSON against RESP 2 and 3 && Add complex search and json test

* Remove comments

* Remove unnecessary changes

Co-authored-by: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com>
2024-10-14 14:59:45 +03:00
Vladyslav Vildanov ec680aec14
Remove direct read from TLS underlying conn (#3138) (#3154)
Co-authored-by: Francesco Renzi <rentziass@github.com>
2024-10-14 14:55:45 +03:00
Vladyslav Vildanov ad131f49b0
Updated package version (#3134)
* Updated package version

* Changed version format according to specification

* Updated submodule versions
2024-09-26 13:14:30 +03:00
23 changed files with 2403 additions and 1824 deletions

View File

@ -54,6 +54,7 @@ stunnel
SynDump SynDump
TCP TCP
TLS TLS
UnstableResp
uri uri
URI URI
url url
@ -62,3 +63,5 @@ RedisStack
RedisGears RedisGears
RedisTimeseries RedisTimeseries
RediSearch RediSearch
RawResult
RawVal

View File

@ -8,7 +8,7 @@ jobs:
- name: Checkout - name: Checkout
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: Check Spelling - name: Check Spelling
uses: rojopolis/spellcheck-github-actions@0.40.0 uses: rojopolis/spellcheck-github-actions@0.45.0
with: with:
config_path: .github/spellcheck-settings.yml config_path: .github/spellcheck-settings.yml
task_name: Markdown task_name: Markdown

View File

@ -186,6 +186,21 @@ rdb := redis.NewClient(&redis.Options{
#### Unstable RESP3 Structures for RediSearch Commands #### Unstable RESP3 Structures for RediSearch Commands
When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when using the search and query capabilities, but we plan to stabilize the RESP3-based API-s in the coming versions. You can find more guidance in the upcoming release notes. When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when using the search and query capabilities, but we plan to stabilize the RESP3-based API-s in the coming versions. You can find more guidance in the upcoming release notes.
To enable unstable RESP3, set the option in your client configuration:
```go
redis.NewClient(&redis.Options{
UnstableResp3: true,
})
```
**Note:** When UnstableResp3 mode is enabled, it's necessary to use RawResult() and RawVal() to retrieve a raw data.
Since, raw response is the only option for unstable search commands Val() and Result() calls wouldn't have any affect on them:
```go
res1, err := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawResult()
val1 := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawVal()
```
## Contributing ## Contributing
Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library! Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library!

View File

@ -167,6 +167,8 @@ func (cmd *baseCmd) stringArg(pos int) string {
switch v := arg.(type) { switch v := arg.(type) {
case string: case string:
return v return v
case []byte:
return string(v)
default: default:
// TODO: consider using appendArg // TODO: consider using appendArg
return fmt.Sprint(v) return fmt.Sprint(v)
@ -1403,11 +1405,18 @@ func (cmd *MapStringSliceInterfaceCmd) Val() map[string][]interface{} {
} }
func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) { func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
readType, err := rd.PeekReplyType()
if err != nil {
return err
}
cmd.val = make(map[string][]interface{})
if readType == proto.RespMap {
n, err := rd.ReadMapLen() n, err := rd.ReadMapLen()
if err != nil { if err != nil {
return err return err
} }
cmd.val = make(map[string][]interface{}, n)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
k, err := rd.ReadString() k, err := rd.ReadString()
if err != nil { if err != nil {
@ -1426,6 +1435,35 @@ func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
cmd.val[k][j] = value cmd.val[k][j] = value
} }
} }
} else if readType == proto.RespArray {
// RESP2 response
n, err := rd.ReadArrayLen()
if err != nil {
return err
}
for i := 0; i < n; i++ {
// Each entry in this array is itself an array with key details
itemLen, err := rd.ReadArrayLen()
if err != nil {
return err
}
key, err := rd.ReadString()
if err != nil {
return err
}
cmd.val[key] = make([]interface{}, 0, itemLen-1)
for j := 1; j < itemLen; j++ {
// Read the inner array for timestamp-value pairs
data, err := rd.ReadReply()
if err != nil {
return err
}
cmd.val[key] = append(cmd.val[key], data)
}
}
}
return nil return nil
} }

View File

@ -0,0 +1,199 @@
// EXAMPLE: go_home_json
// HIDE_START
package example_commands_test
// HIDE_END
// STEP_START import
import (
"context"
"fmt"
"sort"
"github.com/redis/go-redis/v9"
)
// STEP_END
func ExampleClient_search_json() {
// STEP_START connect
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password docs
DB: 0, // use default DB
Protocol: 2,
})
// STEP_END
// REMOVE_START
rdb.Del(ctx, "user:1", "user:2", "user:3")
rdb.FTDropIndex(ctx, "idx:users")
// REMOVE_END
// STEP_START create_data
user1 := map[string]interface{}{
"name": "Paul John",
"email": "paul.john@example.com",
"age": 42,
"city": "London",
}
user2 := map[string]interface{}{
"name": "Eden Zamir",
"email": "eden.zamir@example.com",
"age": 29,
"city": "Tel Aviv",
}
user3 := map[string]interface{}{
"name": "Paul Zamir",
"email": "paul.zamir@example.com",
"age": 35,
"city": "Tel Aviv",
}
// STEP_END
// STEP_START make_index
_, err := rdb.FTCreate(
ctx,
"idx:users",
// Options:
&redis.FTCreateOptions{
OnJSON: true,
Prefix: []interface{}{"user:"},
},
// Index schema fields:
&redis.FieldSchema{
FieldName: "$.name",
As: "name",
FieldType: redis.SearchFieldTypeText,
},
&redis.FieldSchema{
FieldName: "$.city",
As: "city",
FieldType: redis.SearchFieldTypeTag,
},
&redis.FieldSchema{
FieldName: "$.age",
As: "age",
FieldType: redis.SearchFieldTypeNumeric,
},
).Result()
if err != nil {
panic(err)
}
// STEP_END
// STEP_START add_data
_, err = rdb.JSONSet(ctx, "user:1", "$", user1).Result()
if err != nil {
panic(err)
}
_, err = rdb.JSONSet(ctx, "user:2", "$", user2).Result()
if err != nil {
panic(err)
}
_, err = rdb.JSONSet(ctx, "user:3", "$", user3).Result()
if err != nil {
panic(err)
}
// STEP_END
// STEP_START query1
findPaulResult, err := rdb.FTSearch(
ctx,
"idx:users",
"Paul @age:[30 40]",
).Result()
if err != nil {
panic(err)
}
fmt.Println(findPaulResult)
// >>> {1 [{user:3 <nil> <nil> <nil> map[$:{"age":35,"city":"Tel Aviv"...
// STEP_END
// STEP_START query2
citiesResult, err := rdb.FTSearchWithArgs(
ctx,
"idx:users",
"Paul",
&redis.FTSearchOptions{
Return: []redis.FTSearchReturn{
{
FieldName: "$.city",
As: "city",
},
},
},
).Result()
if err != nil {
panic(err)
}
sort.Slice(citiesResult.Docs, func(i, j int) bool {
return citiesResult.Docs[i].Fields["city"] < citiesResult.Docs[j].Fields["city"]
})
for _, result := range citiesResult.Docs {
fmt.Println(result.Fields["city"])
}
// >>> London
// >>> Tel Aviv
// STEP_END
// STEP_START query3
aggOptions := redis.FTAggregateOptions{
GroupBy: []redis.FTAggregateGroupBy{
{
Fields: []interface{}{"@city"},
Reduce: []redis.FTAggregateReducer{
{
Reducer: redis.SearchCount,
As: "count",
},
},
},
},
}
aggResult, err := rdb.FTAggregateWithArgs(
ctx,
"idx:users",
"*",
&aggOptions,
).Result()
if err != nil {
panic(err)
}
sort.Slice(aggResult.Rows, func(i, j int) bool {
return aggResult.Rows[i].Fields["city"].(string) <
aggResult.Rows[j].Fields["city"].(string)
})
for _, row := range aggResult.Rows {
fmt.Printf("%v - %v\n",
row.Fields["city"], row.Fields["count"],
)
}
// >>> City: London - 1
// >>> City: Tel Aviv - 2
// STEP_END
// Output:
// {1 [{user:3 <nil> <nil> <nil> map[$:{"age":35,"city":"Tel Aviv","email":"paul.zamir@example.com","name":"Paul Zamir"}]}]}
// London
// Tel Aviv
// London - 1
// Tel Aviv - 2
}

View File

@ -5,7 +5,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../.. replace github.com/redis/go-redis/v9 => ../..
require ( require (
github.com/redis/go-redis/v9 v9.6.2 github.com/redis/go-redis/v9 v9.7.0
go.uber.org/zap v1.24.0 go.uber.org/zap v1.24.0
) )

View File

@ -4,7 +4,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../.. replace github.com/redis/go-redis/v9 => ../..
require github.com/redis/go-redis/v9 v9.6.2 require github.com/redis/go-redis/v9 v9.7.0
require ( require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect

View File

@ -4,7 +4,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../.. replace github.com/redis/go-redis/v9 => ../..
require github.com/redis/go-redis/v9 v9.6.2 require github.com/redis/go-redis/v9 v9.7.0
require ( require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect

View File

@ -9,8 +9,8 @@ replace github.com/redis/go-redis/extra/redisotel/v9 => ../../extra/redisotel
replace github.com/redis/go-redis/extra/rediscmd/v9 => ../../extra/rediscmd replace github.com/redis/go-redis/extra/rediscmd/v9 => ../../extra/rediscmd
require ( require (
github.com/redis/go-redis/extra/redisotel/v9 v9.6.2 github.com/redis/go-redis/extra/redisotel/v9 v9.7.0
github.com/redis/go-redis/v9 v9.6.2 github.com/redis/go-redis/v9 v9.7.0
github.com/uptrace/uptrace-go v1.21.0 github.com/uptrace/uptrace-go v1.21.0
go.opentelemetry.io/otel v1.22.0 go.opentelemetry.io/otel v1.22.0
) )
@ -23,7 +23,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect github.com/golang/protobuf v1.5.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/redis/go-redis/extra/rediscmd/v9 v9.6.2 // indirect github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 // indirect
go.opentelemetry.io/contrib/instrumentation/runtime v0.46.1 // indirect go.opentelemetry.io/contrib/instrumentation/runtime v0.46.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect

View File

@ -4,7 +4,7 @@ go 1.18
replace github.com/redis/go-redis/v9 => ../.. replace github.com/redis/go-redis/v9 => ../..
require github.com/redis/go-redis/v9 v9.6.2 require github.com/redis/go-redis/v9 v9.7.0
require ( require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect

View File

@ -6,7 +6,7 @@ replace github.com/redis/go-redis/v9 => ../..
require ( require (
github.com/davecgh/go-spew v1.1.1 github.com/davecgh/go-spew v1.1.1
github.com/redis/go-redis/v9 v9.6.2 github.com/redis/go-redis/v9 v9.7.0
) )
require ( require (

View File

@ -7,8 +7,8 @@ replace github.com/redis/go-redis/v9 => ../..
replace github.com/redis/go-redis/extra/rediscmd/v9 => ../rediscmd replace github.com/redis/go-redis/extra/rediscmd/v9 => ../rediscmd
require ( require (
github.com/redis/go-redis/extra/rediscmd/v9 v9.6.2 github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0
github.com/redis/go-redis/v9 v9.6.2 github.com/redis/go-redis/v9 v9.7.0
go.opencensus.io v0.24.0 go.opencensus.io v0.24.0
) )

View File

@ -7,7 +7,7 @@ replace github.com/redis/go-redis/v9 => ../..
require ( require (
github.com/bsm/ginkgo/v2 v2.12.0 github.com/bsm/ginkgo/v2 v2.12.0
github.com/bsm/gomega v1.27.10 github.com/bsm/gomega v1.27.10
github.com/redis/go-redis/v9 v9.6.2 github.com/redis/go-redis/v9 v9.7.0
) )
require ( require (

View File

@ -7,8 +7,8 @@ replace github.com/redis/go-redis/v9 => ../..
replace github.com/redis/go-redis/extra/rediscmd/v9 => ../rediscmd replace github.com/redis/go-redis/extra/rediscmd/v9 => ../rediscmd
require ( require (
github.com/redis/go-redis/extra/rediscmd/v9 v9.6.2 github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0
github.com/redis/go-redis/v9 v9.6.2 github.com/redis/go-redis/v9 v9.7.0
go.opentelemetry.io/otel v1.22.0 go.opentelemetry.io/otel v1.22.0
go.opentelemetry.io/otel/metric v1.22.0 go.opentelemetry.io/otel/metric v1.22.0
go.opentelemetry.io/otel/sdk v1.22.0 go.opentelemetry.io/otel/sdk v1.22.0

View File

@ -6,7 +6,7 @@ replace github.com/redis/go-redis/v9 => ../..
require ( require (
github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_golang v1.14.0
github.com/redis/go-redis/v9 v9.6.2 github.com/redis/go-redis/v9 v9.7.0
) )
require ( require (

View File

@ -21,6 +21,10 @@ import (
"github.com/redis/go-redis/v9/internal/rand" "github.com/redis/go-redis/v9/internal/rand"
) )
const (
minLatencyMeasurementInterval = 10 * time.Second
)
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
// ClusterOptions are used to configure a cluster client and should be // ClusterOptions are used to configure a cluster client and should be
@ -316,6 +320,10 @@ type clusterNode struct {
latency uint32 // atomic latency uint32 // atomic
generation uint32 // atomic generation uint32 // atomic
failing uint32 // atomic failing uint32 // atomic
// last time the latency measurement was performed for the node, stored in nanoseconds
// from epoch
lastLatencyMeasurement int64 // atomic
} }
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
latency = float64(dur) / float64(successes) latency = float64(dur) / float64(successes)
} }
atomic.StoreUint32(&n.latency, uint32(latency+0.5)) atomic.StoreUint32(&n.latency, uint32(latency+0.5))
n.SetLastLatencyMeasurement(time.Now())
} }
func (n *clusterNode) Latency() time.Duration { func (n *clusterNode) Latency() time.Duration {
@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
return atomic.LoadUint32(&n.generation) return atomic.LoadUint32(&n.generation)
} }
func (n *clusterNode) LastLatencyMeasurement() int64 {
return atomic.LoadInt64(&n.lastLatencyMeasurement)
}
func (n *clusterNode) SetGeneration(gen uint32) { func (n *clusterNode) SetGeneration(gen uint32) {
for { for {
v := atomic.LoadUint32(&n.generation) v := atomic.LoadUint32(&n.generation)
@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
} }
} }
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
for {
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
break
}
}
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type clusterNodes struct { type clusterNodes struct {
@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
c.mu.Lock() c.mu.Lock()
c.activeAddrs = c.activeAddrs[:0] c.activeAddrs = c.activeAddrs[:0]
now := time.Now()
for addr, node := range c.nodes { for addr, node := range c.nodes {
if node.Generation() >= generation { if node.Generation() >= generation {
c.activeAddrs = append(c.activeAddrs, addr) c.activeAddrs = append(c.activeAddrs, addr)
if c.opt.RouteByLatency { if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
go node.updateLatency() go node.updateLatency()
} }
continue continue

View File

@ -653,6 +653,32 @@ var _ = Describe("ClusterClient", func() {
Expect(client.Close()).NotTo(HaveOccurred()) Expect(client.Close()).NotTo(HaveOccurred())
}) })
It("determines hash slots correctly for generic commands", func() {
opt := redisClusterOptions()
opt.MaxRedirects = -1
client := cluster.newClusterClient(ctx, opt)
err := client.Do(ctx, "GET", "A").Err()
Expect(err).To(Equal(redis.Nil))
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
Expect(err).To(Equal(redis.Nil))
Eventually(func() error {
return client.SwapNodes(ctx, "A")
}, 30*time.Second).ShouldNot(HaveOccurred())
err = client.Do(ctx, "GET", "A").Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("MOVED"))
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("MOVED"))
Expect(client.Close()).NotTo(HaveOccurred())
})
It("follows node redirection immediately", func() { It("follows node redirection immediately", func() {
// Configure retry backoffs far in excess of the expected duration of redirection // Configure retry backoffs far in excess of the expected duration of redirection
opt := redisClusterOptions() opt := redisClusterOptions()

View File

@ -319,38 +319,70 @@ func (cmd *BFInfoCmd) Result() (BFInfo, error) {
} }
func (cmd *BFInfoCmd) readReply(rd *proto.Reader) (err error) { func (cmd *BFInfoCmd) readReply(rd *proto.Reader) (err error) {
result := BFInfo{}
// Create a mapping from key names to pointers of struct fields
respMapping := map[string]*int64{
"Capacity": &result.Capacity,
"CAPACITY": &result.Capacity,
"Size": &result.Size,
"SIZE": &result.Size,
"Number of filters": &result.Filters,
"FILTERS": &result.Filters,
"Number of items inserted": &result.ItemsInserted,
"ITEMS": &result.ItemsInserted,
"Expansion rate": &result.ExpansionRate,
"EXPANSION": &result.ExpansionRate,
}
// Helper function to read and assign a value based on the key
readAndAssignValue := func(key string) error {
fieldPtr, exists := respMapping[key]
if !exists {
return fmt.Errorf("redis: BLOOM.INFO unexpected key %s", key)
}
// Read the integer and assign to the field via pointer dereferencing
val, err := rd.ReadInt()
if err != nil {
return err
}
*fieldPtr = val
return nil
}
readType, err := rd.PeekReplyType()
if err != nil {
return err
}
if len(cmd.args) > 2 && readType == proto.RespArray {
n, err := rd.ReadArrayLen()
if err != nil {
return err
}
if key, ok := cmd.args[2].(string); ok && n == 1 {
if err := readAndAssignValue(key); err != nil {
return err
}
} else {
return fmt.Errorf("redis: BLOOM.INFO invalid argument key type")
}
} else {
n, err := rd.ReadMapLen() n, err := rd.ReadMapLen()
if err != nil { if err != nil {
return err return err
} }
for i := 0; i < n; i++ {
var key string key, err := rd.ReadString()
var result BFInfo
for f := 0; f < n; f++ {
key, err = rd.ReadString()
if err != nil { if err != nil {
return err return err
} }
if err := readAndAssignValue(key); err != nil {
switch key {
case "Capacity":
result.Capacity, err = rd.ReadInt()
case "Size":
result.Size, err = rd.ReadInt()
case "Number of filters":
result.Filters, err = rd.ReadInt()
case "Number of items inserted":
result.ItemsInserted, err = rd.ReadInt()
case "Expansion rate":
result.ExpansionRate, err = rd.ReadInt()
default:
return fmt.Errorf("redis: BLOOM.INFO unexpected key %s", key)
}
if err != nil {
return err return err
} }
} }
}
cmd.val = result cmd.val = result
return nil return nil

View File

@ -13,15 +13,32 @@ import (
var _ = Describe("Probabilistic commands", Label("probabilistic"), func() { var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
ctx := context.TODO() ctx := context.TODO()
setupRedisClient := func(protocolVersion int) *redis.Client {
return redis.NewClient(&redis.Options{
Addr: "localhost:6379",
DB: 0,
Protocol: protocolVersion,
})
}
protocols := []int{2, 3}
for _, protocol := range protocols {
protocol := protocol // capture loop variable for each context
Context(fmt.Sprintf("with protocol version %d", protocol), func() {
var client *redis.Client var client *redis.Client
BeforeEach(func() { BeforeEach(func() {
client = redis.NewClient(&redis.Options{Addr: ":6379"}) client = setupRedisClient(protocol)
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred())
}) })
AfterEach(func() { AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred()) if client != nil {
client.FlushDB(ctx)
client.Close()
}
}) })
Describe("bloom", Label("bloom"), func() { Describe("bloom", Label("bloom"), func() {
@ -117,7 +134,7 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
NoCreate: true, NoCreate: true,
} }
resultInsert, err := client.BFInsert(ctx, "testbf1", options, "item1").Result() _, err := client.BFInsert(ctx, "testbf1", options, "item1").Result()
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError("ERR not found")) Expect(err).To(MatchError("ERR not found"))
@ -129,7 +146,7 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
NoCreate: false, NoCreate: false,
} }
resultInsert, err = client.BFInsert(ctx, "testbf1", options, "item1").Result() resultInsert, err := client.BFInsert(ctx, "testbf1", options, "item1").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(resultInsert)).To(BeEquivalentTo(1)) Expect(len(resultInsert)).To(BeEquivalentTo(1))
@ -396,7 +413,7 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
NoCreate: true, NoCreate: true,
} }
result, err := client.CFInsertNX(ctx, "testcf1", args, "item1", "item2", "item2").Result() _, err := client.CFInsertNX(ctx, "testcf1", args, "item1", "item2", "item2").Result()
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
args = &redis.CFInsertOptions{ args = &redis.CFInsertOptions{
@ -404,7 +421,7 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
NoCreate: false, NoCreate: false,
} }
result, err = client.CFInsertNX(ctx, "testcf2", args, "item1", "item2", "item2").Result() result, err := client.CFInsertNX(ctx, "testcf2", args, "item1", "item2", "item2").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(result)).To(BeEquivalentTo(3)) Expect(len(result)).To(BeEquivalentTo(3))
Expect(result[0]).To(BeEquivalentTo(int64(1))) Expect(result[0]).To(BeEquivalentTo(int64(1)))
@ -731,3 +748,5 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
}) })
}) })
}) })
}
})

View File

@ -176,8 +176,6 @@ func (hs *hooksMixin) withProcessPipelineHook(
} }
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) { func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
hs.hooksMu.Lock()
defer hs.hooksMu.Unlock()
return hs.current.dial(ctx, network, addr) return hs.current.dial(ctx, network, addr)
} }

View File

@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"sync"
"testing" "testing"
"time" "time"
@ -633,3 +634,67 @@ var _ = Describe("Hook with MinIdleConns", func() {
})) }))
}) })
}) })
var _ = Describe("Dialer connection timeouts", func() {
var client *redis.Client
const dialSimulatedDelay = 1 * time.Second
BeforeEach(func() {
options := redisOptions()
options.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
// Simulated slow dialer.
// Note that the following sleep is deliberately not context-aware.
time.Sleep(dialSimulatedDelay)
return net.Dial("tcp", options.Addr)
}
options.MinIdleConns = 1
client = redis.NewClient(options)
})
AfterEach(func() {
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
It("does not contend on connection dial for concurrent commands", func() {
var wg sync.WaitGroup
const concurrency = 10
durations := make(chan time.Duration, concurrency)
errs := make(chan error, concurrency)
start := time.Now()
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
start := time.Now()
err := client.Ping(ctx).Err()
durations <- time.Since(start)
errs <- err
}()
}
wg.Wait()
close(durations)
close(errs)
// All commands should eventually succeed, after acquiring a connection.
for err := range errs {
Expect(err).NotTo(HaveOccurred())
}
// Each individual command should complete within the simulated dial duration bound.
for duration := range durations {
Expect(duration).To(BeNumerically("<", 2*dialSimulatedDelay))
}
// Due to concurrent execution, the entire test suite should also complete within
// the same dial duration bound applied for individual commands.
Expect(time.Since(start)).To(BeNumerically("<", 2*dialSimulatedDelay))
})
})

View File

@ -2,6 +2,7 @@ package redis_test
import ( import (
"context" "context"
"fmt"
"strings" "strings"
. "github.com/bsm/ginkgo/v2" . "github.com/bsm/ginkgo/v2"
@ -12,15 +13,33 @@ import (
var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() { var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
ctx := context.TODO() ctx := context.TODO()
setupRedisClient := func(protocolVersion int) *redis.Client {
return redis.NewClient(&redis.Options{
Addr: "localhost:6379",
DB: 0,
Protocol: protocolVersion,
UnstableResp3: true,
})
}
protocols := []int{2, 3}
for _, protocol := range protocols {
protocol := protocol // capture loop variable for each context
Context(fmt.Sprintf("with protocol version %d", protocol), func() {
var client *redis.Client var client *redis.Client
BeforeEach(func() { BeforeEach(func() {
client = redis.NewClient(&redis.Options{Addr: rediStackAddr}) client = setupRedisClient(protocol)
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred())
}) })
AfterEach(func() { AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred()) if client != nil {
client.FlushDB(ctx)
client.Close()
}
}) })
It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs", "NonRedisEnterprise"), func() { It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs", "NonRedisEnterprise"), func() {
@ -42,7 +61,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
Expect(result).To(BeEquivalentTo("OK")) Expect(result).To(BeEquivalentTo("OK"))
resultInfo, err := client.TSInfo(ctx, "4").Result() resultInfo, err := client.TSInfo(ctx, "4").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(resultInfo["labels"].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"Time", "Series"}))
} else {
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series")) Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
}
// Test chunk size // Test chunk size
opt = &redis.TSOptions{ChunkSize: 128} opt = &redis.TSOptions{ChunkSize: 128}
result, err = client.TSCreateWithArgs(ctx, "ts-cs-1", opt).Result() result, err = client.TSCreateWithArgs(ctx, "ts-cs-1", opt).Result()
@ -134,7 +157,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
Expect(result).To(BeEquivalentTo(4)) Expect(result).To(BeEquivalentTo(4))
resultInfo, err := client.TSInfo(ctx, "4").Result() resultInfo, err := client.TSInfo(ctx, "4").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(resultInfo["labels"].([]interface{})).To(ContainElement([]interface{}{"Time", "Series"}))
} else {
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series")) Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
}
// Test chunk size // Test chunk size
opt = &redis.TSOptions{ChunkSize: 128} opt = &redis.TSOptions{ChunkSize: 128}
result, err = client.TSAddWithArgs(ctx, "ts-cs-1", 1, 10, opt).Result() result, err = client.TSAddWithArgs(ctx, "ts-cs-1", 1, 10, opt).Result()
@ -223,7 +250,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
resultInfo, err = client.TSInfo(ctx, "1").Result() resultInfo, err = client.TSInfo(ctx, "1").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(resultInfo["labels"]).To(BeEquivalentTo([]interface{}{}))
} else {
Expect(resultInfo["labels"]).To(BeEquivalentTo(map[interface{}]interface{}{})) Expect(resultInfo["labels"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
}
opt = &redis.TSAlterOptions{Labels: map[string]string{"Time": "Series"}} opt = &redis.TSAlterOptions{Labels: map[string]string{"Time": "Series"}}
resultAlter, err = client.TSAlter(ctx, "1", opt).Result() resultAlter, err = client.TSAlter(ctx, "1", opt).Result()
@ -232,9 +263,15 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
resultInfo, err = client.TSInfo(ctx, "1").Result() resultInfo, err = client.TSInfo(ctx, "1").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(resultInfo["labels"].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"Time", "Series"}))
Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(10))
Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo(redis.Nil))
} else {
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series")) Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(10)) Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(10))
Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo(redis.Nil)) Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo(redis.Nil))
}
opt = &redis.TSAlterOptions{DuplicatePolicy: "min"} opt = &redis.TSAlterOptions{DuplicatePolicy: "min"}
resultAlter, err = client.TSAlter(ctx, "1", opt).Result() resultAlter, err = client.TSAlter(ctx, "1", opt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -304,7 +341,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
Expect(resultDeleteRule).To(BeEquivalentTo("OK")) Expect(resultDeleteRule).To(BeEquivalentTo("OK"))
resultInfo, err := client.TSInfo(ctx, "1").Result() resultInfo, err := client.TSInfo(ctx, "1").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(resultInfo["rules"]).To(BeEquivalentTo([]interface{}{}))
} else {
Expect(resultInfo["rules"]).To(BeEquivalentTo(map[interface{}]interface{}{})) Expect(resultInfo["rules"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
}
}) })
It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs", "NonRedisEnterprise"), func() { It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs", "NonRedisEnterprise"), func() {
@ -501,12 +542,21 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
result, err := client.TSMGet(ctx, []string{"Test=This"}).Result() result, err := client.TSMGet(ctx, []string{"Test=This"}).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][1].([]interface{})[1]).To(BeEquivalentTo("15"))
Expect(result["b"][1].([]interface{})[1]).To(BeEquivalentTo("25"))
} else {
Expect(result["a"][1].([]interface{})[1]).To(BeEquivalentTo(15)) Expect(result["a"][1].([]interface{})[1]).To(BeEquivalentTo(15))
Expect(result["b"][1].([]interface{})[1]).To(BeEquivalentTo(25)) Expect(result["b"][1].([]interface{})[1]).To(BeEquivalentTo(25))
}
mgetOpt := &redis.TSMGetOptions{WithLabels: true} mgetOpt := &redis.TSMGetOptions{WithLabels: true}
result, err = client.TSMGetWithArgs(ctx, []string{"Test=This"}, mgetOpt).Result() result, err = client.TSMGetWithArgs(ctx, []string{"Test=This"}, mgetOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["b"][0]).To(ConsistOf([]interface{}{"Test", "This"}, []interface{}{"Taste", "That"}))
} else {
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "Taste": "That"})) Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "Taste": "That"}))
}
resultCreate, err = client.TSCreate(ctx, "c").Result() resultCreate, err = client.TSCreate(ctx, "c").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -528,11 +578,19 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
result, err = client.TSMGet(ctx, []string{"is_compaction=true"}).Result() result, err = client.TSMGet(ctx, []string{"is_compaction=true"}).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(0), "4"}))
} else {
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(0), 4.0})) Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(0), 4.0}))
}
mgetOpt = &redis.TSMGetOptions{Latest: true} mgetOpt = &redis.TSMGetOptions{Latest: true}
result, err = client.TSMGetWithArgs(ctx, []string{"is_compaction=true"}, mgetOpt).Result() result, err = client.TSMGetWithArgs(ctx, []string{"is_compaction=true"}, mgetOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(10), "8"}))
} else {
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(10), 8.0})) Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(10), 8.0}))
}
}) })
It("should TSQueryIndex", Label("timeseries", "tsqueryindex"), func() { It("should TSQueryIndex", Label("timeseries", "tsqueryindex"), func() {
@ -830,12 +888,20 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
result, err := client.TSMRange(ctx, 0, 200, []string{"Test=This"}).Result() result, err := client.TSMRange(ctx, 0, 200, []string{"Test=This"}).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(result)).To(BeEquivalentTo(2)) Expect(len(result)).To(BeEquivalentTo(2))
if client.Options().Protocol == 2 {
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(100))
} else {
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100)) Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100))
}
// Test Count // Test Count
mrangeOpt := &redis.TSMRangeOptions{Count: 10} mrangeOpt := &redis.TSMRangeOptions{Count: 10}
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(10))
} else {
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10)) Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10))
}
// Test Aggregation and BucketDuration // Test Aggregation and BucketDuration
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
_, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result() _, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result()
@ -845,19 +911,36 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
result, err = client.TSMRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(result)).To(BeEquivalentTo(2)) Expect(len(result)).To(BeEquivalentTo(2))
if client.Options().Protocol == 2 {
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(20))
} else {
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20)) Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20))
}
// Test WithLabels // Test WithLabels
if client.Options().Protocol == 2 {
Expect(result["a"][0]).To(BeEquivalentTo([]interface{}{}))
} else {
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{})) Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
}
mrangeOpt = &redis.TSMRangeOptions{WithLabels: true} mrangeOpt = &redis.TSMRangeOptions{WithLabels: true}
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][0]).To(ConsistOf([]interface{}{[]interface{}{"Test", "This"}, []interface{}{"team", "ny"}}))
} else {
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"})) Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"}))
}
// Test SelectedLabels // Test SelectedLabels
mrangeOpt = &redis.TSMRangeOptions{SelectedLabels: []interface{}{"team"}} mrangeOpt = &redis.TSMRangeOptions{SelectedLabels: []interface{}{"team"}}
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "ny"}))
Expect(result["b"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "sf"}))
} else {
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"})) Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"}))
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"})) Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"}))
}
// Test FilterBy // Test FilterBy
fts := make([]int, 0) fts := make([]int, 0)
for i := 10; i < 20; i++ { for i := 10; i < 20; i++ {
@ -866,34 +949,58 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
mrangeOpt = &redis.TSMRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}} mrangeOpt = &redis.TSMRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}}
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][1].([]interface{})).To(BeEquivalentTo([]interface{}{[]interface{}{int64(15), "1"}, []interface{}{int64(16), "2"}}))
} else {
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(15), 1.0}, []interface{}{int64(16), 2.0}})) Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(15), 1.0}, []interface{}{int64(16), 2.0}}))
}
// Test GroupBy // Test GroupBy
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "sum"} mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "sum"}
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "2"}, []interface{}{int64(2), "4"}, []interface{}{int64(3), "6"}}))
} else {
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(3), 6.0}})) Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(3), 6.0}}))
}
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "max"} mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "max"}
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "1"}, []interface{}{int64(2), "2"}, []interface{}{int64(3), "3"}}))
} else {
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}})) Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
}
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "team", Reducer: "min"} mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "team", Reducer: "min"}
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(result)).To(BeEquivalentTo(2)) Expect(len(result)).To(BeEquivalentTo(2))
if client.Options().Protocol == 2 {
Expect(result["team=ny"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "1"}, []interface{}{int64(2), "2"}, []interface{}{int64(3), "3"}}))
Expect(result["team=sf"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "1"}, []interface{}{int64(2), "2"}, []interface{}{int64(3), "3"}}))
} else {
Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}})) Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}})) Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
}
// Test Align // Test Align
mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"} mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"}
result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "10"}, []interface{}{int64(10), "1"}}))
} else {
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 10.0}, []interface{}{int64(10), 1.0}})) Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 10.0}, []interface{}{int64(10), 1.0}}))
}
mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 5} mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 5}
result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "5"}, []interface{}{int64(5), "6"}}))
} else {
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 5.0}, []interface{}{int64(5), 6.0}})) Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 5.0}, []interface{}{int64(5), 6.0}}))
}
}) })
It("should TSMRangeWithArgs Latest", Label("timeseries", "tsmrangeWithArgs", "tsmrangelatest", "NonRedisEnterprise"), func() { It("should TSMRangeWithArgs Latest", Label("timeseries", "tsmrangeWithArgs", "tsmrangelatest", "NonRedisEnterprise"), func() {
@ -940,8 +1047,13 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
mrangeOpt := &redis.TSMRangeOptions{Latest: true} mrangeOpt := &redis.TSMRangeOptions{Latest: true}
result, err := client.TSMRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result() result, err := client.TSMRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["b"][1]).To(ConsistOf([]interface{}{int64(0), "4"}, []interface{}{int64(10), "8"}))
Expect(result["d"][1]).To(ConsistOf([]interface{}{int64(0), "4"}, []interface{}{int64(10), "8"}))
} else {
Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}})) Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}}))
Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}})) Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}}))
}
}) })
It("should TSMRevRange and TSMRevRangeWithArgs", Label("timeseries", "tsmrevrange", "tsmrevrangeWithArgs"), func() { It("should TSMRevRange and TSMRevRangeWithArgs", Label("timeseries", "tsmrevrange", "tsmrevrangeWithArgs"), func() {
createOpt := &redis.TSOptions{Labels: map[string]string{"Test": "This", "team": "ny"}} createOpt := &redis.TSOptions{Labels: map[string]string{"Test": "This", "team": "ny"}}
@ -962,12 +1074,20 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
result, err := client.TSMRevRange(ctx, 0, 200, []string{"Test=This"}).Result() result, err := client.TSMRevRange(ctx, 0, 200, []string{"Test=This"}).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(result)).To(BeEquivalentTo(2)) Expect(len(result)).To(BeEquivalentTo(2))
if client.Options().Protocol == 2 {
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(100))
} else {
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100)) Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100))
}
// Test Count // Test Count
mrangeOpt := &redis.TSMRevRangeOptions{Count: 10} mrangeOpt := &redis.TSMRevRangeOptions{Count: 10}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(10))
} else {
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10)) Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10))
}
// Test Aggregation and BucketDuration // Test Aggregation and BucketDuration
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
_, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result() _, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result()
@ -977,20 +1097,32 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
result, err = client.TSMRevRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(result)).To(BeEquivalentTo(2)) Expect(len(result)).To(BeEquivalentTo(2))
if client.Options().Protocol == 2 {
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(20))
Expect(result["a"][0]).To(BeEquivalentTo([]interface{}{}))
} else {
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20)) Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20))
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{})) Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
// Test WithLabels }
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
mrangeOpt = &redis.TSMRevRangeOptions{WithLabels: true} mrangeOpt = &redis.TSMRevRangeOptions{WithLabels: true}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][0]).To(ConsistOf([]interface{}{[]interface{}{"Test", "This"}, []interface{}{"team", "ny"}}))
} else {
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"})) Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"}))
}
// Test SelectedLabels // Test SelectedLabels
mrangeOpt = &redis.TSMRevRangeOptions{SelectedLabels: []interface{}{"team"}} mrangeOpt = &redis.TSMRevRangeOptions{SelectedLabels: []interface{}{"team"}}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "ny"}))
Expect(result["b"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "sf"}))
} else {
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"})) Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"}))
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"})) Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"}))
}
// Test FilterBy // Test FilterBy
fts := make([]int, 0) fts := make([]int, 0)
for i := 10; i < 20; i++ { for i := 10; i < 20; i++ {
@ -999,34 +1131,56 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
mrangeOpt = &redis.TSMRevRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}} mrangeOpt = &redis.TSMRevRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][1].([]interface{})).To(ConsistOf([]interface{}{int64(16), "2"}, []interface{}{int64(15), "1"}))
} else {
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(16), 2.0}, []interface{}{int64(15), 1.0}})) Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(16), 2.0}, []interface{}{int64(15), 1.0}}))
}
// Test GroupBy // Test GroupBy
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "sum"} mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "sum"}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "6"}, []interface{}{int64(2), "4"}, []interface{}{int64(1), "2"}, []interface{}{int64(0), "0"}}))
} else {
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 6.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(0), 0.0}})) Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 6.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(0), 0.0}}))
}
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "max"} mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "max"}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "3"}, []interface{}{int64(2), "2"}, []interface{}{int64(1), "1"}, []interface{}{int64(0), "0"}}))
} else {
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}})) Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
}
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "team", Reducer: "min"} mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "team", Reducer: "min"}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(result)).To(BeEquivalentTo(2)) Expect(len(result)).To(BeEquivalentTo(2))
if client.Options().Protocol == 2 {
Expect(result["team=ny"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "3"}, []interface{}{int64(2), "2"}, []interface{}{int64(1), "1"}, []interface{}{int64(0), "0"}}))
Expect(result["team=sf"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "3"}, []interface{}{int64(2), "2"}, []interface{}{int64(1), "1"}, []interface{}{int64(0), "0"}}))
} else {
Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}})) Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}})) Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
}
// Test Align // Test Align
mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"} mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), "1"}, []interface{}{int64(0), "10"}}))
} else {
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 1.0}, []interface{}{int64(0), 10.0}})) Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 1.0}, []interface{}{int64(0), 10.0}}))
}
mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 1} mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 1}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(1), "10"}, []interface{}{int64(0), "1"}}))
} else {
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(1), 10.0}, []interface{}{int64(0), 1.0}})) Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(1), 10.0}, []interface{}{int64(0), 1.0}}))
}
}) })
It("should TSMRevRangeWithArgs Latest", Label("timeseries", "tsmrevrangeWithArgs", "tsmrevrangelatest", "NonRedisEnterprise"), func() { It("should TSMRevRangeWithArgs Latest", Label("timeseries", "tsmrevrangeWithArgs", "tsmrevrangelatest", "NonRedisEnterprise"), func() {
@ -1073,7 +1227,14 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
mrangeOpt := &redis.TSMRevRangeOptions{Latest: true} mrangeOpt := &redis.TSMRevRangeOptions{Latest: true}
result, err := client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result() result, err := client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["b"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), "8"}, []interface{}{int64(0), "4"}}))
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), "8"}, []interface{}{int64(0), "4"}}))
} else {
Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}})) Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}}))
Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}})) Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}}))
}
}) })
}) })
}
})

View File

@ -2,5 +2,5 @@ package redis
// Version is the current release version. // Version is the current release version.
func Version() string { func Version() string {
return "9.6.2" return "9.7.0"
} }