diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index a139f5da..5210ccfa 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -23,4 +23,4 @@ jobs: steps: - uses: actions/checkout@v4 - name: golangci-lint - uses: golangci/golangci-lint-action@v4 + uses: golangci/golangci-lint-action@v6 diff --git a/Makefile b/Makefile index 00cf1de5..d8d00759 100644 --- a/Makefile +++ b/Makefile @@ -31,7 +31,7 @@ build: testdata/redis: mkdir -p $@ - wget -qO- https://download.redis.io/releases/redis-7.4-rc1.tar.gz | tar xvz --strip-components=1 -C $@ + wget -qO- https://download.redis.io/releases/redis-7.4-rc2.tar.gz | tar xvz --strip-components=1 -C $@ testdata/redis/src/redis-server: testdata/redis cd $< && make all diff --git a/commands_test.go b/commands_test.go index edc95694..9554bf9a 100644 --- a/commands_test.go +++ b/commands_test.go @@ -2486,35 +2486,40 @@ var _ = Describe("Commands", func() { }) It("should HExpire", Label("hash-expiration", "NonRedisEnterprise"), func() { - res, err := client.HExpire(ctx, "no_such_key", 10, "field1", "field2", "field3").Result() + res, err := client.HExpire(ctx, "no_such_key", 10*time.Second, "field1", "field2", "field3").Result() Expect(err).To(BeNil()) + Expect(res).To(BeEquivalentTo([]int64{-2, -2, -2})) + for i := 0; i < 100; i++ { sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") Expect(sadd.Err()).NotTo(HaveOccurred()) } - res, err = client.HExpire(ctx, "myhash", 10, "key1", "key2", "key200").Result() + res, err = client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key2", "key200").Result() Expect(err).NotTo(HaveOccurred()) Expect(res).To(Equal([]int64{1, 1, -2})) }) It("should HPExpire", Label("hash-expiration", "NonRedisEnterprise"), func() { - _, err := client.HPExpire(ctx, "no_such_key", 10, "field1", "field2", "field3").Result() + res, err := client.HPExpire(ctx, "no_such_key", 10*time.Second, "field1", "field2", "field3").Result() Expect(err).To(BeNil()) + Expect(res).To(BeEquivalentTo([]int64{-2, -2, -2})) + for i := 0; i < 100; i++ { sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") Expect(sadd.Err()).NotTo(HaveOccurred()) } - res, err := client.HPExpire(ctx, "myhash", 10, "key1", "key2", "key200").Result() + res, err = client.HPExpire(ctx, "myhash", 10*time.Second, "key1", "key2", "key200").Result() Expect(err).NotTo(HaveOccurred()) Expect(res).To(Equal([]int64{1, 1, -2})) }) It("should HExpireAt", Label("hash-expiration", "NonRedisEnterprise"), func() { - - _, err := client.HExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result() + resEmpty, err := client.HExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result() Expect(err).To(BeNil()) + Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2})) + for i := 0; i < 100; i++ { sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") Expect(sadd.Err()).NotTo(HaveOccurred()) @@ -2526,9 +2531,10 @@ var _ = Describe("Commands", func() { }) It("should HPExpireAt", Label("hash-expiration", "NonRedisEnterprise"), func() { - - _, err := client.HPExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result() + resEmpty, err := client.HPExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result() Expect(err).To(BeNil()) + Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2})) + for i := 0; i < 100; i++ { sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") Expect(sadd.Err()).NotTo(HaveOccurred()) @@ -2540,9 +2546,10 @@ var _ = Describe("Commands", func() { }) It("should HPersist", Label("hash-expiration", "NonRedisEnterprise"), func() { - - _, err := client.HPersist(ctx, "no_such_key", "field1", "field2", "field3").Result() + resEmpty, err := client.HPersist(ctx, "no_such_key", "field1", "field2", "field3").Result() Expect(err).To(BeNil()) + Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2})) + for i := 0; i < 100; i++ { sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") Expect(sadd.Err()).NotTo(HaveOccurred()) @@ -2552,7 +2559,7 @@ var _ = Describe("Commands", func() { Expect(err).NotTo(HaveOccurred()) Expect(res).To(Equal([]int64{-1, -1, -2})) - res, err = client.HExpire(ctx, "myhash", 10, "key1", "key200").Result() + res, err = client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result() Expect(err).NotTo(HaveOccurred()) Expect(res).To(Equal([]int64{1, -2})) @@ -2562,15 +2569,16 @@ var _ = Describe("Commands", func() { }) It("should HExpireTime", Label("hash-expiration", "NonRedisEnterprise"), func() { - - _, err := client.HExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result() + resEmpty, err := client.HExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result() Expect(err).To(BeNil()) + Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2})) + for i := 0; i < 100; i++ { sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") Expect(sadd.Err()).NotTo(HaveOccurred()) } - res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result() + res, err := client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result() Expect(err).NotTo(HaveOccurred()) Expect(res).To(Equal([]int64{1, -2})) @@ -2580,9 +2588,10 @@ var _ = Describe("Commands", func() { }) It("should HPExpireTime", Label("hash-expiration", "NonRedisEnterprise"), func() { - - _, err := client.HPExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result() + resEmpty, err := client.HPExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result() Expect(err).To(BeNil()) + Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2})) + for i := 0; i < 100; i++ { sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") Expect(sadd.Err()).NotTo(HaveOccurred()) @@ -2599,15 +2608,16 @@ var _ = Describe("Commands", func() { }) It("should HTTL", Label("hash-expiration", "NonRedisEnterprise"), func() { - - _, err := client.HTTL(ctx, "no_such_key", "field1", "field2", "field3").Result() + resEmpty, err := client.HTTL(ctx, "no_such_key", "field1", "field2", "field3").Result() Expect(err).To(BeNil()) + Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2})) + for i := 0; i < 100; i++ { sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") Expect(sadd.Err()).NotTo(HaveOccurred()) } - res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result() + res, err := client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result() Expect(err).NotTo(HaveOccurred()) Expect(res).To(Equal([]int64{1, -2})) @@ -2617,15 +2627,16 @@ var _ = Describe("Commands", func() { }) It("should HPTTL", Label("hash-expiration", "NonRedisEnterprise"), func() { - - _, err := client.HPTTL(ctx, "no_such_key", "field1", "field2", "field3").Result() + resEmpty, err := client.HPTTL(ctx, "no_such_key", "field1", "field2", "field3").Result() Expect(err).To(BeNil()) + Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2})) + for i := 0; i < 100; i++ { sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello") Expect(sadd.Err()).NotTo(HaveOccurred()) } - res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result() + res, err := client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result() Expect(err).NotTo(HaveOccurred()) Expect(res).To(Equal([]int64{1, -2})) diff --git a/hash_commands.go b/hash_commands.go index ef69064e..dcffdcdd 100644 --- a/hash_commands.go +++ b/hash_commands.go @@ -23,6 +23,19 @@ type HashCmdable interface { HVals(ctx context.Context, key string) *StringSliceCmd HRandField(ctx context.Context, key string, count int) *StringSliceCmd HRandFieldWithValues(ctx context.Context, key string, count int) *KeyValueSliceCmd + HExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd + HExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd + HPExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd + HPExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd + HExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd + HExpireAtWithArgs(ctx context.Context, key string, tm time.Time, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd + HPExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd + HPExpireAtWithArgs(ctx context.Context, key string, tm time.Time, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd + HPersist(ctx context.Context, key string, fields ...string) *IntSliceCmd + HExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd + HPExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd + HTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd + HPTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd } func (c cmdable) HDel(ctx context.Context, key string, fields ...string) *IntCmd { @@ -202,7 +215,7 @@ type HExpireArgs struct { // The command constructs an argument list starting with "HEXPIRE", followed by the key, duration, any conditional flags, and the specified fields. // For more information - https://redis.io/commands/hexpire/ func (c cmdable) HExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd { - args := []interface{}{"HEXPIRE", key, expiration, "FIELDS", len(fields)} + args := []interface{}{"HEXPIRE", key, formatSec(ctx, expiration), "FIELDS", len(fields)} for _, field := range fields { args = append(args, field) @@ -217,7 +230,7 @@ func (c cmdable) HExpire(ctx context.Context, key string, expiration time.Durati // The command constructs an argument list starting with "HEXPIRE", followed by the key, duration, any conditional flags, and the specified fields. // For more information - https://redis.io/commands/hexpire/ func (c cmdable) HExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd { - args := []interface{}{"HEXPIRE", key, expiration} + args := []interface{}{"HEXPIRE", key, formatSec(ctx, expiration)} // only if one argument is true, we can add it to the args // if more than one argument is true, it will cause an error diff --git a/internal/pool/conn_check.go b/internal/pool/conn_check.go index 83190d39..07c261c2 100644 --- a/internal/pool/conn_check.go +++ b/internal/pool/conn_check.go @@ -3,6 +3,7 @@ package pool import ( + "crypto/tls" "errors" "io" "net" @@ -16,6 +17,10 @@ func connCheck(conn net.Conn) error { // Reset previous timeout. _ = conn.SetDeadline(time.Time{}) + // Check if tls.Conn. + if c, ok := conn.(*tls.Conn); ok { + conn = c.NetConn() + } sysConn, ok := conn.(syscall.Conn) if !ok { return nil diff --git a/internal/pool/conn_check_test.go b/internal/pool/conn_check_test.go index 2ade8a0b..21499333 100644 --- a/internal/pool/conn_check_test.go +++ b/internal/pool/conn_check_test.go @@ -3,6 +3,7 @@ package pool import ( + "crypto/tls" "net" "net/http/httptest" "time" @@ -14,12 +15,17 @@ import ( var _ = Describe("tests conn_check with real conns", func() { var ts *httptest.Server var conn net.Conn + var tlsConn *tls.Conn var err error BeforeEach(func() { ts = httptest.NewServer(nil) conn, err = net.DialTimeout(ts.Listener.Addr().Network(), ts.Listener.Addr().String(), time.Second) Expect(err).NotTo(HaveOccurred()) + tlsTestServer := httptest.NewUnstartedServer(nil) + tlsTestServer.StartTLS() + tlsConn, err = tls.DialWithDialer(&net.Dialer{Timeout: time.Second}, tlsTestServer.Listener.Addr().Network(), tlsTestServer.Listener.Addr().String(), &tls.Config{InsecureSkipVerify: true}) + Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { @@ -33,11 +39,23 @@ var _ = Describe("tests conn_check with real conns", func() { Expect(connCheck(conn)).To(HaveOccurred()) }) + It("good tls conn check", func() { + Expect(connCheck(tlsConn)).NotTo(HaveOccurred()) + + Expect(tlsConn.Close()).NotTo(HaveOccurred()) + Expect(connCheck(tlsConn)).To(HaveOccurred()) + }) + It("bad conn check", func() { Expect(conn.Close()).NotTo(HaveOccurred()) Expect(connCheck(conn)).To(HaveOccurred()) }) + It("bad tls conn check", func() { + Expect(tlsConn.Close()).NotTo(HaveOccurred()) + Expect(connCheck(tlsConn)).To(HaveOccurred()) + }) + It("check conn deadline", func() { Expect(conn.SetDeadline(time.Now())).NotTo(HaveOccurred()) time.Sleep(time.Millisecond * 10) diff --git a/osscluster.go b/osscluster.go index e28cb1e3..ce258ff3 100644 --- a/osscluster.go +++ b/osscluster.go @@ -341,6 +341,8 @@ func (n *clusterNode) Close() error { return n.Client.Close() } +const maximumNodeLatency = 1 * time.Minute + func (n *clusterNode) updateLatency() { const numProbe = 10 var dur uint64 @@ -361,7 +363,7 @@ func (n *clusterNode) updateLatency() { if successes == 0 { // If none of the pings worked, set latency to some arbitrarily high value so this node gets // least priority. - latency = float64((1 * time.Minute) / time.Microsecond) + latency = float64((maximumNodeLatency) / time.Microsecond) } else { latency = float64(dur) / float64(successes) } @@ -735,20 +737,40 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) { return c.nodes.Random() } - var node *clusterNode + var allNodesFailing = true + var ( + closestNonFailingNode *clusterNode + closestNode *clusterNode + minLatency time.Duration + ) + + // setting the max possible duration as zerovalue for minlatency + minLatency = time.Duration(math.MaxInt64) + for _, n := range nodes { - if n.Failing() { - continue + if closestNode == nil || n.Latency() < minLatency { + closestNode = n + minLatency = n.Latency() + if !n.Failing() { + closestNonFailingNode = n + allNodesFailing = false + } } - if node == nil || n.Latency() < node.Latency() { - node = n - } - } - if node != nil { - return node, nil } - // If all nodes are failing - return random node + // pick the healthly node with the lowest latency + if !allNodesFailing && closestNonFailingNode != nil { + return closestNonFailingNode, nil + } + + // if all nodes are failing, we will pick the temporarily failing node with lowest latency + if minLatency < maximumNodeLatency && closestNode != nil { + internal.Logger.Printf(context.TODO(), "redis: all nodes are marked as failed, picking the temporarily failing node with lowest latency") + return closestNode, nil + } + + // If all nodes are having the maximum latency(all pings are failing) - return a random node across the cluster + internal.Logger.Printf(context.TODO(), "redis: pings to all nodes are failing, picking a random node across the cluster") return c.nodes.Random() } @@ -916,10 +938,13 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error { func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error { slot := c.cmdSlot(ctx, cmd) var node *clusterNode + var moved bool var ask bool var lastErr error for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { - if attempt > 0 { + // MOVED and ASK responses are not transient errors that require retry delay; they + // should be attempted immediately. + if attempt > 0 && !moved && !ask { if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { return err } @@ -963,7 +988,6 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error { continue } - var moved bool var addr string moved, ask, addr = isMovedError(lastErr) if moved || ask { diff --git a/osscluster_test.go b/osscluster_test.go index 3d2f8071..f7bd1683 100644 --- a/osscluster_test.go +++ b/osscluster_test.go @@ -653,6 +653,32 @@ var _ = Describe("ClusterClient", func() { Expect(client.Close()).NotTo(HaveOccurred()) }) + It("follows node redirection immediately", func() { + // Configure retry backoffs far in excess of the expected duration of redirection + opt := redisClusterOptions() + opt.MinRetryBackoff = 10 * time.Minute + opt.MaxRetryBackoff = 20 * time.Minute + client := cluster.newClusterClient(ctx, opt) + + Eventually(func() error { + return client.SwapNodes(ctx, "A") + }, 30*time.Second).ShouldNot(HaveOccurred()) + + // Note that this context sets a deadline more aggressive than the lowest possible bound + // of the retry backoff; this verifies that redirection completes immediately. + redirCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + err := client.Set(redirCtx, "A", "VALUE", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + v, err := client.Get(redirCtx, "A").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal("VALUE")) + + Expect(client.Close()).NotTo(HaveOccurred()) + }) + It("calls fn for every master node", func() { for i := 0; i < 10; i++ { Expect(client.Set(ctx, strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred()) diff --git a/pubsub.go b/pubsub.go index aea96241..72b18f49 100644 --- a/pubsub.go +++ b/pubsub.go @@ -84,7 +84,7 @@ func (c *PubSub) conn(ctx context.Context, newChannels []string) (*pool.Conn, er } func (c *PubSub) writeCmd(ctx context.Context, cn *pool.Conn, cmd Cmder) error { - return cn.WithWriter(context.Background(), c.opt.WriteTimeout, func(wr *proto.Writer) error { + return cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error { return writeCmd(wr, cmd) }) } diff --git a/search_commands.go b/search_commands.go index 8214a570..f5118c77 100644 --- a/search_commands.go +++ b/search_commands.go @@ -75,6 +75,8 @@ type FieldSchema struct { WithSuffixtrie bool VectorArgs *FTVectorArgs GeoShapeFieldType string + IndexEmpty bool + IndexMissing bool } type FTVectorArgs struct { @@ -1002,6 +1004,13 @@ func (c cmdable) FTCreate(ctx context.Context, index string, options *FTCreateOp if schema.WithSuffixtrie { args = append(args, "WITHSUFFIXTRIE") } + if schema.IndexEmpty { + args = append(args, "INDEXEMPTY") + } + if schema.IndexMissing { + args = append(args, "INDEXMISSING") + + } } cmd := NewStatusCmd(ctx, args...) _ = c(ctx, cmd) diff --git a/search_test.go b/search_test.go index 60888ef5..0d66f243 100644 --- a/search_test.go +++ b/search_test.go @@ -1017,6 +1017,110 @@ var _ = Describe("RediSearch commands", Label("search"), func() { Expect(res.Attributes[0].WithSuffixtrie).To(BeTrue()) }) + It("should test dialect 4", Label("search", "ftcreate", "ftsearch", "NonRedisEnterprise"), func() { + val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{ + Prefix: []interface{}{"resource:"}, + }, &redis.FieldSchema{ + FieldName: "uuid", + FieldType: redis.SearchFieldTypeTag, + }, &redis.FieldSchema{ + FieldName: "tags", + FieldType: redis.SearchFieldTypeTag, + }, &redis.FieldSchema{ + FieldName: "description", + FieldType: redis.SearchFieldTypeText, + }, &redis.FieldSchema{ + FieldName: "rating", + FieldType: redis.SearchFieldTypeNumeric, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeEquivalentTo("OK")) + + client.HSet(ctx, "resource:1", map[string]interface{}{ + "uuid": "123e4567-e89b-12d3-a456-426614174000", + "tags": "finance|crypto|$btc|blockchain", + "description": "Analysis of blockchain technologies & Bitcoin's potential.", + "rating": 5, + }) + client.HSet(ctx, "resource:2", map[string]interface{}{ + "uuid": "987e6543-e21c-12d3-a456-426614174999", + "tags": "health|well-being|fitness|new-year's-resolutions", + "description": "Health trends for the new year, including fitness regimes.", + "rating": 4, + }) + + res, err := client.FTSearchWithArgs(ctx, "idx1", "@uuid:{$uuid}", + &redis.FTSearchOptions{ + DialectVersion: 2, + Params: map[string]interface{}{"uuid": "123e4567-e89b-12d3-a456-426614174000"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Total).To(BeEquivalentTo(int64(1))) + Expect(res.Docs[0].ID).To(BeEquivalentTo("resource:1")) + + res, err = client.FTSearchWithArgs(ctx, "idx1", "@uuid:{$uuid}", + &redis.FTSearchOptions{ + DialectVersion: 4, + Params: map[string]interface{}{"uuid": "123e4567-e89b-12d3-a456-426614174000"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Total).To(BeEquivalentTo(int64(1))) + Expect(res.Docs[0].ID).To(BeEquivalentTo("resource:1")) + + client.HSet(ctx, "test:1", map[string]interface{}{ + "uuid": "3d3586fe-0416-4572-8ce", + "email": "adriano@acme.com.ie", + "num": 5, + }) + + // Create the index + ftCreateOptions := &redis.FTCreateOptions{ + Prefix: []interface{}{"test:"}, + } + schema := []*redis.FieldSchema{ + { + FieldName: "uuid", + FieldType: redis.SearchFieldTypeTag, + }, + { + FieldName: "email", + FieldType: redis.SearchFieldTypeTag, + }, + { + FieldName: "num", + FieldType: redis.SearchFieldTypeNumeric, + }, + } + + val, err = client.FTCreate(ctx, "idx_hash", ftCreateOptions, schema...).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("OK")) + + ftSearchOptions := &redis.FTSearchOptions{ + DialectVersion: 4, + Params: map[string]interface{}{ + "uuid": "3d3586fe-0416-4572-8ce", + "email": "adriano@acme.com.ie", + }, + } + + res, err = client.FTSearchWithArgs(ctx, "idx_hash", "@uuid:{$uuid}", ftSearchOptions).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Docs[0].ID).To(BeEquivalentTo("test:1")) + Expect(res.Docs[0].Fields["uuid"]).To(BeEquivalentTo("3d3586fe-0416-4572-8ce")) + + res, err = client.FTSearchWithArgs(ctx, "idx_hash", "@email:{$email}", ftSearchOptions).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Docs[0].ID).To(BeEquivalentTo("test:1")) + Expect(res.Docs[0].Fields["email"]).To(BeEquivalentTo("adriano@acme.com.ie")) + + ftSearchOptions.Params = map[string]interface{}{"num": 5} + res, err = client.FTSearchWithArgs(ctx, "idx_hash", "@num:[5]", ftSearchOptions).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Docs[0].ID).To(BeEquivalentTo("test:1")) + Expect(res.Docs[0].Fields["num"]).To(BeEquivalentTo("5")) + }) + It("should FTCreate GeoShape", Label("search", "ftcreate", "ftsearch"), func() { val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, &redis.FieldSchema{FieldName: "geom", FieldType: redis.SearchFieldTypeGeoShape, GeoShapeFieldType: "FLAT"}).Result() Expect(err).NotTo(HaveOccurred()) @@ -1043,8 +1147,175 @@ var _ = Describe("RediSearch commands", Label("search"), func() { Expect(err).NotTo(HaveOccurred()) Expect(res2.Total).To(BeEquivalentTo(int64(2))) }) + + It("should test geoshapes query intersects and disjoint", Label("NonRedisEnterprise"), func() { + _, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, &redis.FieldSchema{ + FieldName: "g", + FieldType: redis.SearchFieldTypeGeoShape, + GeoShapeFieldType: "FLAT", + }).Result() + Expect(err).NotTo(HaveOccurred()) + + client.HSet(ctx, "doc_point1", "g", "POINT (10 10)") + client.HSet(ctx, "doc_point2", "g", "POINT (50 50)") + client.HSet(ctx, "doc_polygon1", "g", "POLYGON ((20 20, 25 35, 35 25, 20 20))") + client.HSet(ctx, "doc_polygon2", "g", "POLYGON ((60 60, 65 75, 70 70, 65 55, 60 60))") + + intersection, err := client.FTSearchWithArgs(ctx, "idx1", "@g:[intersects $shape]", + &redis.FTSearchOptions{ + DialectVersion: 3, + Params: map[string]interface{}{"shape": "POLYGON((15 15, 75 15, 50 70, 20 40, 15 15))"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + _assert_geosearch_result(&intersection, []string{"doc_point2", "doc_polygon1"}) + + disjunction, err := client.FTSearchWithArgs(ctx, "idx1", "@g:[disjoint $shape]", + &redis.FTSearchOptions{ + DialectVersion: 3, + Params: map[string]interface{}{"shape": "POLYGON((15 15, 75 15, 50 70, 20 40, 15 15))"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + _assert_geosearch_result(&disjunction, []string{"doc_point1", "doc_polygon2"}) + }) + + It("should test geoshapes query contains and within", func() { + _, err := client.FTCreate(ctx, "idx2", &redis.FTCreateOptions{}, &redis.FieldSchema{ + FieldName: "g", + FieldType: redis.SearchFieldTypeGeoShape, + GeoShapeFieldType: "FLAT", + }).Result() + Expect(err).NotTo(HaveOccurred()) + + client.HSet(ctx, "doc_point1", "g", "POINT (10 10)") + client.HSet(ctx, "doc_point2", "g", "POINT (50 50)") + client.HSet(ctx, "doc_polygon1", "g", "POLYGON ((20 20, 25 35, 35 25, 20 20))") + client.HSet(ctx, "doc_polygon2", "g", "POLYGON ((60 60, 65 75, 70 70, 65 55, 60 60))") + + containsA, err := client.FTSearchWithArgs(ctx, "idx2", "@g:[contains $shape]", + &redis.FTSearchOptions{ + DialectVersion: 3, + Params: map[string]interface{}{"shape": "POINT(25 25)"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + _assert_geosearch_result(&containsA, []string{"doc_polygon1"}) + + containsB, err := client.FTSearchWithArgs(ctx, "idx2", "@g:[contains $shape]", + &redis.FTSearchOptions{ + DialectVersion: 3, + Params: map[string]interface{}{"shape": "POLYGON((24 24, 24 26, 25 25, 24 24))"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + _assert_geosearch_result(&containsB, []string{"doc_polygon1"}) + + within, err := client.FTSearchWithArgs(ctx, "idx2", "@g:[within $shape]", + &redis.FTSearchOptions{ + DialectVersion: 3, + Params: map[string]interface{}{"shape": "POLYGON((15 15, 75 15, 50 70, 20 40, 15 15))"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + _assert_geosearch_result(&within, []string{"doc_point2", "doc_polygon1"}) + }) + + It("should search missing fields", Label("search", "ftcreate", "ftsearch", "NonRedisEnterprise"), func() { + val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{Prefix: []interface{}{"property:"}}, + &redis.FieldSchema{FieldName: "title", FieldType: redis.SearchFieldTypeText, Sortable: true}, + &redis.FieldSchema{FieldName: "features", FieldType: redis.SearchFieldTypeTag, IndexMissing: true}, + &redis.FieldSchema{FieldName: "description", FieldType: redis.SearchFieldTypeText, IndexMissing: true}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeEquivalentTo("OK")) + WaitForIndexing(client, "idx1") + + client.HSet(ctx, "property:1", map[string]interface{}{ + "title": "Luxury Villa in Malibu", + "features": "pool,sea view,modern", + "description": "A stunning modern villa overlooking the Pacific Ocean.", + }) + + client.HSet(ctx, "property:2", map[string]interface{}{ + "title": "Downtown Flat", + "description": "Modern flat in central Paris with easy access to metro.", + }) + + client.HSet(ctx, "property:3", map[string]interface{}{ + "title": "Beachfront Bungalow", + "features": "beachfront,sun deck", + }) + + res, err := client.FTSearchWithArgs(ctx, "idx1", "ismissing(@features)", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Docs[0].ID).To(BeEquivalentTo("property:2")) + + res, err = client.FTSearchWithArgs(ctx, "idx1", "-ismissing(@features)", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Docs[0].ID).To(BeEquivalentTo("property:1")) + Expect(res.Docs[1].ID).To(BeEquivalentTo("property:3")) + + res, err = client.FTSearchWithArgs(ctx, "idx1", "ismissing(@description)", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Docs[0].ID).To(BeEquivalentTo("property:3")) + + res, err = client.FTSearchWithArgs(ctx, "idx1", "-ismissing(@description)", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Docs[0].ID).To(BeEquivalentTo("property:1")) + Expect(res.Docs[1].ID).To(BeEquivalentTo("property:2")) + }) + + It("should search empty fields", Label("search", "ftcreate", "ftsearch", "NonRedisEnterprise"), func() { + val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{Prefix: []interface{}{"property:"}}, + &redis.FieldSchema{FieldName: "title", FieldType: redis.SearchFieldTypeText, Sortable: true}, + &redis.FieldSchema{FieldName: "features", FieldType: redis.SearchFieldTypeTag, IndexEmpty: true}, + &redis.FieldSchema{FieldName: "description", FieldType: redis.SearchFieldTypeText, IndexEmpty: true}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeEquivalentTo("OK")) + WaitForIndexing(client, "idx1") + + client.HSet(ctx, "property:1", map[string]interface{}{ + "title": "Luxury Villa in Malibu", + "features": "pool,sea view,modern", + "description": "A stunning modern villa overlooking the Pacific Ocean.", + }) + + client.HSet(ctx, "property:2", map[string]interface{}{ + "title": "Downtown Flat", + "features": "", + "description": "Modern flat in central Paris with easy access to metro.", + }) + + client.HSet(ctx, "property:3", map[string]interface{}{ + "title": "Beachfront Bungalow", + "features": "beachfront,sun deck", + "description": "", + }) + + res, err := client.FTSearchWithArgs(ctx, "idx1", "@features:{\"\"}", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Docs[0].ID).To(BeEquivalentTo("property:2")) + + res, err = client.FTSearchWithArgs(ctx, "idx1", "-@features:{\"\"}", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Docs[0].ID).To(BeEquivalentTo("property:1")) + Expect(res.Docs[1].ID).To(BeEquivalentTo("property:3")) + + res, err = client.FTSearchWithArgs(ctx, "idx1", "@description:''", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Docs[0].ID).To(BeEquivalentTo("property:3")) + + res, err = client.FTSearchWithArgs(ctx, "idx1", "-@description:''", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res.Docs[0].ID).To(BeEquivalentTo("property:1")) + Expect(res.Docs[1].ID).To(BeEquivalentTo("property:2")) + }) }) +func _assert_geosearch_result(result *redis.FTSearchResult, expectedDocIDs []string) { + ids := make([]string, len(result.Docs)) + for i, doc := range result.Docs { + ids[i] = doc.ID + } + Expect(ids).To(ConsistOf(expectedDocIDs)) + Expect(result.Total).To(BeEquivalentTo(len(expectedDocIDs))) +} + // It("should FTProfile Search and Aggregate", Label("search", "ftprofile"), func() { // val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, &redis.FieldSchema{FieldName: "t", FieldType: redis.SearchFieldTypeText}).Result() // Expect(err).NotTo(HaveOccurred()) diff --git a/timeseries_commands.go b/timeseries_commands.go index 6f1b2fa4..82d8cdfc 100644 --- a/timeseries_commands.go +++ b/timeseries_commands.go @@ -40,25 +40,32 @@ type TimeseriesCmdable interface { } type TSOptions struct { - Retention int - ChunkSize int - Encoding string - DuplicatePolicy string - Labels map[string]string + Retention int + ChunkSize int + Encoding string + DuplicatePolicy string + Labels map[string]string + IgnoreMaxTimeDiff int64 + IgnoreMaxValDiff float64 } type TSIncrDecrOptions struct { - Timestamp int64 - Retention int - ChunkSize int - Uncompressed bool - Labels map[string]string + Timestamp int64 + Retention int + ChunkSize int + Uncompressed bool + DuplicatePolicy string + Labels map[string]string + IgnoreMaxTimeDiff int64 + IgnoreMaxValDiff float64 } type TSAlterOptions struct { - Retention int - ChunkSize int - DuplicatePolicy string - Labels map[string]string + Retention int + ChunkSize int + DuplicatePolicy string + Labels map[string]string + IgnoreMaxTimeDiff int64 + IgnoreMaxValDiff float64 } type TSCreateRuleOptions struct { @@ -223,6 +230,9 @@ func (c cmdable) TSAddWithArgs(ctx context.Context, key string, timestamp interf args = append(args, label, value) } } + if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 { + args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff) + } } cmd := NewIntCmd(ctx, args...) _ = c(ctx, cmd) @@ -264,6 +274,9 @@ func (c cmdable) TSCreateWithArgs(ctx context.Context, key string, options *TSOp args = append(args, label, value) } } + if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 { + args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff) + } } cmd := NewStatusCmd(ctx, args...) _ = c(ctx, cmd) @@ -292,6 +305,9 @@ func (c cmdable) TSAlter(ctx context.Context, key string, options *TSAlterOption args = append(args, label, value) } } + if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 { + args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff) + } } cmd := NewStatusCmd(ctx, args...) _ = c(ctx, cmd) @@ -351,12 +367,18 @@ func (c cmdable) TSIncrByWithArgs(ctx context.Context, key string, timestamp flo if options.Uncompressed { args = append(args, "UNCOMPRESSED") } + if options.DuplicatePolicy != "" { + args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy) + } if options.Labels != nil { args = append(args, "LABELS") for label, value := range options.Labels { args = append(args, label, value) } } + if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 { + args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff) + } } cmd := NewIntCmd(ctx, args...) _ = c(ctx, cmd) @@ -391,12 +413,18 @@ func (c cmdable) TSDecrByWithArgs(ctx context.Context, key string, timestamp flo if options.Uncompressed { args = append(args, "UNCOMPRESSED") } + if options.DuplicatePolicy != "" { + args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy) + } if options.Labels != nil { args = append(args, "LABELS") for label, value := range options.Labels { args = append(args, label, value) } } + if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 { + args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff) + } } cmd := NewIntCmd(ctx, args...) _ = c(ctx, cmd) diff --git a/timeseries_commands_test.go b/timeseries_commands_test.go index 563f24e7..c62367a7 100644 --- a/timeseries_commands_test.go +++ b/timeseries_commands_test.go @@ -23,7 +23,7 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() { Expect(client.Close()).NotTo(HaveOccurred()) }) - It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs"), func() { + It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs", "NonRedisEnterprise"), func() { result, err := client.TSCreate(ctx, "1").Result() Expect(err).NotTo(HaveOccurred()) Expect(result).To(BeEquivalentTo("OK")) @@ -62,10 +62,60 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() { resultInfo, err = client.TSInfo(ctx, keyName).Result() Expect(err).NotTo(HaveOccurred()) Expect(strings.ToUpper(resultInfo["duplicatePolicy"].(string))).To(BeEquivalentTo(dup)) - } + // Test insertion filters + opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, DuplicatePolicy: "LAST", IgnoreMaxValDiff: 10.0} + result, err = client.TSCreateWithArgs(ctx, "ts-if-1", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo("OK")) + resultAdd, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1000)) + resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1010)) + resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1010)) + resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1020, 11.5).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1020)) + resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1021, 22.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1021)) + + rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1021).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(4)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{ + {Timestamp: 1000, Value: 1.0}, + {Timestamp: 1010, Value: 11.0}, + {Timestamp: 1020, Value: 11.5}, + {Timestamp: 1021, Value: 22.0}})) + // Test insertion filters with other duplicate policy + opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0} + result, err = client.TSCreateWithArgs(ctx, "ts-if-2", opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo("OK")) + resultAdd1, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd1).To(BeEquivalentTo(1000)) + resultAdd1, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd1).To(BeEquivalentTo(1010)) + resultAdd1, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd1).To(BeEquivalentTo(1013)) + + rangePoints, err = client.TSRange(ctx, "ts-if-1", 1000, 1013).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(3)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{ + {Timestamp: 1000, Value: 1.0}, + {Timestamp: 1010, Value: 11.0}, + {Timestamp: 1013, Value: 10.0}})) }) - It("should TSAdd and TSAddWithArgs", Label("timeseries", "tsadd", "tsaddWithArgs"), func() { + It("should TSAdd and TSAddWithArgs", Label("timeseries", "tsadd", "tsaddWithArgs", "NonRedisEnterprise"), func() { result, err := client.TSAdd(ctx, "1", 1, 1).Result() Expect(err).NotTo(HaveOccurred()) Expect(result).To(BeEquivalentTo(1)) @@ -138,9 +188,23 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() { resultGet, err = client.TSGet(ctx, "tsami-1").Result() Expect(err).NotTo(HaveOccurred()) Expect(resultGet.Value).To(BeEquivalentTo(5)) + // Insertion filters + opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"} + result, err = client.TSAddWithArgs(ctx, "ts-if-1", 1000, 1.0, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(1000)) + + result, err = client.TSAddWithArgs(ctx, "ts-if-1", 1004, 3.0, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(1000)) + + rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1004).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(1)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 1.0}})) }) - It("should TSAlter", Label("timeseries", "tsalter"), func() { + It("should TSAlter", Label("timeseries", "tsalter", "NonRedisEnterprise"), func() { result, err := client.TSCreate(ctx, "1").Result() Expect(err).NotTo(HaveOccurred()) Expect(result).To(BeEquivalentTo("OK")) @@ -179,6 +243,33 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() { resultInfo, err = client.TSInfo(ctx, "1").Result() Expect(err).NotTo(HaveOccurred()) Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo("min")) + // Test insertion filters + resultAdd, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1000)) + resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1010)) + resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1013)) + + alterOpt := &redis.TSAlterOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"} + resultAlter, err = client.TSAlter(ctx, "ts-if-1", alterOpt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAlter).To(BeEquivalentTo("OK")) + + resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1015, 11.5).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeEquivalentTo(1013)) + + rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1013).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(3)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{ + {Timestamp: 1000, Value: 1.0}, + {Timestamp: 1010, Value: 11.0}, + {Timestamp: 1013, Value: 10.0}})) }) It("should TSCreateRule and TSDeleteRule", Label("timeseries", "tscreaterule", "tsdeleterule"), func() { @@ -216,7 +307,7 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() { Expect(resultInfo["rules"]).To(BeEquivalentTo(map[interface{}]interface{}{})) }) - It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs"), func() { + It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs", "NonRedisEnterprise"), func() { for i := 0; i < 100; i++ { _, err := client.TSIncrBy(ctx, "1", 1).Result() Expect(err).NotTo(HaveOccurred()) @@ -277,6 +368,54 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() { resultInfo, err = client.TSInfo(ctx, "4").Result() Expect(err).NotTo(HaveOccurred()) Expect(resultInfo["chunkSize"]).To(BeEquivalentTo(128)) + + // Test insertion filters INCRBY + opt = &redis.TSIncrDecrOptions{Timestamp: 1000, IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"} + res, err := client.TSIncrByWithArgs(ctx, "ts-if-1", 1.0, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(BeEquivalentTo(1000)) + + res, err = client.TSIncrByWithArgs(ctx, "ts-if-1", 3.0, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(BeEquivalentTo(1000)) + + rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1004).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(1)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 1.0}})) + + res, err = client.TSIncrByWithArgs(ctx, "ts-if-1", 10.1, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(BeEquivalentTo(1000)) + + rangePoints, err = client.TSRange(ctx, "ts-if-1", 1000, 1004).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(1)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 11.1}})) + + // Test insertion filters DECRBY + opt = &redis.TSIncrDecrOptions{Timestamp: 1000, IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"} + res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 1.0, opt).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(BeEquivalentTo(1000)) + + res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 3.0, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(BeEquivalentTo(1000)) + + rangePoints, err = client.TSRange(ctx, "ts-if-2", 1000, 1004).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(1)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: -1.0}})) + + res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 10.1, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(BeEquivalentTo(1000)) + + rangePoints, err = client.TSRange(ctx, "ts-if-2", 1000, 1004).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rangePoints)).To(BeEquivalentTo(1)) + Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: -11.1}})) }) It("should TSGet", Label("timeseries", "tsget"), func() {