Support Resp2 for TimeSeries commands

This commit is contained in:
ofekshenawa 2024-11-05 14:25:04 +02:00
parent 49f128caad
commit 7e81c9c933
2 changed files with 1236 additions and 1049 deletions

View File

@ -1403,11 +1403,18 @@ func (cmd *MapStringSliceInterfaceCmd) Val() map[string][]interface{} {
} }
func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) { func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
readType, err := rd.PeekReplyType()
if err != nil {
return err
}
cmd.val = make(map[string][]interface{})
if readType == proto.RespMap {
n, err := rd.ReadMapLen() n, err := rd.ReadMapLen()
if err != nil { if err != nil {
return err return err
} }
cmd.val = make(map[string][]interface{}, n)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
k, err := rd.ReadString() k, err := rd.ReadString()
if err != nil { if err != nil {
@ -1426,6 +1433,35 @@ func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
cmd.val[k][j] = value cmd.val[k][j] = value
} }
} }
} else if readType == proto.RespArray {
// RESP2 response
n, err := rd.ReadArrayLen()
if err != nil {
return err
}
for i := 0; i < n; i++ {
// Each entry in this array is itself an array with key details
itemLen, err := rd.ReadArrayLen()
if err != nil {
return err
}
key, err := rd.ReadString()
if err != nil {
return err
}
cmd.val[key] = make([]interface{}, 0, itemLen-1)
for j := 1; j < itemLen; j++ {
// Read the inner array for timestamp-value pairs
data, err := rd.ReadReply()
if err != nil {
return err
}
cmd.val[key] = append(cmd.val[key], data)
}
}
}
return nil return nil
} }

View File

@ -2,6 +2,7 @@ package redis_test
import ( import (
"context" "context"
"fmt"
"strings" "strings"
. "github.com/bsm/ginkgo/v2" . "github.com/bsm/ginkgo/v2"
@ -12,8 +13,28 @@ import (
var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() { var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
ctx := context.TODO() ctx := context.TODO()
setupRedisClient := func(protocolVersion int) *redis.Client {
return redis.NewClient(&redis.Options{
Addr: "localhost:6379",
DB: 0,
Protocol: protocolVersion,
UnstableResp3: true,
})
}
protocols := []int{2, 3}
for _, protocol := range protocols {
protocol := protocol // capture loop variable for each context
Context(fmt.Sprintf("with protocol version %d", protocol), func() {
var client *redis.Client var client *redis.Client
BeforeEach(func() {
client = setupRedisClient(protocol)
Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred())
})
AfterEach(func() { AfterEach(func() {
if client != nil { if client != nil {
client.FlushDB(ctx) client.FlushDB(ctx)
@ -21,17 +42,6 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
} }
}) })
protocols := []int{2}
for _, protocol := range protocols {
BeforeEach(func() {
client = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
DB: 0,
Protocol: protocol,
})
Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred())
})
It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs", "NonRedisEnterprise"), func() { It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs", "NonRedisEnterprise"), func() {
result, err := client.TSCreate(ctx, "1").Result() result, err := client.TSCreate(ctx, "1").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -51,7 +61,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
Expect(result).To(BeEquivalentTo("OK")) Expect(result).To(BeEquivalentTo("OK"))
resultInfo, err := client.TSInfo(ctx, "4").Result() resultInfo, err := client.TSInfo(ctx, "4").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(resultInfo["labels"].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"Time", "Series"}))
} else {
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series")) Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
}
// Test chunk size // Test chunk size
opt = &redis.TSOptions{ChunkSize: 128} opt = &redis.TSOptions{ChunkSize: 128}
result, err = client.TSCreateWithArgs(ctx, "ts-cs-1", opt).Result() result, err = client.TSCreateWithArgs(ctx, "ts-cs-1", opt).Result()
@ -143,7 +157,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
Expect(result).To(BeEquivalentTo(4)) Expect(result).To(BeEquivalentTo(4))
resultInfo, err := client.TSInfo(ctx, "4").Result() resultInfo, err := client.TSInfo(ctx, "4").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(resultInfo["labels"].([]interface{})).To(ContainElement([]interface{}{"Time", "Series"}))
} else {
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series")) Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
}
// Test chunk size // Test chunk size
opt = &redis.TSOptions{ChunkSize: 128} opt = &redis.TSOptions{ChunkSize: 128}
result, err = client.TSAddWithArgs(ctx, "ts-cs-1", 1, 10, opt).Result() result, err = client.TSAddWithArgs(ctx, "ts-cs-1", 1, 10, opt).Result()
@ -232,7 +250,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
resultInfo, err = client.TSInfo(ctx, "1").Result() resultInfo, err = client.TSInfo(ctx, "1").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(resultInfo["labels"]).To(BeEquivalentTo([]interface{}{}))
} else {
Expect(resultInfo["labels"]).To(BeEquivalentTo(map[interface{}]interface{}{})) Expect(resultInfo["labels"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
}
opt = &redis.TSAlterOptions{Labels: map[string]string{"Time": "Series"}} opt = &redis.TSAlterOptions{Labels: map[string]string{"Time": "Series"}}
resultAlter, err = client.TSAlter(ctx, "1", opt).Result() resultAlter, err = client.TSAlter(ctx, "1", opt).Result()
@ -241,9 +263,15 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
resultInfo, err = client.TSInfo(ctx, "1").Result() resultInfo, err = client.TSInfo(ctx, "1").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(resultInfo["labels"].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"Time", "Series"}))
Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(10))
Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo(redis.Nil))
} else {
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series")) Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(10)) Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(10))
Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo(redis.Nil)) Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo(redis.Nil))
}
opt = &redis.TSAlterOptions{DuplicatePolicy: "min"} opt = &redis.TSAlterOptions{DuplicatePolicy: "min"}
resultAlter, err = client.TSAlter(ctx, "1", opt).Result() resultAlter, err = client.TSAlter(ctx, "1", opt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -313,7 +341,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
Expect(resultDeleteRule).To(BeEquivalentTo("OK")) Expect(resultDeleteRule).To(BeEquivalentTo("OK"))
resultInfo, err := client.TSInfo(ctx, "1").Result() resultInfo, err := client.TSInfo(ctx, "1").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(resultInfo["rules"]).To(BeEquivalentTo([]interface{}{}))
} else {
Expect(resultInfo["rules"]).To(BeEquivalentTo(map[interface{}]interface{}{})) Expect(resultInfo["rules"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
}
}) })
It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs", "NonRedisEnterprise"), func() { It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs", "NonRedisEnterprise"), func() {
@ -510,12 +542,21 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
result, err := client.TSMGet(ctx, []string{"Test=This"}).Result() result, err := client.TSMGet(ctx, []string{"Test=This"}).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][1].([]interface{})[1]).To(BeEquivalentTo("15"))
Expect(result["b"][1].([]interface{})[1]).To(BeEquivalentTo("25"))
} else {
Expect(result["a"][1].([]interface{})[1]).To(BeEquivalentTo(15)) Expect(result["a"][1].([]interface{})[1]).To(BeEquivalentTo(15))
Expect(result["b"][1].([]interface{})[1]).To(BeEquivalentTo(25)) Expect(result["b"][1].([]interface{})[1]).To(BeEquivalentTo(25))
}
mgetOpt := &redis.TSMGetOptions{WithLabels: true} mgetOpt := &redis.TSMGetOptions{WithLabels: true}
result, err = client.TSMGetWithArgs(ctx, []string{"Test=This"}, mgetOpt).Result() result, err = client.TSMGetWithArgs(ctx, []string{"Test=This"}, mgetOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["b"][0]).To(ConsistOf([]interface{}{"Test", "This"}, []interface{}{"Taste", "That"}))
} else {
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "Taste": "That"})) Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "Taste": "That"}))
}
resultCreate, err = client.TSCreate(ctx, "c").Result() resultCreate, err = client.TSCreate(ctx, "c").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -537,11 +578,19 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
result, err = client.TSMGet(ctx, []string{"is_compaction=true"}).Result() result, err = client.TSMGet(ctx, []string{"is_compaction=true"}).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(0), "4"}))
} else {
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(0), 4.0})) Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(0), 4.0}))
}
mgetOpt = &redis.TSMGetOptions{Latest: true} mgetOpt = &redis.TSMGetOptions{Latest: true}
result, err = client.TSMGetWithArgs(ctx, []string{"is_compaction=true"}, mgetOpt).Result() result, err = client.TSMGetWithArgs(ctx, []string{"is_compaction=true"}, mgetOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(10), "8"}))
} else {
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(10), 8.0})) Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(10), 8.0}))
}
}) })
It("should TSQueryIndex", Label("timeseries", "tsqueryindex"), func() { It("should TSQueryIndex", Label("timeseries", "tsqueryindex"), func() {
@ -839,12 +888,20 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
result, err := client.TSMRange(ctx, 0, 200, []string{"Test=This"}).Result() result, err := client.TSMRange(ctx, 0, 200, []string{"Test=This"}).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(result)).To(BeEquivalentTo(2)) Expect(len(result)).To(BeEquivalentTo(2))
if client.Options().Protocol == 2 {
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(100))
} else {
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100)) Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100))
}
// Test Count // Test Count
mrangeOpt := &redis.TSMRangeOptions{Count: 10} mrangeOpt := &redis.TSMRangeOptions{Count: 10}
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(10))
} else {
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10)) Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10))
}
// Test Aggregation and BucketDuration // Test Aggregation and BucketDuration
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
_, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result() _, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result()
@ -854,19 +911,36 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
result, err = client.TSMRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(result)).To(BeEquivalentTo(2)) Expect(len(result)).To(BeEquivalentTo(2))
if client.Options().Protocol == 2 {
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(20))
} else {
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20)) Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20))
}
// Test WithLabels // Test WithLabels
if client.Options().Protocol == 2 {
Expect(result["a"][0]).To(BeEquivalentTo([]interface{}{}))
} else {
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{})) Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
}
mrangeOpt = &redis.TSMRangeOptions{WithLabels: true} mrangeOpt = &redis.TSMRangeOptions{WithLabels: true}
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][0]).To(ConsistOf([]interface{}{[]interface{}{"Test", "This"}, []interface{}{"team", "ny"}}))
} else {
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"})) Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"}))
}
// Test SelectedLabels // Test SelectedLabels
mrangeOpt = &redis.TSMRangeOptions{SelectedLabels: []interface{}{"team"}} mrangeOpt = &redis.TSMRangeOptions{SelectedLabels: []interface{}{"team"}}
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "ny"}))
Expect(result["b"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "sf"}))
} else {
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"})) Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"}))
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"})) Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"}))
}
// Test FilterBy // Test FilterBy
fts := make([]int, 0) fts := make([]int, 0)
for i := 10; i < 20; i++ { for i := 10; i < 20; i++ {
@ -875,34 +949,58 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
mrangeOpt = &redis.TSMRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}} mrangeOpt = &redis.TSMRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}}
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][1].([]interface{})).To(BeEquivalentTo([]interface{}{[]interface{}{int64(15), "1"}, []interface{}{int64(16), "2"}}))
} else {
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(15), 1.0}, []interface{}{int64(16), 2.0}})) Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(15), 1.0}, []interface{}{int64(16), 2.0}}))
}
// Test GroupBy // Test GroupBy
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "sum"} mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "sum"}
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "2"}, []interface{}{int64(2), "4"}, []interface{}{int64(3), "6"}}))
} else {
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(3), 6.0}})) Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(3), 6.0}}))
}
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "max"} mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "max"}
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "1"}, []interface{}{int64(2), "2"}, []interface{}{int64(3), "3"}}))
} else {
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}})) Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
}
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "team", Reducer: "min"} mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "team", Reducer: "min"}
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(result)).To(BeEquivalentTo(2)) Expect(len(result)).To(BeEquivalentTo(2))
if client.Options().Protocol == 2 {
Expect(result["team=ny"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "1"}, []interface{}{int64(2), "2"}, []interface{}{int64(3), "3"}}))
Expect(result["team=sf"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "1"}, []interface{}{int64(2), "2"}, []interface{}{int64(3), "3"}}))
} else {
Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}})) Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}})) Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
}
// Test Align // Test Align
mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"} mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"}
result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "10"}, []interface{}{int64(10), "1"}}))
} else {
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 10.0}, []interface{}{int64(10), 1.0}})) Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 10.0}, []interface{}{int64(10), 1.0}}))
}
mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 5} mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 5}
result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result() result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "5"}, []interface{}{int64(5), "6"}}))
} else {
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 5.0}, []interface{}{int64(5), 6.0}})) Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 5.0}, []interface{}{int64(5), 6.0}}))
}
}) })
It("should TSMRangeWithArgs Latest", Label("timeseries", "tsmrangeWithArgs", "tsmrangelatest", "NonRedisEnterprise"), func() { It("should TSMRangeWithArgs Latest", Label("timeseries", "tsmrangeWithArgs", "tsmrangelatest", "NonRedisEnterprise"), func() {
@ -949,8 +1047,13 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
mrangeOpt := &redis.TSMRangeOptions{Latest: true} mrangeOpt := &redis.TSMRangeOptions{Latest: true}
result, err := client.TSMRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result() result, err := client.TSMRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["b"][1]).To(ConsistOf([]interface{}{int64(0), "4"}, []interface{}{int64(10), "8"}))
Expect(result["d"][1]).To(ConsistOf([]interface{}{int64(0), "4"}, []interface{}{int64(10), "8"}))
} else {
Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}})) Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}}))
Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}})) Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}}))
}
}) })
It("should TSMRevRange and TSMRevRangeWithArgs", Label("timeseries", "tsmrevrange", "tsmrevrangeWithArgs"), func() { It("should TSMRevRange and TSMRevRangeWithArgs", Label("timeseries", "tsmrevrange", "tsmrevrangeWithArgs"), func() {
createOpt := &redis.TSOptions{Labels: map[string]string{"Test": "This", "team": "ny"}} createOpt := &redis.TSOptions{Labels: map[string]string{"Test": "This", "team": "ny"}}
@ -971,12 +1074,20 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
result, err := client.TSMRevRange(ctx, 0, 200, []string{"Test=This"}).Result() result, err := client.TSMRevRange(ctx, 0, 200, []string{"Test=This"}).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(result)).To(BeEquivalentTo(2)) Expect(len(result)).To(BeEquivalentTo(2))
if client.Options().Protocol == 2 {
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(100))
} else {
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100)) Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100))
}
// Test Count // Test Count
mrangeOpt := &redis.TSMRevRangeOptions{Count: 10} mrangeOpt := &redis.TSMRevRangeOptions{Count: 10}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(10))
} else {
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10)) Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10))
}
// Test Aggregation and BucketDuration // Test Aggregation and BucketDuration
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
_, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result() _, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result()
@ -986,20 +1097,32 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
result, err = client.TSMRevRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(result)).To(BeEquivalentTo(2)) Expect(len(result)).To(BeEquivalentTo(2))
if client.Options().Protocol == 2 {
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(20))
Expect(result["a"][0]).To(BeEquivalentTo([]interface{}{}))
} else {
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20)) Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20))
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{})) Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
// Test WithLabels }
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
mrangeOpt = &redis.TSMRevRangeOptions{WithLabels: true} mrangeOpt = &redis.TSMRevRangeOptions{WithLabels: true}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][0]).To(ConsistOf([]interface{}{[]interface{}{"Test", "This"}, []interface{}{"team", "ny"}}))
} else {
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"})) Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"}))
}
// Test SelectedLabels // Test SelectedLabels
mrangeOpt = &redis.TSMRevRangeOptions{SelectedLabels: []interface{}{"team"}} mrangeOpt = &redis.TSMRevRangeOptions{SelectedLabels: []interface{}{"team"}}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "ny"}))
Expect(result["b"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "sf"}))
} else {
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"})) Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"}))
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"})) Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"}))
}
// Test FilterBy // Test FilterBy
fts := make([]int, 0) fts := make([]int, 0)
for i := 10; i < 20; i++ { for i := 10; i < 20; i++ {
@ -1008,34 +1131,56 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
mrangeOpt = &redis.TSMRevRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}} mrangeOpt = &redis.TSMRevRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][1].([]interface{})).To(ConsistOf([]interface{}{int64(16), "2"}, []interface{}{int64(15), "1"}))
} else {
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(16), 2.0}, []interface{}{int64(15), 1.0}})) Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(16), 2.0}, []interface{}{int64(15), 1.0}}))
}
// Test GroupBy // Test GroupBy
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "sum"} mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "sum"}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "6"}, []interface{}{int64(2), "4"}, []interface{}{int64(1), "2"}, []interface{}{int64(0), "0"}}))
} else {
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 6.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(0), 0.0}})) Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 6.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(0), 0.0}}))
}
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "max"} mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "max"}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "3"}, []interface{}{int64(2), "2"}, []interface{}{int64(1), "1"}, []interface{}{int64(0), "0"}}))
} else {
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}})) Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
}
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "team", Reducer: "min"} mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "team", Reducer: "min"}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(result)).To(BeEquivalentTo(2)) Expect(len(result)).To(BeEquivalentTo(2))
if client.Options().Protocol == 2 {
Expect(result["team=ny"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "3"}, []interface{}{int64(2), "2"}, []interface{}{int64(1), "1"}, []interface{}{int64(0), "0"}}))
Expect(result["team=sf"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "3"}, []interface{}{int64(2), "2"}, []interface{}{int64(1), "1"}, []interface{}{int64(0), "0"}}))
} else {
Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}})) Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}})) Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
}
// Test Align // Test Align
mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"} mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), "1"}, []interface{}{int64(0), "10"}}))
} else {
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 1.0}, []interface{}{int64(0), 10.0}})) Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 1.0}, []interface{}{int64(0), 10.0}}))
}
mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 1} mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 1}
result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result() result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(1), "10"}, []interface{}{int64(0), "1"}}))
} else {
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(1), 10.0}, []interface{}{int64(0), 1.0}})) Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(1), 10.0}, []interface{}{int64(0), 1.0}}))
}
}) })
It("should TSMRevRangeWithArgs Latest", Label("timeseries", "tsmrevrangeWithArgs", "tsmrevrangelatest", "NonRedisEnterprise"), func() { It("should TSMRevRangeWithArgs Latest", Label("timeseries", "tsmrevrangeWithArgs", "tsmrevrangelatest", "NonRedisEnterprise"), func() {
@ -1082,8 +1227,14 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
mrangeOpt := &redis.TSMRevRangeOptions{Latest: true} mrangeOpt := &redis.TSMRevRangeOptions{Latest: true}
result, err := client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result() result, err := client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
if client.Options().Protocol == 2 {
Expect(result["b"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), "8"}, []interface{}{int64(0), "4"}}))
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), "8"}, []interface{}{int64(0), "4"}}))
} else {
Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}})) Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}}))
Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}})) Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}}))
}
})
}) })
} }
}) })