Merge branch 'master' into conn-checker

This commit is contained in:
Kent Wang 2024-07-17 17:12:21 +08:00 committed by GitHub
commit 550b25c20e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 603 additions and 59 deletions

View File

@ -23,4 +23,4 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: golangci-lint
uses: golangci/golangci-lint-action@v4
uses: golangci/golangci-lint-action@v6

View File

@ -31,7 +31,7 @@ build:
testdata/redis:
mkdir -p $@
wget -qO- https://download.redis.io/releases/redis-7.4-rc1.tar.gz | tar xvz --strip-components=1 -C $@
wget -qO- https://download.redis.io/releases/redis-7.4-rc2.tar.gz | tar xvz --strip-components=1 -C $@
testdata/redis/src/redis-server: testdata/redis
cd $< && make all

View File

@ -2486,35 +2486,40 @@ var _ = Describe("Commands", func() {
})
It("should HExpire", Label("hash-expiration", "NonRedisEnterprise"), func() {
res, err := client.HExpire(ctx, "no_such_key", 10, "field1", "field2", "field3").Result()
res, err := client.HExpire(ctx, "no_such_key", 10*time.Second, "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(res).To(BeEquivalentTo([]int64{-2, -2, -2}))
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}
res, err = client.HExpire(ctx, "myhash", 10, "key1", "key2", "key200").Result()
res, err = client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key2", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, 1, -2}))
})
It("should HPExpire", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HPExpire(ctx, "no_such_key", 10, "field1", "field2", "field3").Result()
res, err := client.HPExpire(ctx, "no_such_key", 10*time.Second, "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(res).To(BeEquivalentTo([]int64{-2, -2, -2}))
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}
res, err := client.HPExpire(ctx, "myhash", 10, "key1", "key2", "key200").Result()
res, err = client.HPExpire(ctx, "myhash", 10*time.Second, "key1", "key2", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, 1, -2}))
})
It("should HExpireAt", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result()
resEmpty, err := client.HExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
@ -2526,9 +2531,10 @@ var _ = Describe("Commands", func() {
})
It("should HPExpireAt", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HPExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result()
resEmpty, err := client.HPExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
@ -2540,9 +2546,10 @@ var _ = Describe("Commands", func() {
})
It("should HPersist", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HPersist(ctx, "no_such_key", "field1", "field2", "field3").Result()
resEmpty, err := client.HPersist(ctx, "no_such_key", "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
@ -2552,7 +2559,7 @@ var _ = Describe("Commands", func() {
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{-1, -1, -2}))
res, err = client.HExpire(ctx, "myhash", 10, "key1", "key200").Result()
res, err = client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, -2}))
@ -2562,15 +2569,16 @@ var _ = Describe("Commands", func() {
})
It("should HExpireTime", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result()
resEmpty, err := client.HExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}
res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result()
res, err := client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, -2}))
@ -2580,9 +2588,10 @@ var _ = Describe("Commands", func() {
})
It("should HPExpireTime", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HPExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result()
resEmpty, err := client.HPExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
@ -2599,15 +2608,16 @@ var _ = Describe("Commands", func() {
})
It("should HTTL", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HTTL(ctx, "no_such_key", "field1", "field2", "field3").Result()
resEmpty, err := client.HTTL(ctx, "no_such_key", "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}
res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result()
res, err := client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, -2}))
@ -2617,15 +2627,16 @@ var _ = Describe("Commands", func() {
})
It("should HPTTL", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HPTTL(ctx, "no_such_key", "field1", "field2", "field3").Result()
resEmpty, err := client.HPTTL(ctx, "no_such_key", "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))
for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}
res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result()
res, err := client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, -2}))

View File

@ -23,6 +23,19 @@ type HashCmdable interface {
HVals(ctx context.Context, key string) *StringSliceCmd
HRandField(ctx context.Context, key string, count int) *StringSliceCmd
HRandFieldWithValues(ctx context.Context, key string, count int) *KeyValueSliceCmd
HExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd
HExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd
HPExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd
HPExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd
HExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd
HExpireAtWithArgs(ctx context.Context, key string, tm time.Time, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd
HPExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd
HPExpireAtWithArgs(ctx context.Context, key string, tm time.Time, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd
HPersist(ctx context.Context, key string, fields ...string) *IntSliceCmd
HExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd
HPExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd
HTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd
HPTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd
}
func (c cmdable) HDel(ctx context.Context, key string, fields ...string) *IntCmd {
@ -202,7 +215,7 @@ type HExpireArgs struct {
// The command constructs an argument list starting with "HEXPIRE", followed by the key, duration, any conditional flags, and the specified fields.
// For more information - https://redis.io/commands/hexpire/
func (c cmdable) HExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd {
args := []interface{}{"HEXPIRE", key, expiration, "FIELDS", len(fields)}
args := []interface{}{"HEXPIRE", key, formatSec(ctx, expiration), "FIELDS", len(fields)}
for _, field := range fields {
args = append(args, field)
@ -217,7 +230,7 @@ func (c cmdable) HExpire(ctx context.Context, key string, expiration time.Durati
// The command constructs an argument list starting with "HEXPIRE", followed by the key, duration, any conditional flags, and the specified fields.
// For more information - https://redis.io/commands/hexpire/
func (c cmdable) HExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd {
args := []interface{}{"HEXPIRE", key, expiration}
args := []interface{}{"HEXPIRE", key, formatSec(ctx, expiration)}
// only if one argument is true, we can add it to the args
// if more than one argument is true, it will cause an error

View File

@ -3,6 +3,7 @@
package pool
import (
"crypto/tls"
"errors"
"io"
"net"
@ -16,6 +17,10 @@ func connCheck(conn net.Conn) error {
// Reset previous timeout.
_ = conn.SetDeadline(time.Time{})
// Check if tls.Conn.
if c, ok := conn.(*tls.Conn); ok {
conn = c.NetConn()
}
sysConn, ok := conn.(syscall.Conn)
if !ok {
return nil

View File

@ -3,6 +3,7 @@
package pool
import (
"crypto/tls"
"net"
"net/http/httptest"
"time"
@ -14,12 +15,17 @@ import (
var _ = Describe("tests conn_check with real conns", func() {
var ts *httptest.Server
var conn net.Conn
var tlsConn *tls.Conn
var err error
BeforeEach(func() {
ts = httptest.NewServer(nil)
conn, err = net.DialTimeout(ts.Listener.Addr().Network(), ts.Listener.Addr().String(), time.Second)
Expect(err).NotTo(HaveOccurred())
tlsTestServer := httptest.NewUnstartedServer(nil)
tlsTestServer.StartTLS()
tlsConn, err = tls.DialWithDialer(&net.Dialer{Timeout: time.Second}, tlsTestServer.Listener.Addr().Network(), tlsTestServer.Listener.Addr().String(), &tls.Config{InsecureSkipVerify: true})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
@ -33,11 +39,23 @@ var _ = Describe("tests conn_check with real conns", func() {
Expect(connCheck(conn)).To(HaveOccurred())
})
It("good tls conn check", func() {
Expect(connCheck(tlsConn)).NotTo(HaveOccurred())
Expect(tlsConn.Close()).NotTo(HaveOccurred())
Expect(connCheck(tlsConn)).To(HaveOccurred())
})
It("bad conn check", func() {
Expect(conn.Close()).NotTo(HaveOccurred())
Expect(connCheck(conn)).To(HaveOccurred())
})
It("bad tls conn check", func() {
Expect(tlsConn.Close()).NotTo(HaveOccurred())
Expect(connCheck(tlsConn)).To(HaveOccurred())
})
It("check conn deadline", func() {
Expect(conn.SetDeadline(time.Now())).NotTo(HaveOccurred())
time.Sleep(time.Millisecond * 10)

View File

@ -341,6 +341,8 @@ func (n *clusterNode) Close() error {
return n.Client.Close()
}
const maximumNodeLatency = 1 * time.Minute
func (n *clusterNode) updateLatency() {
const numProbe = 10
var dur uint64
@ -361,7 +363,7 @@ func (n *clusterNode) updateLatency() {
if successes == 0 {
// If none of the pings worked, set latency to some arbitrarily high value so this node gets
// least priority.
latency = float64((1 * time.Minute) / time.Microsecond)
latency = float64((maximumNodeLatency) / time.Microsecond)
} else {
latency = float64(dur) / float64(successes)
}
@ -735,20 +737,40 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
return c.nodes.Random()
}
var node *clusterNode
var allNodesFailing = true
var (
closestNonFailingNode *clusterNode
closestNode *clusterNode
minLatency time.Duration
)
// setting the max possible duration as zerovalue for minlatency
minLatency = time.Duration(math.MaxInt64)
for _, n := range nodes {
if n.Failing() {
continue
if closestNode == nil || n.Latency() < minLatency {
closestNode = n
minLatency = n.Latency()
if !n.Failing() {
closestNonFailingNode = n
allNodesFailing = false
}
}
if node == nil || n.Latency() < node.Latency() {
node = n
}
}
if node != nil {
return node, nil
}
// If all nodes are failing - return random node
// pick the healthly node with the lowest latency
if !allNodesFailing && closestNonFailingNode != nil {
return closestNonFailingNode, nil
}
// if all nodes are failing, we will pick the temporarily failing node with lowest latency
if minLatency < maximumNodeLatency && closestNode != nil {
internal.Logger.Printf(context.TODO(), "redis: all nodes are marked as failed, picking the temporarily failing node with lowest latency")
return closestNode, nil
}
// If all nodes are having the maximum latency(all pings are failing) - return a random node across the cluster
internal.Logger.Printf(context.TODO(), "redis: pings to all nodes are failing, picking a random node across the cluster")
return c.nodes.Random()
}
@ -916,10 +938,13 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
slot := c.cmdSlot(ctx, cmd)
var node *clusterNode
var moved bool
var ask bool
var lastErr error
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
// MOVED and ASK responses are not transient errors that require retry delay; they
// should be attempted immediately.
if attempt > 0 && !moved && !ask {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
return err
}
@ -963,7 +988,6 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
continue
}
var moved bool
var addr string
moved, ask, addr = isMovedError(lastErr)
if moved || ask {

View File

@ -653,6 +653,32 @@ var _ = Describe("ClusterClient", func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("follows node redirection immediately", func() {
// Configure retry backoffs far in excess of the expected duration of redirection
opt := redisClusterOptions()
opt.MinRetryBackoff = 10 * time.Minute
opt.MaxRetryBackoff = 20 * time.Minute
client := cluster.newClusterClient(ctx, opt)
Eventually(func() error {
return client.SwapNodes(ctx, "A")
}, 30*time.Second).ShouldNot(HaveOccurred())
// Note that this context sets a deadline more aggressive than the lowest possible bound
// of the retry backoff; this verifies that redirection completes immediately.
redirCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err := client.Set(redirCtx, "A", "VALUE", 0).Err()
Expect(err).NotTo(HaveOccurred())
v, err := client.Get(redirCtx, "A").Result()
Expect(err).NotTo(HaveOccurred())
Expect(v).To(Equal("VALUE"))
Expect(client.Close()).NotTo(HaveOccurred())
})
It("calls fn for every master node", func() {
for i := 0; i < 10; i++ {
Expect(client.Set(ctx, strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())

View File

@ -84,7 +84,7 @@ func (c *PubSub) conn(ctx context.Context, newChannels []string) (*pool.Conn, er
}
func (c *PubSub) writeCmd(ctx context.Context, cn *pool.Conn, cmd Cmder) error {
return cn.WithWriter(context.Background(), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmd(wr, cmd)
})
}

View File

@ -75,6 +75,8 @@ type FieldSchema struct {
WithSuffixtrie bool
VectorArgs *FTVectorArgs
GeoShapeFieldType string
IndexEmpty bool
IndexMissing bool
}
type FTVectorArgs struct {
@ -1002,6 +1004,13 @@ func (c cmdable) FTCreate(ctx context.Context, index string, options *FTCreateOp
if schema.WithSuffixtrie {
args = append(args, "WITHSUFFIXTRIE")
}
if schema.IndexEmpty {
args = append(args, "INDEXEMPTY")
}
if schema.IndexMissing {
args = append(args, "INDEXMISSING")
}
}
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)

View File

@ -1017,6 +1017,110 @@ var _ = Describe("RediSearch commands", Label("search"), func() {
Expect(res.Attributes[0].WithSuffixtrie).To(BeTrue())
})
It("should test dialect 4", Label("search", "ftcreate", "ftsearch", "NonRedisEnterprise"), func() {
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{
Prefix: []interface{}{"resource:"},
}, &redis.FieldSchema{
FieldName: "uuid",
FieldType: redis.SearchFieldTypeTag,
}, &redis.FieldSchema{
FieldName: "tags",
FieldType: redis.SearchFieldTypeTag,
}, &redis.FieldSchema{
FieldName: "description",
FieldType: redis.SearchFieldTypeText,
}, &redis.FieldSchema{
FieldName: "rating",
FieldType: redis.SearchFieldTypeNumeric,
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
client.HSet(ctx, "resource:1", map[string]interface{}{
"uuid": "123e4567-e89b-12d3-a456-426614174000",
"tags": "finance|crypto|$btc|blockchain",
"description": "Analysis of blockchain technologies & Bitcoin's potential.",
"rating": 5,
})
client.HSet(ctx, "resource:2", map[string]interface{}{
"uuid": "987e6543-e21c-12d3-a456-426614174999",
"tags": "health|well-being|fitness|new-year's-resolutions",
"description": "Health trends for the new year, including fitness regimes.",
"rating": 4,
})
res, err := client.FTSearchWithArgs(ctx, "idx1", "@uuid:{$uuid}",
&redis.FTSearchOptions{
DialectVersion: 2,
Params: map[string]interface{}{"uuid": "123e4567-e89b-12d3-a456-426614174000"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Total).To(BeEquivalentTo(int64(1)))
Expect(res.Docs[0].ID).To(BeEquivalentTo("resource:1"))
res, err = client.FTSearchWithArgs(ctx, "idx1", "@uuid:{$uuid}",
&redis.FTSearchOptions{
DialectVersion: 4,
Params: map[string]interface{}{"uuid": "123e4567-e89b-12d3-a456-426614174000"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Total).To(BeEquivalentTo(int64(1)))
Expect(res.Docs[0].ID).To(BeEquivalentTo("resource:1"))
client.HSet(ctx, "test:1", map[string]interface{}{
"uuid": "3d3586fe-0416-4572-8ce",
"email": "adriano@acme.com.ie",
"num": 5,
})
// Create the index
ftCreateOptions := &redis.FTCreateOptions{
Prefix: []interface{}{"test:"},
}
schema := []*redis.FieldSchema{
{
FieldName: "uuid",
FieldType: redis.SearchFieldTypeTag,
},
{
FieldName: "email",
FieldType: redis.SearchFieldTypeTag,
},
{
FieldName: "num",
FieldType: redis.SearchFieldTypeNumeric,
},
}
val, err = client.FTCreate(ctx, "idx_hash", ftCreateOptions, schema...).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("OK"))
ftSearchOptions := &redis.FTSearchOptions{
DialectVersion: 4,
Params: map[string]interface{}{
"uuid": "3d3586fe-0416-4572-8ce",
"email": "adriano@acme.com.ie",
},
}
res, err = client.FTSearchWithArgs(ctx, "idx_hash", "@uuid:{$uuid}", ftSearchOptions).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Docs[0].ID).To(BeEquivalentTo("test:1"))
Expect(res.Docs[0].Fields["uuid"]).To(BeEquivalentTo("3d3586fe-0416-4572-8ce"))
res, err = client.FTSearchWithArgs(ctx, "idx_hash", "@email:{$email}", ftSearchOptions).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Docs[0].ID).To(BeEquivalentTo("test:1"))
Expect(res.Docs[0].Fields["email"]).To(BeEquivalentTo("adriano@acme.com.ie"))
ftSearchOptions.Params = map[string]interface{}{"num": 5}
res, err = client.FTSearchWithArgs(ctx, "idx_hash", "@num:[5]", ftSearchOptions).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Docs[0].ID).To(BeEquivalentTo("test:1"))
Expect(res.Docs[0].Fields["num"]).To(BeEquivalentTo("5"))
})
It("should FTCreate GeoShape", Label("search", "ftcreate", "ftsearch"), func() {
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, &redis.FieldSchema{FieldName: "geom", FieldType: redis.SearchFieldTypeGeoShape, GeoShapeFieldType: "FLAT"}).Result()
Expect(err).NotTo(HaveOccurred())
@ -1043,8 +1147,175 @@ var _ = Describe("RediSearch commands", Label("search"), func() {
Expect(err).NotTo(HaveOccurred())
Expect(res2.Total).To(BeEquivalentTo(int64(2)))
})
It("should test geoshapes query intersects and disjoint", Label("NonRedisEnterprise"), func() {
_, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, &redis.FieldSchema{
FieldName: "g",
FieldType: redis.SearchFieldTypeGeoShape,
GeoShapeFieldType: "FLAT",
}).Result()
Expect(err).NotTo(HaveOccurred())
client.HSet(ctx, "doc_point1", "g", "POINT (10 10)")
client.HSet(ctx, "doc_point2", "g", "POINT (50 50)")
client.HSet(ctx, "doc_polygon1", "g", "POLYGON ((20 20, 25 35, 35 25, 20 20))")
client.HSet(ctx, "doc_polygon2", "g", "POLYGON ((60 60, 65 75, 70 70, 65 55, 60 60))")
intersection, err := client.FTSearchWithArgs(ctx, "idx1", "@g:[intersects $shape]",
&redis.FTSearchOptions{
DialectVersion: 3,
Params: map[string]interface{}{"shape": "POLYGON((15 15, 75 15, 50 70, 20 40, 15 15))"},
}).Result()
Expect(err).NotTo(HaveOccurred())
_assert_geosearch_result(&intersection, []string{"doc_point2", "doc_polygon1"})
disjunction, err := client.FTSearchWithArgs(ctx, "idx1", "@g:[disjoint $shape]",
&redis.FTSearchOptions{
DialectVersion: 3,
Params: map[string]interface{}{"shape": "POLYGON((15 15, 75 15, 50 70, 20 40, 15 15))"},
}).Result()
Expect(err).NotTo(HaveOccurred())
_assert_geosearch_result(&disjunction, []string{"doc_point1", "doc_polygon2"})
})
It("should test geoshapes query contains and within", func() {
_, err := client.FTCreate(ctx, "idx2", &redis.FTCreateOptions{}, &redis.FieldSchema{
FieldName: "g",
FieldType: redis.SearchFieldTypeGeoShape,
GeoShapeFieldType: "FLAT",
}).Result()
Expect(err).NotTo(HaveOccurred())
client.HSet(ctx, "doc_point1", "g", "POINT (10 10)")
client.HSet(ctx, "doc_point2", "g", "POINT (50 50)")
client.HSet(ctx, "doc_polygon1", "g", "POLYGON ((20 20, 25 35, 35 25, 20 20))")
client.HSet(ctx, "doc_polygon2", "g", "POLYGON ((60 60, 65 75, 70 70, 65 55, 60 60))")
containsA, err := client.FTSearchWithArgs(ctx, "idx2", "@g:[contains $shape]",
&redis.FTSearchOptions{
DialectVersion: 3,
Params: map[string]interface{}{"shape": "POINT(25 25)"},
}).Result()
Expect(err).NotTo(HaveOccurred())
_assert_geosearch_result(&containsA, []string{"doc_polygon1"})
containsB, err := client.FTSearchWithArgs(ctx, "idx2", "@g:[contains $shape]",
&redis.FTSearchOptions{
DialectVersion: 3,
Params: map[string]interface{}{"shape": "POLYGON((24 24, 24 26, 25 25, 24 24))"},
}).Result()
Expect(err).NotTo(HaveOccurred())
_assert_geosearch_result(&containsB, []string{"doc_polygon1"})
within, err := client.FTSearchWithArgs(ctx, "idx2", "@g:[within $shape]",
&redis.FTSearchOptions{
DialectVersion: 3,
Params: map[string]interface{}{"shape": "POLYGON((15 15, 75 15, 50 70, 20 40, 15 15))"},
}).Result()
Expect(err).NotTo(HaveOccurred())
_assert_geosearch_result(&within, []string{"doc_point2", "doc_polygon1"})
})
It("should search missing fields", Label("search", "ftcreate", "ftsearch", "NonRedisEnterprise"), func() {
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{Prefix: []interface{}{"property:"}},
&redis.FieldSchema{FieldName: "title", FieldType: redis.SearchFieldTypeText, Sortable: true},
&redis.FieldSchema{FieldName: "features", FieldType: redis.SearchFieldTypeTag, IndexMissing: true},
&redis.FieldSchema{FieldName: "description", FieldType: redis.SearchFieldTypeText, IndexMissing: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx1")
client.HSet(ctx, "property:1", map[string]interface{}{
"title": "Luxury Villa in Malibu",
"features": "pool,sea view,modern",
"description": "A stunning modern villa overlooking the Pacific Ocean.",
})
client.HSet(ctx, "property:2", map[string]interface{}{
"title": "Downtown Flat",
"description": "Modern flat in central Paris with easy access to metro.",
})
client.HSet(ctx, "property:3", map[string]interface{}{
"title": "Beachfront Bungalow",
"features": "beachfront,sun deck",
})
res, err := client.FTSearchWithArgs(ctx, "idx1", "ismissing(@features)", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Docs[0].ID).To(BeEquivalentTo("property:2"))
res, err = client.FTSearchWithArgs(ctx, "idx1", "-ismissing(@features)", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Docs[0].ID).To(BeEquivalentTo("property:1"))
Expect(res.Docs[1].ID).To(BeEquivalentTo("property:3"))
res, err = client.FTSearchWithArgs(ctx, "idx1", "ismissing(@description)", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Docs[0].ID).To(BeEquivalentTo("property:3"))
res, err = client.FTSearchWithArgs(ctx, "idx1", "-ismissing(@description)", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Docs[0].ID).To(BeEquivalentTo("property:1"))
Expect(res.Docs[1].ID).To(BeEquivalentTo("property:2"))
})
It("should search empty fields", Label("search", "ftcreate", "ftsearch", "NonRedisEnterprise"), func() {
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{Prefix: []interface{}{"property:"}},
&redis.FieldSchema{FieldName: "title", FieldType: redis.SearchFieldTypeText, Sortable: true},
&redis.FieldSchema{FieldName: "features", FieldType: redis.SearchFieldTypeTag, IndexEmpty: true},
&redis.FieldSchema{FieldName: "description", FieldType: redis.SearchFieldTypeText, IndexEmpty: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx1")
client.HSet(ctx, "property:1", map[string]interface{}{
"title": "Luxury Villa in Malibu",
"features": "pool,sea view,modern",
"description": "A stunning modern villa overlooking the Pacific Ocean.",
})
client.HSet(ctx, "property:2", map[string]interface{}{
"title": "Downtown Flat",
"features": "",
"description": "Modern flat in central Paris with easy access to metro.",
})
client.HSet(ctx, "property:3", map[string]interface{}{
"title": "Beachfront Bungalow",
"features": "beachfront,sun deck",
"description": "",
})
res, err := client.FTSearchWithArgs(ctx, "idx1", "@features:{\"\"}", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Docs[0].ID).To(BeEquivalentTo("property:2"))
res, err = client.FTSearchWithArgs(ctx, "idx1", "-@features:{\"\"}", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Docs[0].ID).To(BeEquivalentTo("property:1"))
Expect(res.Docs[1].ID).To(BeEquivalentTo("property:3"))
res, err = client.FTSearchWithArgs(ctx, "idx1", "@description:''", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Docs[0].ID).To(BeEquivalentTo("property:3"))
res, err = client.FTSearchWithArgs(ctx, "idx1", "-@description:''", &redis.FTSearchOptions{DialectVersion: 4, Return: []redis.FTSearchReturn{{FieldName: "id"}}, NoContent: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res.Docs[0].ID).To(BeEquivalentTo("property:1"))
Expect(res.Docs[1].ID).To(BeEquivalentTo("property:2"))
})
})
func _assert_geosearch_result(result *redis.FTSearchResult, expectedDocIDs []string) {
ids := make([]string, len(result.Docs))
for i, doc := range result.Docs {
ids[i] = doc.ID
}
Expect(ids).To(ConsistOf(expectedDocIDs))
Expect(result.Total).To(BeEquivalentTo(len(expectedDocIDs)))
}
// It("should FTProfile Search and Aggregate", Label("search", "ftprofile"), func() {
// val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, &redis.FieldSchema{FieldName: "t", FieldType: redis.SearchFieldTypeText}).Result()
// Expect(err).NotTo(HaveOccurred())

View File

@ -40,25 +40,32 @@ type TimeseriesCmdable interface {
}
type TSOptions struct {
Retention int
ChunkSize int
Encoding string
DuplicatePolicy string
Labels map[string]string
Retention int
ChunkSize int
Encoding string
DuplicatePolicy string
Labels map[string]string
IgnoreMaxTimeDiff int64
IgnoreMaxValDiff float64
}
type TSIncrDecrOptions struct {
Timestamp int64
Retention int
ChunkSize int
Uncompressed bool
Labels map[string]string
Timestamp int64
Retention int
ChunkSize int
Uncompressed bool
DuplicatePolicy string
Labels map[string]string
IgnoreMaxTimeDiff int64
IgnoreMaxValDiff float64
}
type TSAlterOptions struct {
Retention int
ChunkSize int
DuplicatePolicy string
Labels map[string]string
Retention int
ChunkSize int
DuplicatePolicy string
Labels map[string]string
IgnoreMaxTimeDiff int64
IgnoreMaxValDiff float64
}
type TSCreateRuleOptions struct {
@ -223,6 +230,9 @@ func (c cmdable) TSAddWithArgs(ctx context.Context, key string, timestamp interf
args = append(args, label, value)
}
}
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
}
}
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
@ -264,6 +274,9 @@ func (c cmdable) TSCreateWithArgs(ctx context.Context, key string, options *TSOp
args = append(args, label, value)
}
}
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
}
}
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
@ -292,6 +305,9 @@ func (c cmdable) TSAlter(ctx context.Context, key string, options *TSAlterOption
args = append(args, label, value)
}
}
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
}
}
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
@ -351,12 +367,18 @@ func (c cmdable) TSIncrByWithArgs(ctx context.Context, key string, timestamp flo
if options.Uncompressed {
args = append(args, "UNCOMPRESSED")
}
if options.DuplicatePolicy != "" {
args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
}
if options.Labels != nil {
args = append(args, "LABELS")
for label, value := range options.Labels {
args = append(args, label, value)
}
}
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
}
}
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
@ -391,12 +413,18 @@ func (c cmdable) TSDecrByWithArgs(ctx context.Context, key string, timestamp flo
if options.Uncompressed {
args = append(args, "UNCOMPRESSED")
}
if options.DuplicatePolicy != "" {
args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
}
if options.Labels != nil {
args = append(args, "LABELS")
for label, value := range options.Labels {
args = append(args, label, value)
}
}
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
}
}
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)

View File

@ -23,7 +23,7 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs"), func() {
It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs", "NonRedisEnterprise"), func() {
result, err := client.TSCreate(ctx, "1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeEquivalentTo("OK"))
@ -62,10 +62,60 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
resultInfo, err = client.TSInfo(ctx, keyName).Result()
Expect(err).NotTo(HaveOccurred())
Expect(strings.ToUpper(resultInfo["duplicatePolicy"].(string))).To(BeEquivalentTo(dup))
}
// Test insertion filters
opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, DuplicatePolicy: "LAST", IgnoreMaxValDiff: 10.0}
result, err = client.TSCreateWithArgs(ctx, "ts-if-1", opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeEquivalentTo("OK"))
resultAdd, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1000))
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1010))
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1010))
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1020, 11.5).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1020))
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1021, 22.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1021))
rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1021).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(4))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{
{Timestamp: 1000, Value: 1.0},
{Timestamp: 1010, Value: 11.0},
{Timestamp: 1020, Value: 11.5},
{Timestamp: 1021, Value: 22.0}}))
// Test insertion filters with other duplicate policy
opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0}
result, err = client.TSCreateWithArgs(ctx, "ts-if-2", opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeEquivalentTo("OK"))
resultAdd1, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd1).To(BeEquivalentTo(1000))
resultAdd1, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd1).To(BeEquivalentTo(1010))
resultAdd1, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd1).To(BeEquivalentTo(1013))
rangePoints, err = client.TSRange(ctx, "ts-if-1", 1000, 1013).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(3))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{
{Timestamp: 1000, Value: 1.0},
{Timestamp: 1010, Value: 11.0},
{Timestamp: 1013, Value: 10.0}}))
})
It("should TSAdd and TSAddWithArgs", Label("timeseries", "tsadd", "tsaddWithArgs"), func() {
It("should TSAdd and TSAddWithArgs", Label("timeseries", "tsadd", "tsaddWithArgs", "NonRedisEnterprise"), func() {
result, err := client.TSAdd(ctx, "1", 1, 1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeEquivalentTo(1))
@ -138,9 +188,23 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
resultGet, err = client.TSGet(ctx, "tsami-1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultGet.Value).To(BeEquivalentTo(5))
// Insertion filters
opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
result, err = client.TSAddWithArgs(ctx, "ts-if-1", 1000, 1.0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeEquivalentTo(1000))
result, err = client.TSAddWithArgs(ctx, "ts-if-1", 1004, 3.0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeEquivalentTo(1000))
rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1004).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(1))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 1.0}}))
})
It("should TSAlter", Label("timeseries", "tsalter"), func() {
It("should TSAlter", Label("timeseries", "tsalter", "NonRedisEnterprise"), func() {
result, err := client.TSCreate(ctx, "1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeEquivalentTo("OK"))
@ -179,6 +243,33 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
resultInfo, err = client.TSInfo(ctx, "1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo("min"))
// Test insertion filters
resultAdd, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1000))
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1010))
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1013))
alterOpt := &redis.TSAlterOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
resultAlter, err = client.TSAlter(ctx, "ts-if-1", alterOpt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAlter).To(BeEquivalentTo("OK"))
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1015, 11.5).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1013))
rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1013).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(3))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{
{Timestamp: 1000, Value: 1.0},
{Timestamp: 1010, Value: 11.0},
{Timestamp: 1013, Value: 10.0}}))
})
It("should TSCreateRule and TSDeleteRule", Label("timeseries", "tscreaterule", "tsdeleterule"), func() {
@ -216,7 +307,7 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
Expect(resultInfo["rules"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
})
It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs"), func() {
It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs", "NonRedisEnterprise"), func() {
for i := 0; i < 100; i++ {
_, err := client.TSIncrBy(ctx, "1", 1).Result()
Expect(err).NotTo(HaveOccurred())
@ -277,6 +368,54 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
resultInfo, err = client.TSInfo(ctx, "4").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultInfo["chunkSize"]).To(BeEquivalentTo(128))
// Test insertion filters INCRBY
opt = &redis.TSIncrDecrOptions{Timestamp: 1000, IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
res, err := client.TSIncrByWithArgs(ctx, "ts-if-1", 1.0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(BeEquivalentTo(1000))
res, err = client.TSIncrByWithArgs(ctx, "ts-if-1", 3.0, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(BeEquivalentTo(1000))
rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1004).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(1))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 1.0}}))
res, err = client.TSIncrByWithArgs(ctx, "ts-if-1", 10.1, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(BeEquivalentTo(1000))
rangePoints, err = client.TSRange(ctx, "ts-if-1", 1000, 1004).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(1))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 11.1}}))
// Test insertion filters DECRBY
opt = &redis.TSIncrDecrOptions{Timestamp: 1000, IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 1.0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(BeEquivalentTo(1000))
res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 3.0, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(BeEquivalentTo(1000))
rangePoints, err = client.TSRange(ctx, "ts-if-2", 1000, 1004).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(1))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: -1.0}}))
res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 10.1, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(BeEquivalentTo(1000))
rangePoints, err = client.TSRange(ctx, "ts-if-2", 1000, 1004).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(1))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: -11.1}}))
})
It("should TSGet", Label("timeseries", "tsget"), func() {