mirror of https://github.com/go-redis/redis.git
Compare commits
8 Commits
275db505bd
...
54e8a0368d
Author | SHA1 | Date |
---|---|---|
Marco | 54e8a0368d | |
dependabot[bot] | e63669e170 | |
LINKIWI | fc32d0a01d | |
Justin | f1ffb55c9a | |
LINKIWI | 080e051124 | |
ofekshenawa | 930d904205 | |
ofekshenawa | 8b1073d2d6 | |
ofekshenawa | d1b4eaed41 |
|
@ -54,6 +54,7 @@ stunnel
|
||||||
SynDump
|
SynDump
|
||||||
TCP
|
TCP
|
||||||
TLS
|
TLS
|
||||||
|
UnstableResp
|
||||||
uri
|
uri
|
||||||
URI
|
URI
|
||||||
url
|
url
|
||||||
|
@ -62,3 +63,5 @@ RedisStack
|
||||||
RedisGears
|
RedisGears
|
||||||
RedisTimeseries
|
RedisTimeseries
|
||||||
RediSearch
|
RediSearch
|
||||||
|
RawResult
|
||||||
|
RawVal
|
|
@ -8,7 +8,7 @@ jobs:
|
||||||
- name: Checkout
|
- name: Checkout
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
- name: Check Spelling
|
- name: Check Spelling
|
||||||
uses: rojopolis/spellcheck-github-actions@0.40.0
|
uses: rojopolis/spellcheck-github-actions@0.45.0
|
||||||
with:
|
with:
|
||||||
config_path: .github/spellcheck-settings.yml
|
config_path: .github/spellcheck-settings.yml
|
||||||
task_name: Markdown
|
task_name: Markdown
|
||||||
|
|
15
README.md
15
README.md
|
@ -186,6 +186,21 @@ rdb := redis.NewClient(&redis.Options{
|
||||||
#### Unstable RESP3 Structures for RediSearch Commands
|
#### Unstable RESP3 Structures for RediSearch Commands
|
||||||
When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when using the search and query capabilities, but we plan to stabilize the RESP3-based API-s in the coming versions. You can find more guidance in the upcoming release notes.
|
When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when using the search and query capabilities, but we plan to stabilize the RESP3-based API-s in the coming versions. You can find more guidance in the upcoming release notes.
|
||||||
|
|
||||||
|
To enable unstable RESP3, set the option in your client configuration:
|
||||||
|
|
||||||
|
```go
|
||||||
|
redis.NewClient(&redis.Options{
|
||||||
|
UnstableResp3: true,
|
||||||
|
})
|
||||||
|
```
|
||||||
|
**Note:** When UnstableResp3 mode is enabled, it's necessary to use RawResult() and RawVal() to retrieve a raw data.
|
||||||
|
Since, raw response is the only option for unstable search commands Val() and Result() calls wouldn't have any affect on them:
|
||||||
|
|
||||||
|
```go
|
||||||
|
res1, err := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawResult()
|
||||||
|
val1 := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawVal()
|
||||||
|
```
|
||||||
|
|
||||||
## Contributing
|
## Contributing
|
||||||
|
|
||||||
Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library!
|
Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library!
|
||||||
|
|
40
command.go
40
command.go
|
@ -167,6 +167,8 @@ func (cmd *baseCmd) stringArg(pos int) string {
|
||||||
switch v := arg.(type) {
|
switch v := arg.(type) {
|
||||||
case string:
|
case string:
|
||||||
return v
|
return v
|
||||||
|
case []byte:
|
||||||
|
return string(v)
|
||||||
default:
|
default:
|
||||||
// TODO: consider using appendArg
|
// TODO: consider using appendArg
|
||||||
return fmt.Sprint(v)
|
return fmt.Sprint(v)
|
||||||
|
@ -1403,11 +1405,18 @@ func (cmd *MapStringSliceInterfaceCmd) Val() map[string][]interface{} {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
|
func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
|
||||||
|
readType, err := rd.PeekReplyType()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd.val = make(map[string][]interface{})
|
||||||
|
|
||||||
|
if readType == proto.RespMap {
|
||||||
n, err := rd.ReadMapLen()
|
n, err := rd.ReadMapLen()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
cmd.val = make(map[string][]interface{}, n)
|
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
k, err := rd.ReadString()
|
k, err := rd.ReadString()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1426,6 +1435,35 @@ func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
|
||||||
cmd.val[k][j] = value
|
cmd.val[k][j] = value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if readType == proto.RespArray {
|
||||||
|
// RESP2 response
|
||||||
|
n, err := rd.ReadArrayLen()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
// Each entry in this array is itself an array with key details
|
||||||
|
itemLen, err := rd.ReadArrayLen()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
key, err := rd.ReadString()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cmd.val[key] = make([]interface{}, 0, itemLen-1)
|
||||||
|
for j := 1; j < itemLen; j++ {
|
||||||
|
// Read the inner array for timestamp-value pairs
|
||||||
|
data, err := rd.ReadReply()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cmd.val[key] = append(cmd.val[key], data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,10 @@ import (
|
||||||
"github.com/redis/go-redis/v9/internal/rand"
|
"github.com/redis/go-redis/v9/internal/rand"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
minLatencyMeasurementInterval = 10 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
|
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
|
||||||
|
|
||||||
// ClusterOptions are used to configure a cluster client and should be
|
// ClusterOptions are used to configure a cluster client and should be
|
||||||
|
@ -316,6 +320,10 @@ type clusterNode struct {
|
||||||
latency uint32 // atomic
|
latency uint32 // atomic
|
||||||
generation uint32 // atomic
|
generation uint32 // atomic
|
||||||
failing uint32 // atomic
|
failing uint32 // atomic
|
||||||
|
|
||||||
|
// last time the latency measurement was performed for the node, stored in nanoseconds
|
||||||
|
// from epoch
|
||||||
|
lastLatencyMeasurement int64 // atomic
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
|
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
|
||||||
|
@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
|
||||||
latency = float64(dur) / float64(successes)
|
latency = float64(dur) / float64(successes)
|
||||||
}
|
}
|
||||||
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
|
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
|
||||||
|
n.SetLastLatencyMeasurement(time.Now())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *clusterNode) Latency() time.Duration {
|
func (n *clusterNode) Latency() time.Duration {
|
||||||
|
@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
|
||||||
return atomic.LoadUint32(&n.generation)
|
return atomic.LoadUint32(&n.generation)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *clusterNode) LastLatencyMeasurement() int64 {
|
||||||
|
return atomic.LoadInt64(&n.lastLatencyMeasurement)
|
||||||
|
}
|
||||||
|
|
||||||
func (n *clusterNode) SetGeneration(gen uint32) {
|
func (n *clusterNode) SetGeneration(gen uint32) {
|
||||||
for {
|
for {
|
||||||
v := atomic.LoadUint32(&n.generation)
|
v := atomic.LoadUint32(&n.generation)
|
||||||
|
@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
|
||||||
|
for {
|
||||||
|
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
|
||||||
|
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type clusterNodes struct {
|
type clusterNodes struct {
|
||||||
|
@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
|
||||||
c.activeAddrs = c.activeAddrs[:0]
|
c.activeAddrs = c.activeAddrs[:0]
|
||||||
|
now := time.Now()
|
||||||
for addr, node := range c.nodes {
|
for addr, node := range c.nodes {
|
||||||
if node.Generation() >= generation {
|
if node.Generation() >= generation {
|
||||||
c.activeAddrs = append(c.activeAddrs, addr)
|
c.activeAddrs = append(c.activeAddrs, addr)
|
||||||
if c.opt.RouteByLatency {
|
if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
|
||||||
go node.updateLatency()
|
go node.updateLatency()
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -653,6 +653,32 @@ var _ = Describe("ClusterClient", func() {
|
||||||
Expect(client.Close()).NotTo(HaveOccurred())
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("determines hash slots correctly for generic commands", func() {
|
||||||
|
opt := redisClusterOptions()
|
||||||
|
opt.MaxRedirects = -1
|
||||||
|
client := cluster.newClusterClient(ctx, opt)
|
||||||
|
|
||||||
|
err := client.Do(ctx, "GET", "A").Err()
|
||||||
|
Expect(err).To(Equal(redis.Nil))
|
||||||
|
|
||||||
|
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
|
||||||
|
Expect(err).To(Equal(redis.Nil))
|
||||||
|
|
||||||
|
Eventually(func() error {
|
||||||
|
return client.SwapNodes(ctx, "A")
|
||||||
|
}, 30*time.Second).ShouldNot(HaveOccurred())
|
||||||
|
|
||||||
|
err = client.Do(ctx, "GET", "A").Err()
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("MOVED"))
|
||||||
|
|
||||||
|
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("MOVED"))
|
||||||
|
|
||||||
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
It("follows node redirection immediately", func() {
|
It("follows node redirection immediately", func() {
|
||||||
// Configure retry backoffs far in excess of the expected duration of redirection
|
// Configure retry backoffs far in excess of the expected duration of redirection
|
||||||
opt := redisClusterOptions()
|
opt := redisClusterOptions()
|
||||||
|
|
|
@ -319,38 +319,70 @@ func (cmd *BFInfoCmd) Result() (BFInfo, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *BFInfoCmd) readReply(rd *proto.Reader) (err error) {
|
func (cmd *BFInfoCmd) readReply(rd *proto.Reader) (err error) {
|
||||||
|
result := BFInfo{}
|
||||||
|
|
||||||
|
// Create a mapping from key names to pointers of struct fields
|
||||||
|
respMapping := map[string]*int64{
|
||||||
|
"Capacity": &result.Capacity,
|
||||||
|
"CAPACITY": &result.Capacity,
|
||||||
|
"Size": &result.Size,
|
||||||
|
"SIZE": &result.Size,
|
||||||
|
"Number of filters": &result.Filters,
|
||||||
|
"FILTERS": &result.Filters,
|
||||||
|
"Number of items inserted": &result.ItemsInserted,
|
||||||
|
"ITEMS": &result.ItemsInserted,
|
||||||
|
"Expansion rate": &result.ExpansionRate,
|
||||||
|
"EXPANSION": &result.ExpansionRate,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function to read and assign a value based on the key
|
||||||
|
readAndAssignValue := func(key string) error {
|
||||||
|
fieldPtr, exists := respMapping[key]
|
||||||
|
if !exists {
|
||||||
|
return fmt.Errorf("redis: BLOOM.INFO unexpected key %s", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the integer and assign to the field via pointer dereferencing
|
||||||
|
val, err := rd.ReadInt()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
*fieldPtr = val
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
readType, err := rd.PeekReplyType()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(cmd.args) > 2 && readType == proto.RespArray {
|
||||||
|
n, err := rd.ReadArrayLen()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if key, ok := cmd.args[2].(string); ok && n == 1 {
|
||||||
|
if err := readAndAssignValue(key); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("redis: BLOOM.INFO invalid argument key type")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
n, err := rd.ReadMapLen()
|
n, err := rd.ReadMapLen()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
var key string
|
key, err := rd.ReadString()
|
||||||
var result BFInfo
|
|
||||||
for f := 0; f < n; f++ {
|
|
||||||
key, err = rd.ReadString()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if err := readAndAssignValue(key); err != nil {
|
||||||
switch key {
|
|
||||||
case "Capacity":
|
|
||||||
result.Capacity, err = rd.ReadInt()
|
|
||||||
case "Size":
|
|
||||||
result.Size, err = rd.ReadInt()
|
|
||||||
case "Number of filters":
|
|
||||||
result.Filters, err = rd.ReadInt()
|
|
||||||
case "Number of items inserted":
|
|
||||||
result.ItemsInserted, err = rd.ReadInt()
|
|
||||||
case "Expansion rate":
|
|
||||||
result.ExpansionRate, err = rd.ReadInt()
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("redis: BLOOM.INFO unexpected key %s", key)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
cmd.val = result
|
cmd.val = result
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -13,15 +13,32 @@ import (
|
||||||
|
|
||||||
var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
|
var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
|
||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
|
|
||||||
|
setupRedisClient := func(protocolVersion int) *redis.Client {
|
||||||
|
return redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
DB: 0,
|
||||||
|
Protocol: protocolVersion,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
protocols := []int{2, 3}
|
||||||
|
for _, protocol := range protocols {
|
||||||
|
protocol := protocol // capture loop variable for each context
|
||||||
|
|
||||||
|
Context(fmt.Sprintf("with protocol version %d", protocol), func() {
|
||||||
var client *redis.Client
|
var client *redis.Client
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
client = redis.NewClient(&redis.Options{Addr: ":6379"})
|
client = setupRedisClient(protocol)
|
||||||
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
|
Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
Expect(client.Close()).NotTo(HaveOccurred())
|
if client != nil {
|
||||||
|
client.FlushDB(ctx)
|
||||||
|
client.Close()
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
Describe("bloom", Label("bloom"), func() {
|
Describe("bloom", Label("bloom"), func() {
|
||||||
|
@ -117,7 +134,7 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
|
||||||
NoCreate: true,
|
NoCreate: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
resultInsert, err := client.BFInsert(ctx, "testbf1", options, "item1").Result()
|
_, err := client.BFInsert(ctx, "testbf1", options, "item1").Result()
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err).To(MatchError("ERR not found"))
|
Expect(err).To(MatchError("ERR not found"))
|
||||||
|
|
||||||
|
@ -129,7 +146,7 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
|
||||||
NoCreate: false,
|
NoCreate: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
resultInsert, err = client.BFInsert(ctx, "testbf1", options, "item1").Result()
|
resultInsert, err := client.BFInsert(ctx, "testbf1", options, "item1").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(len(resultInsert)).To(BeEquivalentTo(1))
|
Expect(len(resultInsert)).To(BeEquivalentTo(1))
|
||||||
|
|
||||||
|
@ -396,7 +413,7 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
|
||||||
NoCreate: true,
|
NoCreate: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := client.CFInsertNX(ctx, "testcf1", args, "item1", "item2", "item2").Result()
|
_, err := client.CFInsertNX(ctx, "testcf1", args, "item1", "item2", "item2").Result()
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
|
|
||||||
args = &redis.CFInsertOptions{
|
args = &redis.CFInsertOptions{
|
||||||
|
@ -404,7 +421,7 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
|
||||||
NoCreate: false,
|
NoCreate: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err = client.CFInsertNX(ctx, "testcf2", args, "item1", "item2", "item2").Result()
|
result, err := client.CFInsertNX(ctx, "testcf2", args, "item1", "item2", "item2").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(len(result)).To(BeEquivalentTo(3))
|
Expect(len(result)).To(BeEquivalentTo(3))
|
||||||
Expect(result[0]).To(BeEquivalentTo(int64(1)))
|
Expect(result[0]).To(BeEquivalentTo(int64(1)))
|
||||||
|
@ -730,4 +747,6 @@ var _ = Describe("Probabilistic commands", Label("probabilistic"), func() {
|
||||||
Expect(max).To(BeEquivalentTo(float64(140)))
|
Expect(max).To(BeEquivalentTo(float64(140)))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
2
redis.go
2
redis.go
|
@ -176,8 +176,6 @@ func (hs *hooksMixin) withProcessPipelineHook(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
|
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||||
hs.hooksMu.Lock()
|
|
||||||
defer hs.hooksMu.Unlock()
|
|
||||||
return hs.current.dial(ctx, network, addr)
|
return hs.current.dial(ctx, network, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -633,3 +634,67 @@ var _ = Describe("Hook with MinIdleConns", func() {
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
var _ = Describe("Dialer connection timeouts", func() {
|
||||||
|
var client *redis.Client
|
||||||
|
|
||||||
|
const dialSimulatedDelay = 1 * time.Second
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
options := redisOptions()
|
||||||
|
options.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||||
|
// Simulated slow dialer.
|
||||||
|
// Note that the following sleep is deliberately not context-aware.
|
||||||
|
time.Sleep(dialSimulatedDelay)
|
||||||
|
return net.Dial("tcp", options.Addr)
|
||||||
|
}
|
||||||
|
options.MinIdleConns = 1
|
||||||
|
client = redis.NewClient(options)
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
err := client.Close()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("does not contend on connection dial for concurrent commands", func() {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
const concurrency = 10
|
||||||
|
|
||||||
|
durations := make(chan time.Duration, concurrency)
|
||||||
|
errs := make(chan error, concurrency)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
wg.Add(concurrency)
|
||||||
|
|
||||||
|
for i := 0; i < concurrency; i++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
err := client.Ping(ctx).Err()
|
||||||
|
durations <- time.Since(start)
|
||||||
|
errs <- err
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
close(durations)
|
||||||
|
close(errs)
|
||||||
|
|
||||||
|
// All commands should eventually succeed, after acquiring a connection.
|
||||||
|
for err := range errs {
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Each individual command should complete within the simulated dial duration bound.
|
||||||
|
for duration := range durations {
|
||||||
|
Expect(duration).To(BeNumerically("<", 2*dialSimulatedDelay))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Due to concurrent execution, the entire test suite should also complete within
|
||||||
|
// the same dial duration bound applied for individual commands.
|
||||||
|
Expect(time.Since(start)).To(BeNumerically("<", 2*dialSimulatedDelay))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
|
@ -2,6 +2,7 @@ package redis_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
. "github.com/bsm/ginkgo/v2"
|
. "github.com/bsm/ginkgo/v2"
|
||||||
|
@ -12,15 +13,33 @@ import (
|
||||||
|
|
||||||
var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
|
|
||||||
|
setupRedisClient := func(protocolVersion int) *redis.Client {
|
||||||
|
return redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
DB: 0,
|
||||||
|
Protocol: protocolVersion,
|
||||||
|
UnstableResp3: true,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
protocols := []int{2, 3}
|
||||||
|
for _, protocol := range protocols {
|
||||||
|
protocol := protocol // capture loop variable for each context
|
||||||
|
|
||||||
|
Context(fmt.Sprintf("with protocol version %d", protocol), func() {
|
||||||
var client *redis.Client
|
var client *redis.Client
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
client = redis.NewClient(&redis.Options{Addr: rediStackAddr})
|
client = setupRedisClient(protocol)
|
||||||
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
|
Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
Expect(client.Close()).NotTo(HaveOccurred())
|
if client != nil {
|
||||||
|
client.FlushDB(ctx)
|
||||||
|
client.Close()
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs", "NonRedisEnterprise"), func() {
|
It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs", "NonRedisEnterprise"), func() {
|
||||||
|
@ -42,7 +61,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
Expect(result).To(BeEquivalentTo("OK"))
|
Expect(result).To(BeEquivalentTo("OK"))
|
||||||
resultInfo, err := client.TSInfo(ctx, "4").Result()
|
resultInfo, err := client.TSInfo(ctx, "4").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(resultInfo["labels"].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"Time", "Series"}))
|
||||||
|
} else {
|
||||||
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
|
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
|
||||||
|
}
|
||||||
// Test chunk size
|
// Test chunk size
|
||||||
opt = &redis.TSOptions{ChunkSize: 128}
|
opt = &redis.TSOptions{ChunkSize: 128}
|
||||||
result, err = client.TSCreateWithArgs(ctx, "ts-cs-1", opt).Result()
|
result, err = client.TSCreateWithArgs(ctx, "ts-cs-1", opt).Result()
|
||||||
|
@ -134,7 +157,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
Expect(result).To(BeEquivalentTo(4))
|
Expect(result).To(BeEquivalentTo(4))
|
||||||
resultInfo, err := client.TSInfo(ctx, "4").Result()
|
resultInfo, err := client.TSInfo(ctx, "4").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(resultInfo["labels"].([]interface{})).To(ContainElement([]interface{}{"Time", "Series"}))
|
||||||
|
} else {
|
||||||
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
|
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
|
||||||
|
}
|
||||||
// Test chunk size
|
// Test chunk size
|
||||||
opt = &redis.TSOptions{ChunkSize: 128}
|
opt = &redis.TSOptions{ChunkSize: 128}
|
||||||
result, err = client.TSAddWithArgs(ctx, "ts-cs-1", 1, 10, opt).Result()
|
result, err = client.TSAddWithArgs(ctx, "ts-cs-1", 1, 10, opt).Result()
|
||||||
|
@ -223,7 +250,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
|
|
||||||
resultInfo, err = client.TSInfo(ctx, "1").Result()
|
resultInfo, err = client.TSInfo(ctx, "1").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(resultInfo["labels"]).To(BeEquivalentTo([]interface{}{}))
|
||||||
|
} else {
|
||||||
Expect(resultInfo["labels"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
Expect(resultInfo["labels"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
||||||
|
}
|
||||||
|
|
||||||
opt = &redis.TSAlterOptions{Labels: map[string]string{"Time": "Series"}}
|
opt = &redis.TSAlterOptions{Labels: map[string]string{"Time": "Series"}}
|
||||||
resultAlter, err = client.TSAlter(ctx, "1", opt).Result()
|
resultAlter, err = client.TSAlter(ctx, "1", opt).Result()
|
||||||
|
@ -232,9 +263,15 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
|
|
||||||
resultInfo, err = client.TSInfo(ctx, "1").Result()
|
resultInfo, err = client.TSInfo(ctx, "1").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(resultInfo["labels"].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"Time", "Series"}))
|
||||||
|
Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(10))
|
||||||
|
Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo(redis.Nil))
|
||||||
|
} else {
|
||||||
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
|
Expect(resultInfo["labels"].(map[interface{}]interface{})["Time"]).To(BeEquivalentTo("Series"))
|
||||||
Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(10))
|
Expect(resultInfo["retentionTime"]).To(BeEquivalentTo(10))
|
||||||
Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo(redis.Nil))
|
Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo(redis.Nil))
|
||||||
|
}
|
||||||
opt = &redis.TSAlterOptions{DuplicatePolicy: "min"}
|
opt = &redis.TSAlterOptions{DuplicatePolicy: "min"}
|
||||||
resultAlter, err = client.TSAlter(ctx, "1", opt).Result()
|
resultAlter, err = client.TSAlter(ctx, "1", opt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
@ -304,7 +341,11 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
Expect(resultDeleteRule).To(BeEquivalentTo("OK"))
|
Expect(resultDeleteRule).To(BeEquivalentTo("OK"))
|
||||||
resultInfo, err := client.TSInfo(ctx, "1").Result()
|
resultInfo, err := client.TSInfo(ctx, "1").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(resultInfo["rules"]).To(BeEquivalentTo([]interface{}{}))
|
||||||
|
} else {
|
||||||
Expect(resultInfo["rules"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
Expect(resultInfo["rules"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs", "NonRedisEnterprise"), func() {
|
It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs", "NonRedisEnterprise"), func() {
|
||||||
|
@ -501,12 +542,21 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
|
|
||||||
result, err := client.TSMGet(ctx, []string{"Test=This"}).Result()
|
result, err := client.TSMGet(ctx, []string{"Test=This"}).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["a"][1].([]interface{})[1]).To(BeEquivalentTo("15"))
|
||||||
|
Expect(result["b"][1].([]interface{})[1]).To(BeEquivalentTo("25"))
|
||||||
|
} else {
|
||||||
Expect(result["a"][1].([]interface{})[1]).To(BeEquivalentTo(15))
|
Expect(result["a"][1].([]interface{})[1]).To(BeEquivalentTo(15))
|
||||||
Expect(result["b"][1].([]interface{})[1]).To(BeEquivalentTo(25))
|
Expect(result["b"][1].([]interface{})[1]).To(BeEquivalentTo(25))
|
||||||
|
}
|
||||||
mgetOpt := &redis.TSMGetOptions{WithLabels: true}
|
mgetOpt := &redis.TSMGetOptions{WithLabels: true}
|
||||||
result, err = client.TSMGetWithArgs(ctx, []string{"Test=This"}, mgetOpt).Result()
|
result, err = client.TSMGetWithArgs(ctx, []string{"Test=This"}, mgetOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["b"][0]).To(ConsistOf([]interface{}{"Test", "This"}, []interface{}{"Taste", "That"}))
|
||||||
|
} else {
|
||||||
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "Taste": "That"}))
|
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "Taste": "That"}))
|
||||||
|
}
|
||||||
|
|
||||||
resultCreate, err = client.TSCreate(ctx, "c").Result()
|
resultCreate, err = client.TSCreate(ctx, "c").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
@ -528,11 +578,19 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
result, err = client.TSMGet(ctx, []string{"is_compaction=true"}).Result()
|
result, err = client.TSMGet(ctx, []string{"is_compaction=true"}).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(0), "4"}))
|
||||||
|
} else {
|
||||||
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(0), 4.0}))
|
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(0), 4.0}))
|
||||||
|
}
|
||||||
mgetOpt = &redis.TSMGetOptions{Latest: true}
|
mgetOpt = &redis.TSMGetOptions{Latest: true}
|
||||||
result, err = client.TSMGetWithArgs(ctx, []string{"is_compaction=true"}, mgetOpt).Result()
|
result, err = client.TSMGetWithArgs(ctx, []string{"is_compaction=true"}, mgetOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(10), "8"}))
|
||||||
|
} else {
|
||||||
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(10), 8.0}))
|
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{int64(10), 8.0}))
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should TSQueryIndex", Label("timeseries", "tsqueryindex"), func() {
|
It("should TSQueryIndex", Label("timeseries", "tsqueryindex"), func() {
|
||||||
|
@ -830,12 +888,20 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
result, err := client.TSMRange(ctx, 0, 200, []string{"Test=This"}).Result()
|
result, err := client.TSMRange(ctx, 0, 200, []string{"Test=This"}).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(len(result)).To(BeEquivalentTo(2))
|
Expect(len(result)).To(BeEquivalentTo(2))
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(100))
|
||||||
|
} else {
|
||||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100))
|
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100))
|
||||||
|
}
|
||||||
// Test Count
|
// Test Count
|
||||||
mrangeOpt := &redis.TSMRangeOptions{Count: 10}
|
mrangeOpt := &redis.TSMRangeOptions{Count: 10}
|
||||||
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(10))
|
||||||
|
} else {
|
||||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10))
|
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10))
|
||||||
|
}
|
||||||
// Test Aggregation and BucketDuration
|
// Test Aggregation and BucketDuration
|
||||||
for i := 0; i < 100; i++ {
|
for i := 0; i < 100; i++ {
|
||||||
_, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result()
|
_, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result()
|
||||||
|
@ -845,19 +911,36 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
result, err = client.TSMRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(len(result)).To(BeEquivalentTo(2))
|
Expect(len(result)).To(BeEquivalentTo(2))
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(20))
|
||||||
|
} else {
|
||||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20))
|
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20))
|
||||||
|
}
|
||||||
// Test WithLabels
|
// Test WithLabels
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["a"][0]).To(BeEquivalentTo([]interface{}{}))
|
||||||
|
} else {
|
||||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
||||||
|
}
|
||||||
mrangeOpt = &redis.TSMRangeOptions{WithLabels: true}
|
mrangeOpt = &redis.TSMRangeOptions{WithLabels: true}
|
||||||
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["a"][0]).To(ConsistOf([]interface{}{[]interface{}{"Test", "This"}, []interface{}{"team", "ny"}}))
|
||||||
|
} else {
|
||||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"}))
|
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"}))
|
||||||
|
}
|
||||||
// Test SelectedLabels
|
// Test SelectedLabels
|
||||||
mrangeOpt = &redis.TSMRangeOptions{SelectedLabels: []interface{}{"team"}}
|
mrangeOpt = &redis.TSMRangeOptions{SelectedLabels: []interface{}{"team"}}
|
||||||
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["a"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "ny"}))
|
||||||
|
Expect(result["b"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "sf"}))
|
||||||
|
} else {
|
||||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"}))
|
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"}))
|
||||||
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"}))
|
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"}))
|
||||||
|
}
|
||||||
// Test FilterBy
|
// Test FilterBy
|
||||||
fts := make([]int, 0)
|
fts := make([]int, 0)
|
||||||
for i := 10; i < 20; i++ {
|
for i := 10; i < 20; i++ {
|
||||||
|
@ -866,34 +949,58 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
mrangeOpt = &redis.TSMRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}}
|
mrangeOpt = &redis.TSMRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}}
|
||||||
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["a"][1].([]interface{})).To(BeEquivalentTo([]interface{}{[]interface{}{int64(15), "1"}, []interface{}{int64(16), "2"}}))
|
||||||
|
} else {
|
||||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(15), 1.0}, []interface{}{int64(16), 2.0}}))
|
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(15), 1.0}, []interface{}{int64(16), 2.0}}))
|
||||||
|
}
|
||||||
// Test GroupBy
|
// Test GroupBy
|
||||||
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "sum"}
|
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "sum"}
|
||||||
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "2"}, []interface{}{int64(2), "4"}, []interface{}{int64(3), "6"}}))
|
||||||
|
} else {
|
||||||
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(3), 6.0}}))
|
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(3), 6.0}}))
|
||||||
|
}
|
||||||
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "max"}
|
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "Test", Reducer: "max"}
|
||||||
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "1"}, []interface{}{int64(2), "2"}, []interface{}{int64(3), "3"}}))
|
||||||
|
} else {
|
||||||
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
|
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
|
||||||
|
}
|
||||||
|
|
||||||
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "team", Reducer: "min"}
|
mrangeOpt = &redis.TSMRangeOptions{GroupByLabel: "team", Reducer: "min"}
|
||||||
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(len(result)).To(BeEquivalentTo(2))
|
Expect(len(result)).To(BeEquivalentTo(2))
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["team=ny"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "1"}, []interface{}{int64(2), "2"}, []interface{}{int64(3), "3"}}))
|
||||||
|
Expect(result["team=sf"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "0"}, []interface{}{int64(1), "1"}, []interface{}{int64(2), "2"}, []interface{}{int64(3), "3"}}))
|
||||||
|
} else {
|
||||||
Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
|
Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
|
||||||
Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
|
Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 0.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(3), 3.0}}))
|
||||||
|
}
|
||||||
// Test Align
|
// Test Align
|
||||||
mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"}
|
mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"}
|
||||||
result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "10"}, []interface{}{int64(10), "1"}}))
|
||||||
|
} else {
|
||||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 10.0}, []interface{}{int64(10), 1.0}}))
|
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 10.0}, []interface{}{int64(10), 1.0}}))
|
||||||
|
}
|
||||||
|
|
||||||
mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 5}
|
mrangeOpt = &redis.TSMRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 5}
|
||||||
result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
result, err = client.TSMRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), "5"}, []interface{}{int64(5), "6"}}))
|
||||||
|
} else {
|
||||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 5.0}, []interface{}{int64(5), 6.0}}))
|
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 5.0}, []interface{}{int64(5), 6.0}}))
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should TSMRangeWithArgs Latest", Label("timeseries", "tsmrangeWithArgs", "tsmrangelatest", "NonRedisEnterprise"), func() {
|
It("should TSMRangeWithArgs Latest", Label("timeseries", "tsmrangeWithArgs", "tsmrangelatest", "NonRedisEnterprise"), func() {
|
||||||
|
@ -940,8 +1047,13 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
mrangeOpt := &redis.TSMRangeOptions{Latest: true}
|
mrangeOpt := &redis.TSMRangeOptions{Latest: true}
|
||||||
result, err := client.TSMRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result()
|
result, err := client.TSMRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["b"][1]).To(ConsistOf([]interface{}{int64(0), "4"}, []interface{}{int64(10), "8"}))
|
||||||
|
Expect(result["d"][1]).To(ConsistOf([]interface{}{int64(0), "4"}, []interface{}{int64(10), "8"}))
|
||||||
|
} else {
|
||||||
Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}}))
|
Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}}))
|
||||||
Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}}))
|
Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(0), 4.0}, []interface{}{int64(10), 8.0}}))
|
||||||
|
}
|
||||||
})
|
})
|
||||||
It("should TSMRevRange and TSMRevRangeWithArgs", Label("timeseries", "tsmrevrange", "tsmrevrangeWithArgs"), func() {
|
It("should TSMRevRange and TSMRevRangeWithArgs", Label("timeseries", "tsmrevrange", "tsmrevrangeWithArgs"), func() {
|
||||||
createOpt := &redis.TSOptions{Labels: map[string]string{"Test": "This", "team": "ny"}}
|
createOpt := &redis.TSOptions{Labels: map[string]string{"Test": "This", "team": "ny"}}
|
||||||
|
@ -962,12 +1074,20 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
result, err := client.TSMRevRange(ctx, 0, 200, []string{"Test=This"}).Result()
|
result, err := client.TSMRevRange(ctx, 0, 200, []string{"Test=This"}).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(len(result)).To(BeEquivalentTo(2))
|
Expect(len(result)).To(BeEquivalentTo(2))
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(100))
|
||||||
|
} else {
|
||||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100))
|
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(100))
|
||||||
|
}
|
||||||
// Test Count
|
// Test Count
|
||||||
mrangeOpt := &redis.TSMRevRangeOptions{Count: 10}
|
mrangeOpt := &redis.TSMRevRangeOptions{Count: 10}
|
||||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(10))
|
||||||
|
} else {
|
||||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10))
|
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(10))
|
||||||
|
}
|
||||||
// Test Aggregation and BucketDuration
|
// Test Aggregation and BucketDuration
|
||||||
for i := 0; i < 100; i++ {
|
for i := 0; i < 100; i++ {
|
||||||
_, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result()
|
_, err := client.TSAdd(ctx, "a", i+200, float64(i%7)).Result()
|
||||||
|
@ -977,20 +1097,32 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRevRangeWithArgs(ctx, 0, 500, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(len(result)).To(BeEquivalentTo(2))
|
Expect(len(result)).To(BeEquivalentTo(2))
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(len(result["a"][1].([]interface{}))).To(BeEquivalentTo(20))
|
||||||
|
Expect(result["a"][0]).To(BeEquivalentTo([]interface{}{}))
|
||||||
|
} else {
|
||||||
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20))
|
Expect(len(result["a"][2].([]interface{}))).To(BeEquivalentTo(20))
|
||||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
||||||
// Test WithLabels
|
}
|
||||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
|
||||||
mrangeOpt = &redis.TSMRevRangeOptions{WithLabels: true}
|
mrangeOpt = &redis.TSMRevRangeOptions{WithLabels: true}
|
||||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["a"][0]).To(ConsistOf([]interface{}{[]interface{}{"Test", "This"}, []interface{}{"team", "ny"}}))
|
||||||
|
} else {
|
||||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"}))
|
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"Test": "This", "team": "ny"}))
|
||||||
|
}
|
||||||
// Test SelectedLabels
|
// Test SelectedLabels
|
||||||
mrangeOpt = &redis.TSMRevRangeOptions{SelectedLabels: []interface{}{"team"}}
|
mrangeOpt = &redis.TSMRevRangeOptions{SelectedLabels: []interface{}{"team"}}
|
||||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["a"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "ny"}))
|
||||||
|
Expect(result["b"][0].([]interface{})[0]).To(BeEquivalentTo([]interface{}{"team", "sf"}))
|
||||||
|
} else {
|
||||||
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"}))
|
Expect(result["a"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "ny"}))
|
||||||
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"}))
|
Expect(result["b"][0]).To(BeEquivalentTo(map[interface{}]interface{}{"team": "sf"}))
|
||||||
|
}
|
||||||
// Test FilterBy
|
// Test FilterBy
|
||||||
fts := make([]int, 0)
|
fts := make([]int, 0)
|
||||||
for i := 10; i < 20; i++ {
|
for i := 10; i < 20; i++ {
|
||||||
|
@ -999,34 +1131,56 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
mrangeOpt = &redis.TSMRevRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}}
|
mrangeOpt = &redis.TSMRevRangeOptions{FilterByTS: fts, FilterByValue: []int{1, 2}}
|
||||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRevRangeWithArgs(ctx, 0, 200, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["a"][1].([]interface{})).To(ConsistOf([]interface{}{int64(16), "2"}, []interface{}{int64(15), "1"}))
|
||||||
|
} else {
|
||||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(16), 2.0}, []interface{}{int64(15), 1.0}}))
|
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(16), 2.0}, []interface{}{int64(15), 1.0}}))
|
||||||
|
}
|
||||||
// Test GroupBy
|
// Test GroupBy
|
||||||
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "sum"}
|
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "sum"}
|
||||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "6"}, []interface{}{int64(2), "4"}, []interface{}{int64(1), "2"}, []interface{}{int64(0), "0"}}))
|
||||||
|
} else {
|
||||||
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 6.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(0), 0.0}}))
|
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 6.0}, []interface{}{int64(2), 4.0}, []interface{}{int64(1), 2.0}, []interface{}{int64(0), 0.0}}))
|
||||||
|
}
|
||||||
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "max"}
|
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "Test", Reducer: "max"}
|
||||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["Test=This"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "3"}, []interface{}{int64(2), "2"}, []interface{}{int64(1), "1"}, []interface{}{int64(0), "0"}}))
|
||||||
|
} else {
|
||||||
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
|
Expect(result["Test=This"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
|
||||||
|
}
|
||||||
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "team", Reducer: "min"}
|
mrangeOpt = &redis.TSMRevRangeOptions{GroupByLabel: "team", Reducer: "min"}
|
||||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
result, err = client.TSMRevRangeWithArgs(ctx, 0, 3, []string{"Test=This"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(len(result)).To(BeEquivalentTo(2))
|
Expect(len(result)).To(BeEquivalentTo(2))
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["team=ny"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "3"}, []interface{}{int64(2), "2"}, []interface{}{int64(1), "1"}, []interface{}{int64(0), "0"}}))
|
||||||
|
Expect(result["team=sf"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), "3"}, []interface{}{int64(2), "2"}, []interface{}{int64(1), "1"}, []interface{}{int64(0), "0"}}))
|
||||||
|
} else {
|
||||||
Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
|
Expect(result["team=ny"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
|
||||||
Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
|
Expect(result["team=sf"][3]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(3), 3.0}, []interface{}{int64(2), 2.0}, []interface{}{int64(1), 1.0}, []interface{}{int64(0), 0.0}}))
|
||||||
|
}
|
||||||
// Test Align
|
// Test Align
|
||||||
mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"}
|
mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: "-"}
|
||||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), "1"}, []interface{}{int64(0), "10"}}))
|
||||||
|
} else {
|
||||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 1.0}, []interface{}{int64(0), 10.0}}))
|
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 1.0}, []interface{}{int64(0), 10.0}}))
|
||||||
|
}
|
||||||
mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 1}
|
mrangeOpt = &redis.TSMRevRangeOptions{Aggregator: redis.Count, BucketDuration: 10, Align: 1}
|
||||||
result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
result, err = client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"team=ny"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["a"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(1), "10"}, []interface{}{int64(0), "1"}}))
|
||||||
|
} else {
|
||||||
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(1), 10.0}, []interface{}{int64(0), 1.0}}))
|
Expect(result["a"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(1), 10.0}, []interface{}{int64(0), 1.0}}))
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should TSMRevRangeWithArgs Latest", Label("timeseries", "tsmrevrangeWithArgs", "tsmrevrangelatest", "NonRedisEnterprise"), func() {
|
It("should TSMRevRangeWithArgs Latest", Label("timeseries", "tsmrevrangeWithArgs", "tsmrevrangelatest", "NonRedisEnterprise"), func() {
|
||||||
|
@ -1073,7 +1227,14 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
||||||
mrangeOpt := &redis.TSMRevRangeOptions{Latest: true}
|
mrangeOpt := &redis.TSMRevRangeOptions{Latest: true}
|
||||||
result, err := client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result()
|
result, err := client.TSMRevRangeWithArgs(ctx, 0, 10, []string{"is_compaction=true"}, mrangeOpt).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if client.Options().Protocol == 2 {
|
||||||
|
Expect(result["b"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), "8"}, []interface{}{int64(0), "4"}}))
|
||||||
|
Expect(result["d"][1]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), "8"}, []interface{}{int64(0), "4"}}))
|
||||||
|
} else {
|
||||||
Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}}))
|
Expect(result["b"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}}))
|
||||||
Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}}))
|
Expect(result["d"][2]).To(BeEquivalentTo([]interface{}{[]interface{}{int64(10), 8.0}, []interface{}{int64(0), 4.0}}))
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue