mirror of https://github.com/go-redis/redis.git
Compare commits
14 Commits
19ea36f761
...
3796eb66af
Author | SHA1 | Date |
---|---|---|
dependabot[bot] | 3796eb66af | |
dependabot[bot] | e63669e170 | |
LINKIWI | fc32d0a01d | |
Justin | f1ffb55c9a | |
LINKIWI | 080e051124 | |
ofekshenawa | 930d904205 | |
ofekshenawa | 8b1073d2d6 | |
ofekshenawa | d1b4eaed41 | |
andy-stark-redis | 80c9f5bb77 | |
andy-stark-redis | 1ed936eb09 | |
Vladyslav Vildanov | cc9bcb0c0f | |
andy-stark-redis | a8590e9879 | |
ofekshenawa | 2076975008 | |
dependabot[bot] | 48a8dd05a0 |
|
@ -54,6 +54,7 @@ stunnel
|
|||
SynDump
|
||||
TCP
|
||||
TLS
|
||||
UnstableResp
|
||||
uri
|
||||
URI
|
||||
url
|
||||
|
@ -62,3 +63,5 @@ RedisStack
|
|||
RedisGears
|
||||
RedisTimeseries
|
||||
RediSearch
|
||||
RawResult
|
||||
RawVal
|
|
@ -8,7 +8,7 @@ jobs:
|
|||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
- name: Check Spelling
|
||||
uses: rojopolis/spellcheck-github-actions@0.40.0
|
||||
uses: rojopolis/spellcheck-github-actions@0.45.0
|
||||
with:
|
||||
config_path: .github/spellcheck-settings.yml
|
||||
task_name: Markdown
|
||||
|
|
15
README.md
15
README.md
|
@ -186,6 +186,21 @@ rdb := redis.NewClient(&redis.Options{
|
|||
#### Unstable RESP3 Structures for RediSearch Commands
|
||||
When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when using the search and query capabilities, but we plan to stabilize the RESP3-based API-s in the coming versions. You can find more guidance in the upcoming release notes.
|
||||
|
||||
To enable unstable RESP3, set the option in your client configuration:
|
||||
|
||||
```go
|
||||
redis.NewClient(&redis.Options{
|
||||
UnstableResp3: true,
|
||||
})
|
||||
```
|
||||
**Note:** When UnstableResp3 mode is enabled, it's necessary to use RawResult() and RawVal() to retrieve a raw data.
|
||||
Since, raw response is the only option for unstable search commands Val() and Result() calls wouldn't have any affect on them:
|
||||
|
||||
```go
|
||||
res1, err := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawResult()
|
||||
val1 := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawVal()
|
||||
```
|
||||
|
||||
## Contributing
|
||||
|
||||
Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library!
|
||||
|
|
40
command.go
40
command.go
|
@ -167,6 +167,8 @@ func (cmd *baseCmd) stringArg(pos int) string {
|
|||
switch v := arg.(type) {
|
||||
case string:
|
||||
return v
|
||||
case []byte:
|
||||
return string(v)
|
||||
default:
|
||||
// TODO: consider using appendArg
|
||||
return fmt.Sprint(v)
|
||||
|
@ -1403,11 +1405,18 @@ func (cmd *MapStringSliceInterfaceCmd) Val() map[string][]interface{} {
|
|||
}
|
||||
|
||||
func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
|
||||
readType, err := rd.PeekReplyType()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cmd.val = make(map[string][]interface{})
|
||||
|
||||
if readType == proto.RespMap {
|
||||
n, err := rd.ReadMapLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmd.val = make(map[string][]interface{}, n)
|
||||
for i := 0; i < n; i++ {
|
||||
k, err := rd.ReadString()
|
||||
if err != nil {
|
||||
|
@ -1426,6 +1435,35 @@ func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
|
|||
cmd.val[k][j] = value
|
||||
}
|
||||
}
|
||||
} else if readType == proto.RespArray {
|
||||
// RESP2 response
|
||||
n, err := rd.ReadArrayLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
// Each entry in this array is itself an array with key details
|
||||
itemLen, err := rd.ReadArrayLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key, err := rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmd.val[key] = make([]interface{}, 0, itemLen-1)
|
||||
for j := 1; j < itemLen; j++ {
|
||||
// Read the inner array for timestamp-value pairs
|
||||
data, err := rd.ReadReply()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmd.val[key] = append(cmd.val[key], data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,199 @@
|
|||
// EXAMPLE: go_home_json
|
||||
// HIDE_START
|
||||
package example_commands_test
|
||||
|
||||
// HIDE_END
|
||||
// STEP_START import
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// STEP_END
|
||||
|
||||
func ExampleClient_search_json() {
|
||||
// STEP_START connect
|
||||
ctx := context.Background()
|
||||
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
Password: "", // no password docs
|
||||
DB: 0, // use default DB
|
||||
Protocol: 2,
|
||||
})
|
||||
// STEP_END
|
||||
// REMOVE_START
|
||||
rdb.Del(ctx, "user:1", "user:2", "user:3")
|
||||
rdb.FTDropIndex(ctx, "idx:users")
|
||||
// REMOVE_END
|
||||
|
||||
// STEP_START create_data
|
||||
user1 := map[string]interface{}{
|
||||
"name": "Paul John",
|
||||
"email": "paul.john@example.com",
|
||||
"age": 42,
|
||||
"city": "London",
|
||||
}
|
||||
|
||||
user2 := map[string]interface{}{
|
||||
"name": "Eden Zamir",
|
||||
"email": "eden.zamir@example.com",
|
||||
"age": 29,
|
||||
"city": "Tel Aviv",
|
||||
}
|
||||
|
||||
user3 := map[string]interface{}{
|
||||
"name": "Paul Zamir",
|
||||
"email": "paul.zamir@example.com",
|
||||
"age": 35,
|
||||
"city": "Tel Aviv",
|
||||
}
|
||||
// STEP_END
|
||||
|
||||
// STEP_START make_index
|
||||
_, err := rdb.FTCreate(
|
||||
ctx,
|
||||
"idx:users",
|
||||
// Options:
|
||||
&redis.FTCreateOptions{
|
||||
OnJSON: true,
|
||||
Prefix: []interface{}{"user:"},
|
||||
},
|
||||
// Index schema fields:
|
||||
&redis.FieldSchema{
|
||||
FieldName: "$.name",
|
||||
As: "name",
|
||||
FieldType: redis.SearchFieldTypeText,
|
||||
},
|
||||
&redis.FieldSchema{
|
||||
FieldName: "$.city",
|
||||
As: "city",
|
||||
FieldType: redis.SearchFieldTypeTag,
|
||||
},
|
||||
&redis.FieldSchema{
|
||||
FieldName: "$.age",
|
||||
As: "age",
|
||||
FieldType: redis.SearchFieldTypeNumeric,
|
||||
},
|
||||
).Result()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// STEP_END
|
||||
|
||||
// STEP_START add_data
|
||||
_, err = rdb.JSONSet(ctx, "user:1", "$", user1).Result()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
_, err = rdb.JSONSet(ctx, "user:2", "$", user2).Result()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
_, err = rdb.JSONSet(ctx, "user:3", "$", user3).Result()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// STEP_END
|
||||
|
||||
// STEP_START query1
|
||||
findPaulResult, err := rdb.FTSearch(
|
||||
ctx,
|
||||
"idx:users",
|
||||
"Paul @age:[30 40]",
|
||||
).Result()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
fmt.Println(findPaulResult)
|
||||
// >>> {1 [{user:3 <nil> <nil> <nil> map[$:{"age":35,"city":"Tel Aviv"...
|
||||
// STEP_END
|
||||
|
||||
// STEP_START query2
|
||||
citiesResult, err := rdb.FTSearchWithArgs(
|
||||
ctx,
|
||||
"idx:users",
|
||||
"Paul",
|
||||
&redis.FTSearchOptions{
|
||||
Return: []redis.FTSearchReturn{
|
||||
{
|
||||
FieldName: "$.city",
|
||||
As: "city",
|
||||
},
|
||||
},
|
||||
},
|
||||
).Result()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
sort.Slice(citiesResult.Docs, func(i, j int) bool {
|
||||
return citiesResult.Docs[i].Fields["city"] < citiesResult.Docs[j].Fields["city"]
|
||||
})
|
||||
|
||||
for _, result := range citiesResult.Docs {
|
||||
fmt.Println(result.Fields["city"])
|
||||
}
|
||||
// >>> London
|
||||
// >>> Tel Aviv
|
||||
// STEP_END
|
||||
|
||||
// STEP_START query3
|
||||
aggOptions := redis.FTAggregateOptions{
|
||||
GroupBy: []redis.FTAggregateGroupBy{
|
||||
{
|
||||
Fields: []interface{}{"@city"},
|
||||
Reduce: []redis.FTAggregateReducer{
|
||||
{
|
||||
Reducer: redis.SearchCount,
|
||||
As: "count",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
aggResult, err := rdb.FTAggregateWithArgs(
|
||||
ctx,
|
||||
"idx:users",
|
||||
"*",
|
||||
&aggOptions,
|
||||
).Result()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
sort.Slice(aggResult.Rows, func(i, j int) bool {
|
||||
return aggResult.Rows[i].Fields["city"].(string) <
|
||||
aggResult.Rows[j].Fields["city"].(string)
|
||||
})
|
||||
|
||||
for _, row := range aggResult.Rows {
|
||||
fmt.Printf("%v - %v\n",
|
||||
row.Fields["city"], row.Fields["count"],
|
||||
)
|
||||
}
|
||||
// >>> City: London - 1
|
||||
// >>> City: Tel Aviv - 2
|
||||
// STEP_END
|
||||
|
||||
// Output:
|
||||
// {1 [{user:3 <nil> <nil> <nil> map[$:{"age":35,"city":"Tel Aviv","email":"paul.zamir@example.com","name":"Paul Zamir"}]}]}
|
||||
// London
|
||||
// Tel Aviv
|
||||
// London - 1
|
||||
// Tel Aviv - 2
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -5,7 +5,7 @@ go 1.18
|
|||
replace github.com/redis/go-redis/v9 => ../..
|
||||
|
||||
require (
|
||||
github.com/redis/go-redis/v9 v9.6.1
|
||||
github.com/redis/go-redis/v9 v9.6.2
|
||||
go.uber.org/zap v1.24.0
|
||||
)
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ go 1.18
|
|||
|
||||
replace github.com/redis/go-redis/v9 => ../..
|
||||
|
||||
require github.com/redis/go-redis/v9 v9.6.1
|
||||
require github.com/redis/go-redis/v9 v9.6.2
|
||||
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
|
|
|
@ -4,7 +4,7 @@ go 1.18
|
|||
|
||||
replace github.com/redis/go-redis/v9 => ../..
|
||||
|
||||
require github.com/redis/go-redis/v9 v9.6.1
|
||||
require github.com/redis/go-redis/v9 v9.6.2
|
||||
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
|
|
|
@ -9,8 +9,8 @@ replace github.com/redis/go-redis/extra/redisotel/v9 => ../../extra/redisotel
|
|||
replace github.com/redis/go-redis/extra/rediscmd/v9 => ../../extra/rediscmd
|
||||
|
||||
require (
|
||||
github.com/redis/go-redis/extra/redisotel/v9 v9.6.1
|
||||
github.com/redis/go-redis/v9 v9.6.1
|
||||
github.com/redis/go-redis/extra/redisotel/v9 v9.6.2
|
||||
github.com/redis/go-redis/v9 v9.6.2
|
||||
github.com/uptrace/uptrace-go v1.21.0
|
||||
go.opentelemetry.io/otel v1.22.0
|
||||
)
|
||||
|
@ -23,7 +23,7 @@ require (
|
|||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
|
||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.6.1 // indirect
|
||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.6.2 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/runtime v0.46.1 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect
|
||||
|
|
|
@ -4,7 +4,7 @@ go 1.18
|
|||
|
||||
replace github.com/redis/go-redis/v9 => ../..
|
||||
|
||||
require github.com/redis/go-redis/v9 v9.6.1
|
||||
require github.com/redis/go-redis/v9 v9.6.2
|
||||
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
|
|
|
@ -6,7 +6,7 @@ replace github.com/redis/go-redis/v9 => ../..
|
|||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1
|
||||
github.com/redis/go-redis/v9 v9.6.1
|
||||
github.com/redis/go-redis/v9 v9.6.2
|
||||
)
|
||||
|
||||
require (
|
||||
|
|
|
@ -7,8 +7,8 @@ replace github.com/redis/go-redis/v9 => ../..
|
|||
replace github.com/redis/go-redis/extra/rediscmd/v9 => ../rediscmd
|
||||
|
||||
require (
|
||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.6.1
|
||||
github.com/redis/go-redis/v9 v9.6.1
|
||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.6.2
|
||||
github.com/redis/go-redis/v9 v9.6.2
|
||||
go.opencensus.io v0.24.0
|
||||
)
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ replace github.com/redis/go-redis/v9 => ../..
|
|||
require (
|
||||
github.com/bsm/ginkgo/v2 v2.12.0
|
||||
github.com/bsm/gomega v1.27.10
|
||||
github.com/redis/go-redis/v9 v9.6.1
|
||||
github.com/redis/go-redis/v9 v9.6.2
|
||||
)
|
||||
|
||||
require (
|
||||
|
|
|
@ -7,8 +7,8 @@ replace github.com/redis/go-redis/v9 => ../..
|
|||
replace github.com/redis/go-redis/extra/rediscmd/v9 => ../rediscmd
|
||||
|
||||
require (
|
||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.6.1
|
||||
github.com/redis/go-redis/v9 v9.6.1
|
||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.6.2
|
||||
github.com/redis/go-redis/v9 v9.6.2
|
||||
go.opentelemetry.io/otel v1.22.0
|
||||
go.opentelemetry.io/otel/metric v1.22.0
|
||||
go.opentelemetry.io/otel/sdk v1.22.0
|
||||
|
|
|
@ -6,7 +6,7 @@ replace github.com/redis/go-redis/v9 => ../..
|
|||
|
||||
require (
|
||||
github.com/prometheus/client_golang v1.14.0
|
||||
github.com/redis/go-redis/v9 v9.6.1
|
||||
github.com/redis/go-redis/v9 v9.6.2
|
||||
)
|
||||
|
||||
require (
|
||||
|
|
2
go.mod
2
go.mod
|
@ -5,7 +5,7 @@ go 1.18
|
|||
require (
|
||||
github.com/bsm/ginkgo/v2 v2.12.0
|
||||
github.com/bsm/gomega v1.27.10
|
||||
github.com/cespare/xxhash/v2 v2.2.0
|
||||
github.com/cespare/xxhash/v2 v2.3.0
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
|
||||
)
|
||||
|
||||
|
|
4
go.sum
4
go.sum
|
@ -2,7 +2,7 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
|||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
|
||||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
|
|
8
json.go
8
json.go
|
@ -60,7 +60,7 @@ type JSONArrTrimArgs struct {
|
|||
type JSONCmd struct {
|
||||
baseCmd
|
||||
val string
|
||||
expanded []interface{}
|
||||
expanded interface{}
|
||||
}
|
||||
|
||||
var _ Cmder = (*JSONCmd)(nil)
|
||||
|
@ -100,11 +100,11 @@ func (cmd *JSONCmd) Result() (string, error) {
|
|||
return cmd.Val(), cmd.Err()
|
||||
}
|
||||
|
||||
func (cmd JSONCmd) Expanded() (interface{}, error) {
|
||||
func (cmd *JSONCmd) Expanded() (interface{}, error) {
|
||||
if len(cmd.val) != 0 && cmd.expanded == nil {
|
||||
err := json.Unmarshal([]byte(cmd.val), &cmd.expanded)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -494,7 +494,7 @@ func (c cmdable) JSONMSet(ctx context.Context, params ...interface{}) *StatusCmd
|
|||
}
|
||||
|
||||
// JSONNumIncrBy increments the number value stored at the specified path by the provided number.
|
||||
// For more information, see https://redis.io/commands/json.numincreby
|
||||
// For more information, see https://redis.io/docs/latest/commands/json.numincrby/
|
||||
func (c cmdable) JSONNumIncrBy(ctx context.Context, key, path string, value float64) *JSONCmd {
|
||||
args := []interface{}{"JSON.NUMINCRBY", key, path, value}
|
||||
cmd := newJSONCmd(ctx, args...)
|
||||
|
|
166
json_test.go
166
json_test.go
|
@ -2,6 +2,8 @@ package redis_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
. "github.com/bsm/ginkgo/v2"
|
||||
. "github.com/bsm/gomega"
|
||||
|
@ -17,13 +19,27 @@ var _ = Describe("JSON Commands", Label("json"), func() {
|
|||
ctx := context.TODO()
|
||||
var client *redis.Client
|
||||
|
||||
BeforeEach(func() {
|
||||
client = redis.NewClient(&redis.Options{Addr: ":6379"})
|
||||
Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred())
|
||||
setupRedisClient := func(protocolVersion int) *redis.Client {
|
||||
return redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
DB: 0,
|
||||
Protocol: protocolVersion,
|
||||
UnstableResp3: true,
|
||||
})
|
||||
}
|
||||
|
||||
AfterEach(func() {
|
||||
Expect(client.Close()).NotTo(HaveOccurred())
|
||||
if client != nil {
|
||||
client.FlushDB(ctx)
|
||||
client.Close()
|
||||
}
|
||||
})
|
||||
|
||||
protocols := []int{2, 3}
|
||||
for _, protocol := range protocols {
|
||||
BeforeEach(func() {
|
||||
client = setupRedisClient(protocol)
|
||||
Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
Describe("arrays", Label("arrays"), func() {
|
||||
|
@ -657,4 +673,146 @@ var _ = Describe("JSON Commands", Label("json"), func() {
|
|||
Expect(cmd2.Val()[0]).To(Or(Equal([]interface{}{"boolean"}), Equal("boolean")))
|
||||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
var _ = Describe("Go-Redis Advanced JSON and RediSearch Tests", func() {
|
||||
var client *redis.Client
|
||||
var ctx = context.Background()
|
||||
|
||||
setupRedisClient := func(protocolVersion int) *redis.Client {
|
||||
return redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
DB: 0,
|
||||
Protocol: protocolVersion, // Setting RESP2 or RESP3 protocol
|
||||
UnstableResp3: true, // Enable RESP3 features
|
||||
})
|
||||
}
|
||||
|
||||
AfterEach(func() {
|
||||
if client != nil {
|
||||
client.FlushDB(ctx)
|
||||
client.Close()
|
||||
}
|
||||
})
|
||||
|
||||
Context("when testing with RESP2 and RESP3", func() {
|
||||
protocols := []int{2, 3}
|
||||
|
||||
for _, protocol := range protocols {
|
||||
When("using protocol version", func() {
|
||||
BeforeEach(func() {
|
||||
client = setupRedisClient(protocol)
|
||||
})
|
||||
|
||||
It("should perform complex JSON and RediSearch operations", func() {
|
||||
jsonDoc := map[string]interface{}{
|
||||
"person": map[string]interface{}{
|
||||
"name": "Alice",
|
||||
"age": 30,
|
||||
"status": true,
|
||||
"address": map[string]interface{}{
|
||||
"city": "Wonderland",
|
||||
"postcode": "12345",
|
||||
},
|
||||
"contacts": []map[string]interface{}{
|
||||
{"type": "email", "value": "alice@example.com"},
|
||||
{"type": "phone", "value": "+123456789"},
|
||||
{"type": "fax", "value": "+987654321"},
|
||||
},
|
||||
"friends": []map[string]interface{}{
|
||||
{"name": "Bob", "age": 35, "status": true},
|
||||
{"name": "Charlie", "age": 28, "status": false},
|
||||
},
|
||||
},
|
||||
"settings": map[string]interface{}{
|
||||
"notifications": map[string]interface{}{
|
||||
"email": true,
|
||||
"sms": false,
|
||||
"alerts": []string{"low battery", "door open"},
|
||||
},
|
||||
"theme": "dark",
|
||||
},
|
||||
}
|
||||
|
||||
setCmd := client.JSONSet(ctx, "person:1", ".", jsonDoc)
|
||||
Expect(setCmd.Err()).NotTo(HaveOccurred(), "JSON.SET failed")
|
||||
|
||||
getCmdRaw := client.JSONGet(ctx, "person:1", ".")
|
||||
rawJSON, err := getCmdRaw.Result()
|
||||
Expect(err).NotTo(HaveOccurred(), "JSON.GET (raw) failed")
|
||||
GinkgoWriter.Printf("Raw JSON: %s\n", rawJSON)
|
||||
|
||||
getCmdExpanded := client.JSONGet(ctx, "person:1", ".")
|
||||
expandedJSON, err := getCmdExpanded.Expanded()
|
||||
Expect(err).NotTo(HaveOccurred(), "JSON.GET (expanded) failed")
|
||||
GinkgoWriter.Printf("Expanded JSON: %+v\n", expandedJSON)
|
||||
|
||||
Expect(rawJSON).To(MatchJSON(jsonMustMarshal(expandedJSON)))
|
||||
|
||||
arrAppendCmd := client.JSONArrAppend(ctx, "person:1", "$.person.contacts", `{"type": "social", "value": "@alice_wonder"}`)
|
||||
Expect(arrAppendCmd.Err()).NotTo(HaveOccurred(), "JSON.ARRAPPEND failed")
|
||||
arrLenCmd := client.JSONArrLen(ctx, "person:1", "$.person.contacts")
|
||||
arrLen, err := arrLenCmd.Result()
|
||||
Expect(err).NotTo(HaveOccurred(), "JSON.ARRLEN failed")
|
||||
Expect(arrLen).To(Equal([]int64{4}), "Array length mismatch after append")
|
||||
|
||||
arrInsertCmd := client.JSONArrInsert(ctx, "person:1", "$.person.friends", 1, `{"name": "Diana", "age": 25, "status": true}`)
|
||||
Expect(arrInsertCmd.Err()).NotTo(HaveOccurred(), "JSON.ARRINSERT failed")
|
||||
|
||||
start := 0
|
||||
stop := 1
|
||||
arrTrimCmd := client.JSONArrTrimWithArgs(ctx, "person:1", "$.person.friends", &redis.JSONArrTrimArgs{Start: start, Stop: &stop})
|
||||
Expect(arrTrimCmd.Err()).NotTo(HaveOccurred(), "JSON.ARRTRIM failed")
|
||||
|
||||
mergeData := map[string]interface{}{
|
||||
"status": false,
|
||||
"nickname": "WonderAlice",
|
||||
"lastLogin": time.Now().Format(time.RFC3339),
|
||||
}
|
||||
mergeCmd := client.JSONMerge(ctx, "person:1", "$.person", jsonMustMarshal(mergeData))
|
||||
Expect(mergeCmd.Err()).NotTo(HaveOccurred(), "JSON.MERGE failed")
|
||||
|
||||
typeCmd := client.JSONType(ctx, "person:1", "$.person.nickname")
|
||||
nicknameType, err := typeCmd.Result()
|
||||
Expect(err).NotTo(HaveOccurred(), "JSON.TYPE failed")
|
||||
Expect(nicknameType[0]).To(Equal([]interface{}{"string"}), "JSON.TYPE mismatch for nickname")
|
||||
|
||||
createIndexCmd := client.Do(ctx, "FT.CREATE", "person_idx", "ON", "JSON",
|
||||
"PREFIX", "1", "person:", "SCHEMA",
|
||||
"$.person.name", "AS", "name", "TEXT",
|
||||
"$.person.age", "AS", "age", "NUMERIC",
|
||||
"$.person.address.city", "AS", "city", "TEXT",
|
||||
"$.person.contacts[*].value", "AS", "contact_value", "TEXT",
|
||||
)
|
||||
Expect(createIndexCmd.Err()).NotTo(HaveOccurred(), "FT.CREATE failed")
|
||||
|
||||
searchCmd := client.FTSearchWithArgs(ctx, "person_idx", "@contact_value:(alice\\@example\\.com alice_wonder)", &redis.FTSearchOptions{Return: []redis.FTSearchReturn{{FieldName: "$.person.name"}, {FieldName: "$.person.age"}, {FieldName: "$.person.address.city"}}})
|
||||
searchResult, err := searchCmd.Result()
|
||||
Expect(err).NotTo(HaveOccurred(), "FT.SEARCH failed")
|
||||
GinkgoWriter.Printf("Advanced Search result: %+v\n", searchResult)
|
||||
|
||||
incrCmd := client.JSONNumIncrBy(ctx, "person:1", "$.person.age", 5)
|
||||
incrResult, err := incrCmd.Result()
|
||||
Expect(err).NotTo(HaveOccurred(), "JSON.NUMINCRBY failed")
|
||||
Expect(incrResult).To(Equal("[35]"), "Age increment mismatch")
|
||||
|
||||
delCmd := client.JSONDel(ctx, "person:1", "$.settings.notifications.email")
|
||||
Expect(delCmd.Err()).NotTo(HaveOccurred(), "JSON.DEL failed")
|
||||
|
||||
typeCmd = client.JSONType(ctx, "person:1", "$.settings.notifications.email")
|
||||
typeResult, err := typeCmd.Result()
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(typeResult[0]).To(BeEmpty(), "Expected JSON.TYPE to be empty for deleted field")
|
||||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
// Helper function to marshal data into JSON for comparisons
|
||||
func jsonMustMarshal(v interface{}) string {
|
||||
bytes, err := json.Marshal(v)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
return string(bytes)
|
||||
}
|
||||
|
|
|
@ -21,6 +21,10 @@ import (
|
|||
"github.com/redis/go-redis/v9/internal/rand"
|
||||
)
|
||||
|
||||
const (
|
||||
minLatencyMeasurementInterval = 10 * time.Second
|
||||
)
|
||||
|
||||
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
|
||||
|
||||
// ClusterOptions are used to configure a cluster client and should be
|
||||
|
@ -316,6 +320,10 @@ type clusterNode struct {
|
|||
latency uint32 // atomic
|
||||
generation uint32 // atomic
|
||||
failing uint32 // atomic
|
||||
|
||||
// last time the latency measurement was performed for the node, stored in nanoseconds
|
||||
// from epoch
|
||||
lastLatencyMeasurement int64 // atomic
|
||||
}
|
||||
|
||||
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
|
||||
|
@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
|
|||
latency = float64(dur) / float64(successes)
|
||||
}
|
||||
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
|
||||
n.SetLastLatencyMeasurement(time.Now())
|
||||
}
|
||||
|
||||
func (n *clusterNode) Latency() time.Duration {
|
||||
|
@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
|
|||
return atomic.LoadUint32(&n.generation)
|
||||
}
|
||||
|
||||
func (n *clusterNode) LastLatencyMeasurement() int64 {
|
||||
return atomic.LoadInt64(&n.lastLatencyMeasurement)
|
||||
}
|
||||
|
||||
func (n *clusterNode) SetGeneration(gen uint32) {
|
||||
for {
|
||||
v := atomic.LoadUint32(&n.generation)
|
||||
|
@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
|
|||
}
|
||||
}
|
||||
|
||||
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
|
||||
for {
|
||||
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
|
||||
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
type clusterNodes struct {
|
||||
|
@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
|
|||
c.mu.Lock()
|
||||
|
||||
c.activeAddrs = c.activeAddrs[:0]
|
||||
now := time.Now()
|
||||
for addr, node := range c.nodes {
|
||||
if node.Generation() >= generation {
|
||||
c.activeAddrs = append(c.activeAddrs, addr)
|
||||
if c.opt.RouteByLatency {
|
||||
if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
|
||||
go node.updateLatency()
|
||||
}
|
||||
continue
|
||||
|
|
|
@ -653,6 +653,32 @@ var _ = Describe("ClusterClient", func() {
|
|||
Expect(client.Close()).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("determines hash slots correctly for generic commands", func() {
|
||||
opt := redisClusterOptions()
|
||||
opt.MaxRedirects = -1
|
||||
client := cluster.newClusterClient(ctx, opt)
|
||||
|
||||
err := client.Do(ctx, "GET", "A").Err()
|
||||
Expect(err).To(Equal(redis.Nil))
|
||||
|
||||
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
|
||||
Expect(err).To(Equal(redis.Nil))
|
||||
|
||||
Eventually(func() error {
|
||||
return client.SwapNodes(ctx, "A")
|
||||
}, 30*time.Second).ShouldNot(HaveOccurred())
|
||||
|
||||
err = client.Do(ctx, "GET", "A").Err()
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("MOVED"))
|
||||
|
||||
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("MOVED"))
|
||||
|
||||
Expect(client.Close()).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("follows node redirection immediately", func() {
|
||||
// Configure retry backoffs far in excess of the expected duration of redirection
|
||||
opt := redisClusterOptions()
|
||||
|
|
|
@ -319,38 +319,70 @@ func (cmd *BFInfoCmd) Result() (BFInfo, error) {
|
|||
}
|
||||
|
||||
func (cmd *BFInfoCmd) readReply(rd *proto.Reader) (err error) {
|
||||
result := BFInfo{}
|
||||
|
||||
// Create a mapping from key names to pointers of struct fields
|
||||
respMapping := map[string]*int64{
|
||||
"Capacity": &result.Capacity,
|
||||
"CAPACITY": &result.Capacity,
|
||||
"Size": &result.Size,
|
||||
"SIZE": &result.Size,
|
||||
"Number of filters": &result.Filters,
|
||||
"FILTERS": &result.Filters,
|
||||
"Number of items inserted": &result.ItemsInserted,
|
||||
"ITEMS": &result.ItemsInserted,
|
||||
"Expansion rate": &result.ExpansionRate,
|
||||
"EXPANSION": &result.ExpansionRate,
|
||||
}
|
||||
|
||||
// Helper function to read and assign a value based on the key
|
||||
readAndAssignValue := func(key string) error {
|
||||
fieldPtr, exists := respMapping[key]
|
||||
if !exists {
|
||||
return fmt.Errorf("redis: BLOOM.INFO unexpected key %s", key)
|
||||
}
|
||||
|
||||
// Read the integer and assign to the field via pointer dereferencing
|
||||
val, err := rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*fieldPtr = val
|
||||
return nil
|
||||
}
|
||||
|
||||
readType, err := rd.PeekReplyType()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(cmd.args) > 2 && readType == proto.RespArray {
|
||||
n, err := rd.ReadArrayLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if key, ok := cmd.args[2].(string); ok && n == 1 {
|
||||
if err := readAndAssignValue(key); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("redis: BLOOM.INFO invalid argument key type")
|
||||
}
|
||||
} else {
|
||||
n, err := rd.ReadMapLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var key string
|
||||
var result BFInfo
|
||||
for f := 0; f < n; f++ {
|
||||
key, err = rd.ReadString()
|
||||
for i := 0; i < n; i++ {
|
||||
key, err := rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch key {
|
||||
case "Capacity":
|
||||
result.Capacity, err = rd.ReadInt()
|
||||
case "Size":
|
||||
result.Size, err = rd.ReadInt()
|
||||
case "Number of filters":
|
||||
result.Filters, err = rd.ReadInt()
|
||||
case "Number of items inserted":
|
||||
result.ItemsInserted, err = rd.ReadInt()
|
||||
case "Expansion rate":
|
||||
result.ExpansionRate, err = rd.ReadInt()
|
||||
default:
|
||||
return fmt.Errorf("redis: BLOOM.INFO unexpected key %s", key)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if err := readAndAssignValue(key); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cmd.val = result
|
||||
return nil
|
||||
|
|
|
@ -13,15 +13,32 @@ import (
|
|||
|
||||
var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
|
||||
ctx := context.TODO()
|
||||
|
||||
setupRedisClient := func(protocolVersion int) *redis.Client {
|
||||
return redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
DB: 0,
|
||||
Protocol: protocolVersion,
|
||||
})
|
||||
}
|
||||
|
||||
protocols := []int{2, 3}
|
||||
for _, protocol := range protocols {
|
||||
protocol := protocol // capture loop variable for each context
|
||||
|
||||
Context(fmt.Sprintf("with protocol version %d", protocol), func() {
|
||||
var client *redis.Client
|
||||
|
||||
BeforeEach(func() {
|
||||
client = redis.NewClient(&redis.Options{Addr: ":6379"})
|
||||
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
|
||||
client = setupRedisClient(protocol)
|
||||
Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
Expect(client.Close()).NotTo(HaveOccurred())
|
||||
if client != nil {
|
||||
client.FlushDB(ctx)
|
||||
client.Close()
|
||||
}
|
||||
})
|
||||
|
||||
Describe("bloom", Label("bloom"), func() {
|
||||
|
@ -117,7 +134,7 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
|
|||
NoCreate: true,
|
||||
}
|
||||
|
||||
resultInsert, err := client.BFInsert(ctx, "testbf1", options, "item1").Result()
|
||||
_, err := client.BFInsert(ctx, "testbf1", options, "item1").Result()
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err).To(MatchError("ERR not found"))
|
||||
|
||||
|
@ -129,7 +146,7 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
|
|||
NoCreate: false,
|
||||
}
|
||||
|
||||
resultInsert, err = client.BFInsert(ctx, "testbf1", options, "item1").Result()
|
||||
resultInsert, err := client.BFInsert(ctx, "testbf1", options, "item1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(resultInsert)).To(BeEquivalentTo(1))
|
||||
|
||||
|
@ -396,7 +413,7 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
|
|||
NoCreate: true,
|
||||
}
|
||||
|
||||
result, err := client.CFInsertNX(ctx, "testcf1", args, "item1", "item2", "item2").Result()
|
||||
_, err := client.CFInsertNX(ctx, "testcf1", args, "item1", "item2", "item2").Result()
|
||||
Expect(err).To(HaveOccurred())
|
||||
|
||||
args = &redis.CFInsertOptions{
|
||||
|
@ -404,7 +421,7 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
|
|||
NoCreate: false,
|
||||
}
|
||||
|
||||
result, err = client.CFInsertNX(ctx, "testcf2", args, "item1", "item2", "item2").Result()
|
||||
result, err := client.CFInsertNX(ctx, "testcf2", args, "item1", "item2", "item2").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(3))
|
||||
Expect(result[0]).To(BeEquivalentTo(int64(1)))
|
||||
|
@ -730,4 +747,6 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
|
|||
Expect(max).To(BeEquivalentTo(float64(140)))
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
|
|
2
redis.go
2
redis.go
|
@ -176,8 +176,6 @@ func (hs *hooksMixin) withProcessPipelineHook(
|
|||
}
|
||||
|
||||
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
hs.hooksMu.Lock()
|
||||
defer hs.hooksMu.Unlock()
|
||||
return hs.current.dial(ctx, network, addr)
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -633,3 +634,67 @@ var _ = Describe("Hook with MinIdleConns", func() {
|
|||
}))
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("Dialer connection timeouts", func() {
|
||||
var client *redis.Client
|
||||
|
||||
const dialSimulatedDelay = 1 * time.Second
|
||||
|
||||
BeforeEach(func() {
|
||||
options := redisOptions()
|
||||
options.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
// Simulated slow dialer.
|
||||
// Note that the following sleep is deliberately not context-aware.
|
||||
time.Sleep(dialSimulatedDelay)
|
||||
return net.Dial("tcp", options.Addr)
|
||||
}
|
||||
options.MinIdleConns = 1
|
||||
client = redis.NewClient(options)
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
err := client.Close()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("does not contend on connection dial for concurrent commands", func() {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
const concurrency = 10
|
||||
|
||||
durations := make(chan time.Duration, concurrency)
|
||||
errs := make(chan error, concurrency)
|
||||
|
||||
start := time.Now()
|
||||
wg.Add(concurrency)
|
||||
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
start := time.Now()
|
||||
err := client.Ping(ctx).Err()
|
||||
durations <- time.Since(start)
|
||||
errs <- err
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(durations)
|
||||
close(errs)
|
||||
|
||||
// All commands should eventually succeed, after acquiring a connection.
|
||||
for err := range errs {
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
// Each individual command should complete within the simulated dial duration bound.
|
||||
for duration := range durations {
|
||||
Expect(duration).To(BeNumerically("<", 2*dialSimulatedDelay))
|
||||
}
|
||||
|
||||
// Due to concurrent execution, the entire test suite should also complete within
|
||||
// the same dial duration bound applied for individual commands.
|
||||
Expect(time.Since(start)).To(BeNumerically("<", 2*dialSimulatedDelay))
|
||||
})
|
||||
})
|
||||
|
|
|
@ -16,7 +16,7 @@ type SearchCmdable interface {
|
|||
FTAliasAdd(ctx context.Context, index string, alias string) *StatusCmd
|
||||
FTAliasDel(ctx context.Context, alias string) *StatusCmd
|
||||
FTAliasUpdate(ctx context.Context, index string, alias string) *StatusCmd
|
||||
FTAlter(ctx context.Context, index string, skipInitalScan bool, definition []interface{}) *StatusCmd
|
||||
FTAlter(ctx context.Context, index string, skipInitialScan bool, definition []interface{}) *StatusCmd
|
||||
FTConfigGet(ctx context.Context, option string) *MapMapStringInterfaceCmd
|
||||
FTConfigSet(ctx context.Context, option string, value interface{}) *StatusCmd
|
||||
FTCreate(ctx context.Context, index string, options *FTCreateOptions, schema ...*FieldSchema) *StatusCmd
|
||||
|
@ -57,7 +57,7 @@ type FTCreateOptions struct {
|
|||
NoFields bool
|
||||
NoFreqs bool
|
||||
StopWords []interface{}
|
||||
SkipInitalScan bool
|
||||
SkipInitialScan bool
|
||||
}
|
||||
|
||||
type FieldSchema struct {
|
||||
|
@ -70,7 +70,7 @@ type FieldSchema struct {
|
|||
NoIndex bool
|
||||
PhoneticMatcher string
|
||||
Weight float64
|
||||
Seperator string
|
||||
Separator string
|
||||
CaseSensitive bool
|
||||
WithSuffixtrie bool
|
||||
VectorArgs *FTVectorArgs
|
||||
|
@ -285,7 +285,7 @@ type FTSearchSortBy struct {
|
|||
type FTSearchOptions struct {
|
||||
NoContent bool
|
||||
Verbatim bool
|
||||
NoStopWrods bool
|
||||
NoStopWords bool
|
||||
WithScores bool
|
||||
WithPayloads bool
|
||||
WithSortKeys bool
|
||||
|
@ -808,13 +808,13 @@ func (c cmdable) FTAliasUpdate(ctx context.Context, index string, alias string)
|
|||
}
|
||||
|
||||
// FTAlter - Alters the definition of an existing index.
|
||||
// The 'index' parameter specifies the index to alter, and the 'skipInitalScan' parameter specifies whether to skip the initial scan.
|
||||
// The 'index' parameter specifies the index to alter, and the 'skipInitialScan' parameter specifies whether to skip the initial scan.
|
||||
// The 'definition' parameter specifies the new definition for the index.
|
||||
// For more information, please refer to the Redis documentation:
|
||||
// [FT.ALTER]: (https://redis.io/commands/ft.alter/)
|
||||
func (c cmdable) FTAlter(ctx context.Context, index string, skipInitalScan bool, definition []interface{}) *StatusCmd {
|
||||
func (c cmdable) FTAlter(ctx context.Context, index string, skipInitialScan bool, definition []interface{}) *StatusCmd {
|
||||
args := []interface{}{"FT.ALTER", index}
|
||||
if skipInitalScan {
|
||||
if skipInitialScan {
|
||||
args = append(args, "SKIPINITIALSCAN")
|
||||
}
|
||||
args = append(args, "SCHEMA", "ADD")
|
||||
|
@ -907,7 +907,7 @@ func (c cmdable) FTCreate(ctx context.Context, index string, options *FTCreateOp
|
|||
args = append(args, "STOPWORDS", len(options.StopWords))
|
||||
args = append(args, options.StopWords...)
|
||||
}
|
||||
if options.SkipInitalScan {
|
||||
if options.SkipInitialScan {
|
||||
args = append(args, "SKIPINITIALSCAN")
|
||||
}
|
||||
}
|
||||
|
@ -1003,8 +1003,8 @@ func (c cmdable) FTCreate(ctx context.Context, index string, options *FTCreateOp
|
|||
if schema.Weight > 0 {
|
||||
args = append(args, "WEIGHT", schema.Weight)
|
||||
}
|
||||
if schema.Seperator != "" {
|
||||
args = append(args, "SEPERATOR", schema.Seperator)
|
||||
if schema.Separator != "" {
|
||||
args = append(args, "SEPARATOR", schema.Separator)
|
||||
}
|
||||
if schema.CaseSensitive {
|
||||
args = append(args, "CASESENSITIVE")
|
||||
|
@ -1694,7 +1694,7 @@ func FTSearchQuery(query string, options *FTSearchOptions) SearchQuery {
|
|||
if options.Verbatim {
|
||||
queryArgs = append(queryArgs, "VERBATIM")
|
||||
}
|
||||
if options.NoStopWrods {
|
||||
if options.NoStopWords {
|
||||
queryArgs = append(queryArgs, "NOSTOPWORDS")
|
||||
}
|
||||
if options.WithScores {
|
||||
|
@ -1808,7 +1808,7 @@ func (c cmdable) FTSearchWithArgs(ctx context.Context, index string, query strin
|
|||
if options.Verbatim {
|
||||
args = append(args, "VERBATIM")
|
||||
}
|
||||
if options.NoStopWrods {
|
||||
if options.NoStopWords {
|
||||
args = append(args, "NOSTOPWORDS")
|
||||
}
|
||||
if options.WithScores {
|
||||
|
|
|
@ -637,11 +637,11 @@ var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
|
|||
|
||||
})
|
||||
|
||||
It("should FTSearch SkipInitalScan", Label("search", "ftsearch"), func() {
|
||||
It("should FTSearch SkipInitialScan", Label("search", "ftsearch"), func() {
|
||||
client.HSet(ctx, "doc1", "foo", "bar")
|
||||
|
||||
text1 := &redis.FieldSchema{FieldName: "foo", FieldType: redis.SearchFieldTypeText}
|
||||
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{SkipInitalScan: true}, text1).Result()
|
||||
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{SkipInitialScan: true}, text1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(val).To(BeEquivalentTo("OK"))
|
||||
WaitForIndexing(client, "idx1")
|
||||
|
|
|
@ -2,6 +2,7 @@ package redis_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
. "github.com/bsm/ginkgo/v2"
|
||||
|
@ -12,15 +13,33 @@ import (
|
|||
|
||||
var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||
ctx := context.TODO()
|
||||
|
||||
setupRedisClient := func(protocolVersion int) *redis.Client {
|
||||
return redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
DB: 0,
|
||||
Protocol: protocolVersion,
|
||||
UnstableResp3: true,
|
||||
})
|
||||
}
|
||||
|
||||
protocols := []int{2, 3}
|
||||
for _, protocol := range protocols {
|
||||
protocol := protocol // capture loop variable for each context
|
||||
|
||||
Context(fmt.Sprintf("with protocol version %d", protocol), func() {
|
||||
var client *redis.Client
|
||||
|
||||
BeforeEach(func() {
|
||||
client = redis.NewClient(&redis.Options{Addr: rediStackAddr})
|
||||
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
|
||||
client = setupRedisClient(protocol)
|
||||
Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
Expect(client.Close()).NotTo(HaveOccurred())
|
||||
if client != nil {
|
||||
client.FlushDB(ctx)
|
||||
client.Close()
|
||||
}
|
||||
})
|
||||
|
||||
It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs", "NonRedisEnterprise"), func() {
|
||||
|
@ -42,7 +61,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
Expect(result).To(BeEquivalentTo("OK"))
|
||||
resultInfo, err := client.TSInfo(ctx, "4").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(resultInfo["labels"].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"Time", "Series"}))
|
||||
} else {
|
||||
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
|
||||
}
|
||||
// Test chunk size
|
||||
opt = &redis.TSOptions{ChunkSize: 128}
|
||||
result, err = client.TSCreateWithArgs(ctx, "ts-cs-1", opt).Result()
|
||||
|
@ -134,7 +157,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
Expect(result).To(BeEquivalentTo(4))
|
||||
resultInfo, err := client.TSInfo(ctx, "4").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(resultInfo["labels"].([]interface{})).To(ContainElement([]interface{}{"Time", "Series"}))
|
||||
} else {
|
||||
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
|
||||
}
|
||||
// Test chunk size
|
||||
opt = &redis.TSOptions{ChunkSize: 128}
|
||||
result, err = client.TSAddWithArgs(ctx, "ts-cs-1", 1, 10, opt).Result()
|
||||
|
@ -223,7 +250,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
|
||||
resultInfo, err = client.TSInfo(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(resultInfo["labels"]).To(BeEquivalentTo([]interface{}{}))
|
||||
} else {
|
||||
Expect(resultInfo["labels"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
||||
}
|
||||
|
||||
opt = &redis.TSAlterOptions{Labels: map[string]string{"Time": "Series"}}
|
||||
resultAlter, err = client.TSAlter(ctx, "1", opt).Result()
|
||||
|
@ -232,9 +263,15 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
|
||||
resultInfo, err = client.TSInfo(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(resultInfo["labels"].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"Time", "Series"}))
|
||||
Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(10))
|
||||
Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo(redis.Nil))
|
||||
} else {
|
||||
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
|
||||
Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(10))
|
||||
Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo(redis.Nil))
|
||||
}
|
||||
opt = &redis.TSAlterOptions{DuplicatePolicy: "min"}
|
||||
resultAlter, err = client.TSAlter(ctx, "1", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
@ -304,7 +341,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
Expect(resultDeleteRule).To(BeEquivalentTo("OK"))
|
||||
resultInfo, err := client.TSInfo(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(resultInfo["rules"]).To(BeEquivalentTo([]interface{}{}))
|
||||
} else {
|
||||
Expect(resultInfo["rules"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
||||
}
|
||||
})
|
||||
|
||||
It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs", "NonRedisEnterprise"), func() {
|
||||
|
@ -501,12 +542,21 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
|
||||
result, err := client.TSMGet(ctx, []string{"Test=This"}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["a"][1].([]interface{})[1]).To(BeEquivalentTo("15"))
|
||||
Expect(result["b"][1].([]interface{})[1]).To(BeEquivalentTo("25"))
|
||||
} else {
|
||||
Expect(result["a"][1].([]interface{})[1]).To(BeEquivalentTo(15))
|
||||
Expect(result["b"][1].([]interface{})[1]).To(BeEquivalentTo(25))
|
||||
}
|
||||
mgetOpt := &redis.TSMGetOptions{WithLabels: true}
|
||||
result, err = client.TSMGetWithArgs(ctx, []string{"Test=This"}, mgetOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["b"][0]).To(ConsistOf([]interface{}{"Test", "This"}, []interface{}{"Taste", "That"}))
|
||||
} else {
|
||||
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "Taste": "That"}))
|
||||
}
|
||||
|
||||
resultCreate, err = client.TSCreate(ctx, "c").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
@ -528,11 +578,19 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
Expect(err).NotTo(HaveOccurred())
|
||||
result, err = client.TSMGet(ctx, []string{"is_compaction=true"}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(0), "4"}))
|
||||
} else {
|
||||
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(0), 4.0}))
|
||||
}
|
||||
mgetOpt = &redis.TSMGetOptions{Latest: true}
|
||||
result, err = client.TSMGetWithArgs(ctx, []string{"is_compaction=true"}, mgetOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(10), "8"}))
|
||||
} else {
|
||||
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(10), 8.0}))
|
||||
}
|
||||
})
|
||||
|
||||
It("should TSQueryIndex", Label("timeseries", "tsqueryindex"), func() {
|
||||
|
@ -830,12 +888,20 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
result, err := client.TSMRange(ctx, 0, 200, []string{"Test=This"}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(2))
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(100))
|
||||
} else {
|
||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100))
|
||||
}
|
||||
// Test Count
|
||||
mrangeOpt := &redis.TSMRangeOptions{Count: 10}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(10))
|
||||
} else {
|
||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10))
|
||||
}
|
||||
// Test Aggregation and BucketDuration
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result()
|
||||
|
@ -845,19 +911,36 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
result, err = client.TSMRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(2))
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(20))
|
||||
} else {
|
||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20))
|
||||
}
|
||||
// Test WithLabels
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["a"][0]).To(BeEquivalentTo([]interface{}{}))
|
||||
} else {
|
||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
||||
}
|
||||
mrangeOpt = &redis.TSMRangeOptions{WithLabels: true}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["a"][0]).To(ConsistOf([]interface{}{[]interface{}{"Test", "This"}, []interface{}{"team", "ny"}}))
|
||||
} else {
|
||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"}))
|
||||
}
|
||||
// Test SelectedLabels
|
||||
mrangeOpt = &redis.TSMRangeOptions{SelectedLabels: []interface{}{"team"}}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["a"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "ny"}))
|
||||
Expect(result["b"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "sf"}))
|
||||
} else {
|
||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"}))
|
||||
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"}))
|
||||
}
|
||||
// Test FilterBy
|
||||
fts := make([]int, 0)
|
||||
for i := 10; i < 20; i++ {
|
||||
|
@ -866,34 +949,58 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
mrangeOpt = &redis.TSMRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["a"][1].([]interface{})).To(BeEquivalentTo([]interface{}{[]interface{}{int64(15), "1"}, []interface{}{int64(16), "2"}}))
|
||||
} else {
|
||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(15), 1.0}, []interface{}{int64(16), 2.0}}))
|
||||
}
|
||||
// Test GroupBy
|
||||
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "sum"}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "2"}, []interface{}{int64(2), "4"}, []interface{}{int64(3), "6"}}))
|
||||
} else {
|
||||
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(3), 6.0}}))
|
||||
|
||||
}
|
||||
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "max"}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "1"}, []interface{}{int64(2), "2"}, []interface{}{int64(3), "3"}}))
|
||||
} else {
|
||||
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
|
||||
}
|
||||
|
||||
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "team", Reducer: "min"}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(2))
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["team=ny"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "1"}, []interface{}{int64(2), "2"}, []interface{}{int64(3), "3"}}))
|
||||
Expect(result["team=sf"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "1"}, []interface{}{int64(2), "2"}, []interface{}{int64(3), "3"}}))
|
||||
} else {
|
||||
Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
|
||||
Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
|
||||
}
|
||||
// Test Align
|
||||
mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "10"}, []interface{}{int64(10), "1"}}))
|
||||
} else {
|
||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 10.0}, []interface{}{int64(10), 1.0}}))
|
||||
}
|
||||
|
||||
mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 5}
|
||||
result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "5"}, []interface{}{int64(5), "6"}}))
|
||||
} else {
|
||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 5.0}, []interface{}{int64(5), 6.0}}))
|
||||
}
|
||||
})
|
||||
|
||||
It("should TSMRangeWithArgs Latest", Label("timeseries", "tsmrangeWithArgs", "tsmrangelatest", "NonRedisEnterprise"), func() {
|
||||
|
@ -940,8 +1047,13 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
mrangeOpt := &redis.TSMRangeOptions{Latest: true}
|
||||
result, err := client.TSMRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["b"][1]).To(ConsistOf([]interface{}{int64(0), "4"}, []interface{}{int64(10), "8"}))
|
||||
Expect(result["d"][1]).To(ConsistOf([]interface{}{int64(0), "4"}, []interface{}{int64(10), "8"}))
|
||||
} else {
|
||||
Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}}))
|
||||
Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}}))
|
||||
}
|
||||
})
|
||||
It("should TSMRevRange and TSMRevRangeWithArgs", Label("timeseries", "tsmrevrange", "tsmrevrangeWithArgs"), func() {
|
||||
createOpt := &redis.TSOptions{Labels: map[string]string{"Test": "This", "team": "ny"}}
|
||||
|
@ -962,12 +1074,20 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
result, err := client.TSMRevRange(ctx, 0, 200, []string{"Test=This"}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(2))
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(100))
|
||||
} else {
|
||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100))
|
||||
}
|
||||
// Test Count
|
||||
mrangeOpt := &redis.TSMRevRangeOptions{Count: 10}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(10))
|
||||
} else {
|
||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10))
|
||||
}
|
||||
// Test Aggregation and BucketDuration
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result()
|
||||
|
@ -977,20 +1097,32 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(2))
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(20))
|
||||
Expect(result["a"][0]).To(BeEquivalentTo([]interface{}{}))
|
||||
} else {
|
||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20))
|
||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
||||
// Test WithLabels
|
||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
||||
}
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{WithLabels: true}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["a"][0]).To(ConsistOf([]interface{}{[]interface{}{"Test", "This"}, []interface{}{"team", "ny"}}))
|
||||
} else {
|
||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"}))
|
||||
}
|
||||
// Test SelectedLabels
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{SelectedLabels: []interface{}{"team"}}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["a"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "ny"}))
|
||||
Expect(result["b"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "sf"}))
|
||||
} else {
|
||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"}))
|
||||
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"}))
|
||||
}
|
||||
// Test FilterBy
|
||||
fts := make([]int, 0)
|
||||
for i := 10; i < 20; i++ {
|
||||
|
@ -999,34 +1131,56 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
mrangeOpt = &redis.TSMRevRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["a"][1].([]interface{})).To(ConsistOf([]interface{}{int64(16), "2"}, []interface{}{int64(15), "1"}))
|
||||
} else {
|
||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(16), 2.0}, []interface{}{int64(15), 1.0}}))
|
||||
}
|
||||
// Test GroupBy
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "sum"}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "6"}, []interface{}{int64(2), "4"}, []interface{}{int64(1), "2"}, []interface{}{int64(0), "0"}}))
|
||||
} else {
|
||||
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 6.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(0), 0.0}}))
|
||||
|
||||
}
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "max"}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "3"}, []interface{}{int64(2), "2"}, []interface{}{int64(1), "1"}, []interface{}{int64(0), "0"}}))
|
||||
} else {
|
||||
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
|
||||
|
||||
}
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "team", Reducer: "min"}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(result)).To(BeEquivalentTo(2))
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["team=ny"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "3"}, []interface{}{int64(2), "2"}, []interface{}{int64(1), "1"}, []interface{}{int64(0), "0"}}))
|
||||
Expect(result["team=sf"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "3"}, []interface{}{int64(2), "2"}, []interface{}{int64(1), "1"}, []interface{}{int64(0), "0"}}))
|
||||
} else {
|
||||
Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
|
||||
Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
|
||||
}
|
||||
// Test Align
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), "1"}, []interface{}{int64(0), "10"}}))
|
||||
} else {
|
||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 1.0}, []interface{}{int64(0), 10.0}}))
|
||||
|
||||
}
|
||||
mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 1}
|
||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(1), "10"}, []interface{}{int64(0), "1"}}))
|
||||
} else {
|
||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(1), 10.0}, []interface{}{int64(0), 1.0}}))
|
||||
}
|
||||
})
|
||||
|
||||
It("should TSMRevRangeWithArgs Latest", Label("timeseries", "tsmrevrangeWithArgs", "tsmrevrangelatest", "NonRedisEnterprise"), func() {
|
||||
|
@ -1073,7 +1227,14 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
mrangeOpt := &redis.TSMRevRangeOptions{Latest: true}
|
||||
result, err := client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if client.Options().Protocol == 2 {
|
||||
Expect(result["b"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), "8"}, []interface{}{int64(0), "4"}}))
|
||||
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), "8"}, []interface{}{int64(0), "4"}}))
|
||||
} else {
|
||||
Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}}))
|
||||
Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}}))
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
|
|
|
@ -2,5 +2,5 @@ package redis
|
|||
|
||||
// Version is the current release version.
|
||||
func Version() string {
|
||||
return "9.6.1"
|
||||
return "9.6.2"
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue