mirror of https://github.com/go-redis/redis.git
Merge branch 'master' into apply-make-fmt
This commit is contained in:
commit
642e2cb41f
|
@ -57,4 +57,5 @@ url
|
|||
variadic
|
||||
RedisStack
|
||||
RedisGears
|
||||
RedisTimeseries
|
||||
RedisTimeseries
|
||||
RediSearch
|
||||
|
|
|
@ -23,4 +23,4 @@ jobs:
|
|||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: golangci-lint
|
||||
uses: golangci/golangci-lint-action@v4
|
||||
uses: golangci/golangci-lint-action@v6
|
||||
|
|
|
@ -8,7 +8,7 @@ jobs:
|
|||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
- name: Check Spelling
|
||||
uses: rojopolis/spellcheck-github-actions@0.36.0
|
||||
uses: rojopolis/spellcheck-github-actions@0.38.0
|
||||
with:
|
||||
config_path: .github/spellcheck-settings.yml
|
||||
task_name: Markdown
|
||||
|
|
2
Makefile
2
Makefile
|
@ -31,7 +31,7 @@ build:
|
|||
|
||||
testdata/redis:
|
||||
mkdir -p $@
|
||||
wget -qO- https://download.redis.io/releases/redis-7.2.1.tar.gz | tar xvz --strip-components=1 -C $@
|
||||
wget -qO- https://download.redis.io/releases/redis-7.4-rc2.tar.gz | tar xvz --strip-components=1 -C $@
|
||||
|
||||
testdata/redis/src/redis-server: testdata/redis
|
||||
cd $< && make all
|
||||
|
|
66
command.go
66
command.go
|
@ -573,6 +573,10 @@ func (cmd *StatusCmd) Result() (string, error) {
|
|||
return cmd.val, cmd.err
|
||||
}
|
||||
|
||||
func (cmd *StatusCmd) Bytes() ([]byte, error) {
|
||||
return util.StringToBytes(cmd.val), cmd.err
|
||||
}
|
||||
|
||||
func (cmd *StatusCmd) String() string {
|
||||
return cmdString(cmd, cmd.val)
|
||||
}
|
||||
|
@ -3783,6 +3787,65 @@ func (cmd *MapStringStringSliceCmd) readReply(rd *proto.Reader) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// MapStringInterfaceCmd represents a command that returns a map of strings to interface{}.
|
||||
type MapMapStringInterfaceCmd struct {
|
||||
baseCmd
|
||||
val map[string]interface{}
|
||||
}
|
||||
|
||||
func NewMapMapStringInterfaceCmd(ctx context.Context, args ...interface{}) *MapMapStringInterfaceCmd {
|
||||
return &MapMapStringInterfaceCmd{
|
||||
baseCmd: baseCmd{
|
||||
ctx: ctx,
|
||||
args: args,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (cmd *MapMapStringInterfaceCmd) String() string {
|
||||
return cmdString(cmd, cmd.val)
|
||||
}
|
||||
|
||||
func (cmd *MapMapStringInterfaceCmd) SetVal(val map[string]interface{}) {
|
||||
cmd.val = val
|
||||
}
|
||||
|
||||
func (cmd *MapMapStringInterfaceCmd) Result() (map[string]interface{}, error) {
|
||||
return cmd.val, cmd.err
|
||||
}
|
||||
|
||||
func (cmd *MapMapStringInterfaceCmd) Val() map[string]interface{} {
|
||||
return cmd.val
|
||||
}
|
||||
|
||||
func (cmd *MapMapStringInterfaceCmd) readReply(rd *proto.Reader) (err error) {
|
||||
n, err := rd.ReadArrayLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data := make(map[string]interface{}, n/2)
|
||||
for i := 0; i < n; i += 2 {
|
||||
_, err := rd.ReadArrayLen()
|
||||
if err != nil {
|
||||
cmd.err = err
|
||||
}
|
||||
key, err := rd.ReadString()
|
||||
if err != nil {
|
||||
cmd.err = err
|
||||
}
|
||||
value, err := rd.ReadString()
|
||||
if err != nil {
|
||||
cmd.err = err
|
||||
}
|
||||
data[key] = value
|
||||
}
|
||||
|
||||
cmd.val = data
|
||||
return nil
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------
|
||||
|
||||
type MapStringInterfaceSliceCmd struct {
|
||||
|
@ -4997,6 +5060,7 @@ type ClientInfo struct {
|
|||
PSub int // number of pattern matching subscriptions
|
||||
SSub int // redis version 7.0.3, number of shard channel subscriptions
|
||||
Multi int // number of commands in a MULTI/EXEC context
|
||||
Watch int // redis version 7.4 RC1, number of keys this client is currently watching.
|
||||
QueryBuf int // qbuf, query buffer length (0 means no query pending)
|
||||
QueryBufFree int // qbuf-free, free space of the query buffer (0 means the buffer is full)
|
||||
ArgvMem int // incomplete arguments for the next command (already extracted from query buffer)
|
||||
|
@ -5149,6 +5213,8 @@ func parseClientInfo(txt string) (info *ClientInfo, err error) {
|
|||
info.SSub, err = strconv.Atoi(val)
|
||||
case "multi":
|
||||
info.Multi, err = strconv.Atoi(val)
|
||||
case "watch":
|
||||
info.Watch, err = strconv.Atoi(val)
|
||||
case "qbuf":
|
||||
info.QueryBuf, err = strconv.Atoi(val)
|
||||
case "qbuf-free":
|
||||
|
|
|
@ -220,6 +220,7 @@ type Cmdable interface {
|
|||
ProbabilisticCmdable
|
||||
PubSubCmdable
|
||||
ScriptingFunctionsCmdable
|
||||
SearchCmdable
|
||||
SetCmdable
|
||||
SortedSetCmdable
|
||||
StringCmdable
|
||||
|
|
286
commands_test.go
286
commands_test.go
|
@ -193,6 +193,40 @@ var _ = Describe("Commands", func() {
|
|||
Expect(r.Val()).To(Equal(int64(0)))
|
||||
})
|
||||
|
||||
It("should ClientKillByFilter with MAXAGE", Label("NonRedisEnterprise"), func() {
|
||||
var s []string
|
||||
started := make(chan bool)
|
||||
done := make(chan bool)
|
||||
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
|
||||
started <- true
|
||||
blpop := client.BLPop(ctx, 0, "list")
|
||||
Expect(blpop.Val()).To(Equal(s))
|
||||
done <- true
|
||||
}()
|
||||
<-started
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
Fail("BLPOP is not blocked.")
|
||||
case <-time.After(2 * time.Second):
|
||||
// ok
|
||||
}
|
||||
|
||||
killed := client.ClientKillByFilter(ctx, "MAXAGE", "1")
|
||||
Expect(killed.Err()).NotTo(HaveOccurred())
|
||||
Expect(killed.Val()).To(SatisfyAny(Equal(int64(2)), Equal(int64(3))))
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
// ok
|
||||
case <-time.After(time.Second):
|
||||
Fail("BLPOP is still blocked.")
|
||||
}
|
||||
})
|
||||
|
||||
It("should ClientID", func() {
|
||||
err := client.ClientID(ctx).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
@ -1099,6 +1133,26 @@ var _ = Describe("Commands", func() {
|
|||
|
||||
keys, cursor, err := client.HScan(ctx, "myhash", 0, "", 0).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
// If we don't get at least two items back, it's really strange.
|
||||
Expect(cursor).To(BeNumerically(">=", 2))
|
||||
Expect(len(keys)).To(BeNumerically(">=", 2))
|
||||
Expect(keys[0]).To(HavePrefix("key"))
|
||||
Expect(keys[1]).To(Equal("hello"))
|
||||
})
|
||||
|
||||
It("should HScan without values", Label("NonRedisEnterprise"), func() {
|
||||
for i := 0; i < 1000; i++ {
|
||||
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
|
||||
Expect(sadd.Err()).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
keys, cursor, err := client.HScanNoValues(ctx, "myhash", 0, "", 0).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
// If we don't get at least two items back, it's really strange.
|
||||
Expect(cursor).To(BeNumerically(">=", 2))
|
||||
Expect(len(keys)).To(BeNumerically(">=", 2))
|
||||
Expect(keys[0]).To(HavePrefix("key"))
|
||||
Expect(keys[1]).To(HavePrefix("key"))
|
||||
Expect(keys).NotTo(BeEmpty())
|
||||
Expect(cursor).NotTo(BeZero())
|
||||
})
|
||||
|
@ -2429,6 +2483,166 @@ var _ = Describe("Commands", func() {
|
|||
Equal([]redis.KeyValue{{Key: "key2", Value: "hello2"}}),
|
||||
))
|
||||
})
|
||||
|
||||
It("should HExpire", Label("hash-expiration", "NonRedisEnterprise"), func() {
|
||||
res, err := client.HExpire(ctx, "no_such_key", 10*time.Second, "field1", "field2", "field3").Result()
|
||||
Expect(err).To(BeNil())
|
||||
Expect(res).To(BeEquivalentTo([]int64{-2, -2, -2}))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
|
||||
Expect(sadd.Err()).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
res, err = client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key2", "key200").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]int64{1, 1, -2}))
|
||||
})
|
||||
|
||||
It("should HPExpire", Label("hash-expiration", "NonRedisEnterprise"), func() {
|
||||
res, err := client.HPExpire(ctx, "no_such_key", 10*time.Second, "field1", "field2", "field3").Result()
|
||||
Expect(err).To(BeNil())
|
||||
Expect(res).To(BeEquivalentTo([]int64{-2, -2, -2}))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
|
||||
Expect(sadd.Err()).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
res, err = client.HPExpire(ctx, "myhash", 10*time.Second, "key1", "key2", "key200").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]int64{1, 1, -2}))
|
||||
})
|
||||
|
||||
It("should HExpireAt", Label("hash-expiration", "NonRedisEnterprise"), func() {
|
||||
resEmpty, err := client.HExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result()
|
||||
Expect(err).To(BeNil())
|
||||
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
|
||||
Expect(sadd.Err()).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
res, err := client.HExpireAt(ctx, "myhash", time.Now().Add(10*time.Second), "key1", "key2", "key200").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]int64{1, 1, -2}))
|
||||
})
|
||||
|
||||
It("should HPExpireAt", Label("hash-expiration", "NonRedisEnterprise"), func() {
|
||||
resEmpty, err := client.HPExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result()
|
||||
Expect(err).To(BeNil())
|
||||
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
|
||||
Expect(sadd.Err()).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
res, err := client.HPExpireAt(ctx, "myhash", time.Now().Add(10*time.Second), "key1", "key2", "key200").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]int64{1, 1, -2}))
|
||||
})
|
||||
|
||||
It("should HPersist", Label("hash-expiration", "NonRedisEnterprise"), func() {
|
||||
resEmpty, err := client.HPersist(ctx, "no_such_key", "field1", "field2", "field3").Result()
|
||||
Expect(err).To(BeNil())
|
||||
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
|
||||
Expect(sadd.Err()).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
res, err := client.HPersist(ctx, "myhash", "key1", "key2", "key200").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]int64{-1, -1, -2}))
|
||||
|
||||
res, err = client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]int64{1, -2}))
|
||||
|
||||
res, err = client.HPersist(ctx, "myhash", "key1", "key2", "key200").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]int64{1, -1, -2}))
|
||||
})
|
||||
|
||||
It("should HExpireTime", Label("hash-expiration", "NonRedisEnterprise"), func() {
|
||||
resEmpty, err := client.HExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result()
|
||||
Expect(err).To(BeNil())
|
||||
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
|
||||
Expect(sadd.Err()).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
res, err := client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]int64{1, -2}))
|
||||
|
||||
res, err = client.HExpireTime(ctx, "myhash", "key1", "key2", "key200").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res[0]).To(BeNumerically("~", time.Now().Add(10*time.Second).Unix(), 1))
|
||||
})
|
||||
|
||||
It("should HPExpireTime", Label("hash-expiration", "NonRedisEnterprise"), func() {
|
||||
resEmpty, err := client.HPExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result()
|
||||
Expect(err).To(BeNil())
|
||||
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
|
||||
Expect(sadd.Err()).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
expireAt := time.Now().Add(10 * time.Second)
|
||||
res, err := client.HPExpireAt(ctx, "myhash", expireAt, "key1", "key200").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]int64{1, -2}))
|
||||
|
||||
res, err = client.HPExpireTime(ctx, "myhash", "key1", "key2", "key200").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(BeEquivalentTo([]int64{expireAt.UnixMilli(), -1, -2}))
|
||||
})
|
||||
|
||||
It("should HTTL", Label("hash-expiration", "NonRedisEnterprise"), func() {
|
||||
resEmpty, err := client.HTTL(ctx, "no_such_key", "field1", "field2", "field3").Result()
|
||||
Expect(err).To(BeNil())
|
||||
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
|
||||
Expect(sadd.Err()).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
res, err := client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]int64{1, -2}))
|
||||
|
||||
res, err = client.HTTL(ctx, "myhash", "key1", "key2", "key200").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]int64{10, -1, -2}))
|
||||
})
|
||||
|
||||
It("should HPTTL", Label("hash-expiration", "NonRedisEnterprise"), func() {
|
||||
resEmpty, err := client.HPTTL(ctx, "no_such_key", "field1", "field2", "field3").Result()
|
||||
Expect(err).To(BeNil())
|
||||
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
|
||||
Expect(sadd.Err()).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
res, err := client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]int64{1, -2}))
|
||||
|
||||
res, err = client.HPTTL(ctx, "myhash", "key1", "key2", "key200").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res[0]).To(BeNumerically("~", 10*time.Second.Milliseconds(), 1))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("hyperloglog", func() {
|
||||
|
@ -5685,6 +5899,78 @@ var _ = Describe("Commands", func() {
|
|||
Expect(err).To(Equal(redis.Nil))
|
||||
})
|
||||
|
||||
It("should XRead LastEntry", Label("NonRedisEnterprise"), func() {
|
||||
res, err := client.XRead(ctx, &redis.XReadArgs{
|
||||
Streams: []string{"stream"},
|
||||
Count: 2, // we expect 1 message
|
||||
ID: "+",
|
||||
}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]redis.XStream{
|
||||
{
|
||||
Stream: "stream",
|
||||
Messages: []redis.XMessage{
|
||||
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||
},
|
||||
},
|
||||
}))
|
||||
})
|
||||
|
||||
It("should XRead LastEntry from two streams", Label("NonRedisEnterprise"), func() {
|
||||
res, err := client.XRead(ctx, &redis.XReadArgs{
|
||||
Streams: []string{"stream", "stream"},
|
||||
ID: "+",
|
||||
}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]redis.XStream{
|
||||
{
|
||||
Stream: "stream",
|
||||
Messages: []redis.XMessage{
|
||||
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Stream: "stream",
|
||||
Messages: []redis.XMessage{
|
||||
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||
},
|
||||
},
|
||||
}))
|
||||
})
|
||||
|
||||
It("should XRead LastEntry blocks", Label("NonRedisEnterprise"), func() {
|
||||
start := time.Now()
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
id, err := client.XAdd(ctx, &redis.XAddArgs{
|
||||
Stream: "empty",
|
||||
ID: "4-0",
|
||||
Values: map[string]interface{}{"quatro": "quatre"},
|
||||
}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(id).To(Equal("4-0"))
|
||||
}()
|
||||
|
||||
res, err := client.XRead(ctx, &redis.XReadArgs{
|
||||
Streams: []string{"empty"},
|
||||
Block: 500 * time.Millisecond,
|
||||
ID: "+",
|
||||
}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
// Ensure that the XRead call with LastEntry option blocked for at least 100ms.
|
||||
Expect(time.Since(start)).To(BeNumerically(">=", 100*time.Millisecond))
|
||||
Expect(res).To(Equal([]redis.XStream{
|
||||
{
|
||||
Stream: "empty",
|
||||
Messages: []redis.XMessage{
|
||||
{ID: "4-0", Values: map[string]interface{}{"quatro": "quatre"}},
|
||||
},
|
||||
},
|
||||
}))
|
||||
})
|
||||
|
||||
Describe("group", func() {
|
||||
BeforeEach(func() {
|
||||
err := client.XGroupCreate(ctx, "stream", "group", "0").Err()
|
||||
|
|
|
@ -34,8 +34,8 @@ require (
|
|||
go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.22.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
|
||||
golang.org/x/net v0.20.0 // indirect
|
||||
golang.org/x/sys v0.16.0 // indirect
|
||||
golang.org/x/net v0.23.0 // indirect
|
||||
golang.org/x/sys v0.18.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
google.golang.org/genproto v0.0.0-20240108191215-35c7eff3a6b1 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240108191215-35c7eff3a6b1 // indirect
|
||||
|
|
|
@ -46,10 +46,10 @@ go.opentelemetry.io/otel/trace v1.22.0/go.mod h1:RbbHXVqKES9QhzZq/fE5UnOSILqRt40
|
|||
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
|
||||
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
|
||||
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
|
||||
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
|
||||
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
|
||||
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
|
||||
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
|
||||
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
|
||||
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
|
278
hash_commands.go
278
hash_commands.go
|
@ -1,6 +1,9 @@
|
|||
package redis
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
type HashCmdable interface {
|
||||
HDel(ctx context.Context, key string, fields ...string) *IntCmd
|
||||
|
@ -16,9 +19,23 @@ type HashCmdable interface {
|
|||
HMSet(ctx context.Context, key string, values ...interface{}) *BoolCmd
|
||||
HSetNX(ctx context.Context, key, field string, value interface{}) *BoolCmd
|
||||
HScan(ctx context.Context, key string, cursor uint64, match string, count int64) *ScanCmd
|
||||
HScanNoValues(ctx context.Context, key string, cursor uint64, match string, count int64) *ScanCmd
|
||||
HVals(ctx context.Context, key string) *StringSliceCmd
|
||||
HRandField(ctx context.Context, key string, count int) *StringSliceCmd
|
||||
HRandFieldWithValues(ctx context.Context, key string, count int) *KeyValueSliceCmd
|
||||
HExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd
|
||||
HExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd
|
||||
HPExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd
|
||||
HPExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd
|
||||
HExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd
|
||||
HExpireAtWithArgs(ctx context.Context, key string, tm time.Time, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd
|
||||
HPExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd
|
||||
HPExpireAtWithArgs(ctx context.Context, key string, tm time.Time, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd
|
||||
HPersist(ctx context.Context, key string, fields ...string) *IntSliceCmd
|
||||
HExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd
|
||||
HPExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd
|
||||
HTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd
|
||||
HPTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd
|
||||
}
|
||||
|
||||
func (c cmdable) HDel(ctx context.Context, key string, fields ...string) *IntCmd {
|
||||
|
@ -172,3 +189,262 @@ func (c cmdable) HScan(ctx context.Context, key string, cursor uint64, match str
|
|||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (c cmdable) HScanNoValues(ctx context.Context, key string, cursor uint64, match string, count int64) *ScanCmd {
|
||||
args := []interface{}{"hscan", key, cursor}
|
||||
if match != "" {
|
||||
args = append(args, "match", match)
|
||||
}
|
||||
if count > 0 {
|
||||
args = append(args, "count", count)
|
||||
}
|
||||
args = append(args, "novalues")
|
||||
cmd := NewScanCmd(ctx, c, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
type HExpireArgs struct {
|
||||
NX bool
|
||||
XX bool
|
||||
GT bool
|
||||
LT bool
|
||||
}
|
||||
|
||||
// HExpire - Sets the expiration time for specified fields in a hash in seconds.
|
||||
// The command constructs an argument list starting with "HEXPIRE", followed by the key, duration, any conditional flags, and the specified fields.
|
||||
// For more information - https://redis.io/commands/hexpire/
|
||||
func (c cmdable) HExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd {
|
||||
args := []interface{}{"HEXPIRE", key, formatSec(ctx, expiration), "FIELDS", len(fields)}
|
||||
|
||||
for _, field := range fields {
|
||||
args = append(args, field)
|
||||
}
|
||||
cmd := NewIntSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// HExpire - Sets the expiration time for specified fields in a hash in seconds.
|
||||
// It requires a key, an expiration duration, a struct with boolean flags for conditional expiration settings (NX, XX, GT, LT), and a list of fields.
|
||||
// The command constructs an argument list starting with "HEXPIRE", followed by the key, duration, any conditional flags, and the specified fields.
|
||||
// For more information - https://redis.io/commands/hexpire/
|
||||
func (c cmdable) HExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd {
|
||||
args := []interface{}{"HEXPIRE", key, formatSec(ctx, expiration)}
|
||||
|
||||
// only if one argument is true, we can add it to the args
|
||||
// if more than one argument is true, it will cause an error
|
||||
if expirationArgs.NX {
|
||||
args = append(args, "NX")
|
||||
} else if expirationArgs.XX {
|
||||
args = append(args, "XX")
|
||||
} else if expirationArgs.GT {
|
||||
args = append(args, "GT")
|
||||
} else if expirationArgs.LT {
|
||||
args = append(args, "LT")
|
||||
}
|
||||
|
||||
args = append(args, "FIELDS", len(fields))
|
||||
|
||||
for _, field := range fields {
|
||||
args = append(args, field)
|
||||
}
|
||||
cmd := NewIntSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// HPExpire - Sets the expiration time for specified fields in a hash in milliseconds.
|
||||
// Similar to HExpire, it accepts a key, an expiration duration in milliseconds, a struct with expiration condition flags, and a list of fields.
|
||||
// The command modifies the standard time.Duration to milliseconds for the Redis command.
|
||||
// For more information - https://redis.io/commands/hpexpire/
|
||||
func (c cmdable) HPExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd {
|
||||
args := []interface{}{"HPEXPIRE", key, formatMs(ctx, expiration), "FIELDS", len(fields)}
|
||||
|
||||
for _, field := range fields {
|
||||
args = append(args, field)
|
||||
}
|
||||
cmd := NewIntSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (c cmdable) HPExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd {
|
||||
args := []interface{}{"HPEXPIRE", key, formatMs(ctx, expiration)}
|
||||
|
||||
// only if one argument is true, we can add it to the args
|
||||
// if more than one argument is true, it will cause an error
|
||||
if expirationArgs.NX {
|
||||
args = append(args, "NX")
|
||||
} else if expirationArgs.XX {
|
||||
args = append(args, "XX")
|
||||
} else if expirationArgs.GT {
|
||||
args = append(args, "GT")
|
||||
} else if expirationArgs.LT {
|
||||
args = append(args, "LT")
|
||||
}
|
||||
|
||||
args = append(args, "FIELDS", len(fields))
|
||||
|
||||
for _, field := range fields {
|
||||
args = append(args, field)
|
||||
}
|
||||
cmd := NewIntSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// HExpireAt - Sets the expiration time for specified fields in a hash to a UNIX timestamp in seconds.
|
||||
// Takes a key, a UNIX timestamp, a struct of conditional flags, and a list of fields.
|
||||
// The command sets absolute expiration times based on the UNIX timestamp provided.
|
||||
// For more information - https://redis.io/commands/hexpireat/
|
||||
func (c cmdable) HExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd {
|
||||
|
||||
args := []interface{}{"HEXPIREAT", key, tm.Unix(), "FIELDS", len(fields)}
|
||||
|
||||
for _, field := range fields {
|
||||
args = append(args, field)
|
||||
}
|
||||
cmd := NewIntSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (c cmdable) HExpireAtWithArgs(ctx context.Context, key string, tm time.Time, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd {
|
||||
args := []interface{}{"HEXPIREAT", key, tm.Unix()}
|
||||
|
||||
// only if one argument is true, we can add it to the args
|
||||
// if more than one argument is true, it will cause an error
|
||||
if expirationArgs.NX {
|
||||
args = append(args, "NX")
|
||||
} else if expirationArgs.XX {
|
||||
args = append(args, "XX")
|
||||
} else if expirationArgs.GT {
|
||||
args = append(args, "GT")
|
||||
} else if expirationArgs.LT {
|
||||
args = append(args, "LT")
|
||||
}
|
||||
|
||||
args = append(args, "FIELDS", len(fields))
|
||||
|
||||
for _, field := range fields {
|
||||
args = append(args, field)
|
||||
}
|
||||
cmd := NewIntSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// HPExpireAt - Sets the expiration time for specified fields in a hash to a UNIX timestamp in milliseconds.
|
||||
// Similar to HExpireAt but for timestamps in milliseconds. It accepts the same parameters and adjusts the UNIX time to milliseconds.
|
||||
// For more information - https://redis.io/commands/hpexpireat/
|
||||
func (c cmdable) HPExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd {
|
||||
args := []interface{}{"HPEXPIREAT", key, tm.UnixNano() / int64(time.Millisecond), "FIELDS", len(fields)}
|
||||
|
||||
for _, field := range fields {
|
||||
args = append(args, field)
|
||||
}
|
||||
cmd := NewIntSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (c cmdable) HPExpireAtWithArgs(ctx context.Context, key string, tm time.Time, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd {
|
||||
args := []interface{}{"HPEXPIREAT", key, tm.UnixNano() / int64(time.Millisecond)}
|
||||
|
||||
// only if one argument is true, we can add it to the args
|
||||
// if more than one argument is true, it will cause an error
|
||||
if expirationArgs.NX {
|
||||
args = append(args, "NX")
|
||||
} else if expirationArgs.XX {
|
||||
args = append(args, "XX")
|
||||
} else if expirationArgs.GT {
|
||||
args = append(args, "GT")
|
||||
} else if expirationArgs.LT {
|
||||
args = append(args, "LT")
|
||||
}
|
||||
|
||||
args = append(args, "FIELDS", len(fields))
|
||||
|
||||
for _, field := range fields {
|
||||
args = append(args, field)
|
||||
}
|
||||
cmd := NewIntSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// HPersist - Removes the expiration time from specified fields in a hash.
|
||||
// Accepts a key and the fields themselves.
|
||||
// This command ensures that each field specified will have its expiration removed if present.
|
||||
// For more information - https://redis.io/commands/hpersist/
|
||||
func (c cmdable) HPersist(ctx context.Context, key string, fields ...string) *IntSliceCmd {
|
||||
args := []interface{}{"HPERSIST", key, "FIELDS", len(fields)}
|
||||
|
||||
for _, field := range fields {
|
||||
args = append(args, field)
|
||||
}
|
||||
cmd := NewIntSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// HExpireTime - Retrieves the expiration time for specified fields in a hash as a UNIX timestamp in seconds.
|
||||
// Requires a key and the fields themselves to fetch their expiration timestamps.
|
||||
// This command returns the expiration times for each field or error/status codes for each field as specified.
|
||||
// For more information - https://redis.io/commands/hexpiretime/
|
||||
func (c cmdable) HExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd {
|
||||
args := []interface{}{"HEXPIRETIME", key, "FIELDS", len(fields)}
|
||||
|
||||
for _, field := range fields {
|
||||
args = append(args, field)
|
||||
}
|
||||
cmd := NewIntSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// HPExpireTime - Retrieves the expiration time for specified fields in a hash as a UNIX timestamp in milliseconds.
|
||||
// Similar to HExpireTime, adjusted for timestamps in milliseconds. It requires the same parameters.
|
||||
// Provides the expiration timestamp for each field in milliseconds.
|
||||
// For more information - https://redis.io/commands/hexpiretime/
|
||||
func (c cmdable) HPExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd {
|
||||
args := []interface{}{"HPEXPIRETIME", key, "FIELDS", len(fields)}
|
||||
|
||||
for _, field := range fields {
|
||||
args = append(args, field)
|
||||
}
|
||||
cmd := NewIntSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// HTTL - Retrieves the remaining time to live for specified fields in a hash in seconds.
|
||||
// Requires a key and the fields themselves. It returns the TTL for each specified field.
|
||||
// This command fetches the TTL in seconds for each field or returns error/status codes as appropriate.
|
||||
// For more information - https://redis.io/commands/httl/
|
||||
func (c cmdable) HTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd {
|
||||
args := []interface{}{"HTTL", key, "FIELDS", len(fields)}
|
||||
|
||||
for _, field := range fields {
|
||||
args = append(args, field)
|
||||
}
|
||||
cmd := NewIntSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
||||
// HPTTL - Retrieves the remaining time to live for specified fields in a hash in milliseconds.
|
||||
// Similar to HTTL, but returns the TTL in milliseconds. It requires a key and the specified fields.
|
||||
// This command provides the TTL in milliseconds for each field or returns error/status codes as needed.
|
||||
// For more information - https://redis.io/commands/hpttl/
|
||||
func (c cmdable) HPTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd {
|
||||
args := []interface{}{"HPTTL", key, "FIELDS", len(fields)}
|
||||
|
||||
for _, field := range fields {
|
||||
args = append(args, field)
|
||||
}
|
||||
cmd := NewIntSliceCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
|
@ -16,6 +17,10 @@ func connCheck(conn net.Conn) error {
|
|||
// Reset previous timeout.
|
||||
_ = conn.SetDeadline(time.Time{})
|
||||
|
||||
// Check if tls.Conn.
|
||||
if c, ok := conn.(*tls.Conn); ok {
|
||||
conn = c.NetConn()
|
||||
}
|
||||
sysConn, ok := conn.(syscall.Conn)
|
||||
if !ok {
|
||||
return nil
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"net"
|
||||
"net/http/httptest"
|
||||
"time"
|
||||
|
@ -14,12 +15,17 @@ import (
|
|||
var _ = Describe("tests conn_check with real conns", func() {
|
||||
var ts *httptest.Server
|
||||
var conn net.Conn
|
||||
var tlsConn *tls.Conn
|
||||
var err error
|
||||
|
||||
BeforeEach(func() {
|
||||
ts = httptest.NewServer(nil)
|
||||
conn, err = net.DialTimeout(ts.Listener.Addr().Network(), ts.Listener.Addr().String(), time.Second)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
tlsTestServer := httptest.NewUnstartedServer(nil)
|
||||
tlsTestServer.StartTLS()
|
||||
tlsConn, err = tls.DialWithDialer(&net.Dialer{Timeout: time.Second}, tlsTestServer.Listener.Addr().Network(), tlsTestServer.Listener.Addr().String(), &tls.Config{InsecureSkipVerify: true})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
|
@ -33,11 +39,23 @@ var _ = Describe("tests conn_check with real conns", func() {
|
|||
Expect(connCheck(conn)).To(HaveOccurred())
|
||||
})
|
||||
|
||||
It("good tls conn check", func() {
|
||||
Expect(connCheck(tlsConn)).NotTo(HaveOccurred())
|
||||
|
||||
Expect(tlsConn.Close()).NotTo(HaveOccurred())
|
||||
Expect(connCheck(tlsConn)).To(HaveOccurred())
|
||||
})
|
||||
|
||||
It("bad conn check", func() {
|
||||
Expect(conn.Close()).NotTo(HaveOccurred())
|
||||
Expect(connCheck(conn)).To(HaveOccurred())
|
||||
})
|
||||
|
||||
It("bad tls conn check", func() {
|
||||
Expect(tlsConn.Close()).NotTo(HaveOccurred())
|
||||
Expect(connCheck(tlsConn)).To(HaveOccurred())
|
||||
})
|
||||
|
||||
It("check conn deadline", func() {
|
||||
Expect(conn.SetDeadline(time.Now())).NotTo(HaveOccurred())
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
|
|
|
@ -3,6 +3,7 @@ package internal
|
|||
import (
|
||||
"context"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -81,3 +82,47 @@ func GetAddr(addr string) string {
|
|||
}
|
||||
return net.JoinHostPort(addr[:ind], addr[ind+1:])
|
||||
}
|
||||
|
||||
func ToInteger(val interface{}) int {
|
||||
switch v := val.(type) {
|
||||
case int:
|
||||
return v
|
||||
case int64:
|
||||
return int(v)
|
||||
case string:
|
||||
i, _ := strconv.Atoi(v)
|
||||
return i
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
func ToFloat(val interface{}) float64 {
|
||||
switch v := val.(type) {
|
||||
case float64:
|
||||
return v
|
||||
case string:
|
||||
f, _ := strconv.ParseFloat(v, 64)
|
||||
return f
|
||||
default:
|
||||
return 0.0
|
||||
}
|
||||
}
|
||||
|
||||
func ToString(val interface{}) string {
|
||||
if str, ok := val.(string); ok {
|
||||
return str
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func ToStringSlice(val interface{}) []string {
|
||||
if arr, ok := val.([]interface{}); ok {
|
||||
result := make([]string, len(arr))
|
||||
for i, v := range arr {
|
||||
result[i] = ToString(v)
|
||||
}
|
||||
return result
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -96,6 +96,22 @@ var _ = Describe("ScanIterator", func() {
|
|||
Expect(vals).To(HaveLen(71 * 2))
|
||||
Expect(vals).To(ContainElement("K01"))
|
||||
Expect(vals).To(ContainElement("K71"))
|
||||
Expect(vals).To(ContainElement("x"))
|
||||
})
|
||||
|
||||
It("should hscan without values across multiple pages", Label("NonRedisEnterprise"), func() {
|
||||
Expect(hashSeed(71)).NotTo(HaveOccurred())
|
||||
|
||||
var vals []string
|
||||
iter := client.HScanNoValues(ctx, hashKey, 0, "", 10).Iterator()
|
||||
for iter.Next(ctx) {
|
||||
vals = append(vals, iter.Val())
|
||||
}
|
||||
Expect(iter.Err()).NotTo(HaveOccurred())
|
||||
Expect(vals).To(HaveLen(71))
|
||||
Expect(vals).To(ContainElement("K01"))
|
||||
Expect(vals).To(ContainElement("K71"))
|
||||
Expect(vals).NotTo(ContainElement("x"))
|
||||
})
|
||||
|
||||
It("should scan to page borders", func() {
|
||||
|
|
|
@ -242,18 +242,18 @@ var _ = Describe("JSON Commands", Label("json"), func() {
|
|||
Expect(cmd.Val()).To(Equal("OK"))
|
||||
})
|
||||
|
||||
It("should JSONGet", Label("json.get", "json"), func() {
|
||||
It("should JSONGet", Label("json.get", "json", "NonRedisEnterprise"), func() {
|
||||
res, err := client.JSONSet(ctx, "get3", "$", `{"a": 1, "b": 2}`).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal("OK"))
|
||||
|
||||
res, err = client.JSONGetWithArgs(ctx, "get3", &redis.JSONGetArgs{Indent: "-"}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal(`[-{--"a":1,--"b":2-}]`))
|
||||
Expect(res).To(Equal(`{-"a":1,-"b":2}`))
|
||||
|
||||
res, err = client.JSONGetWithArgs(ctx, "get3", &redis.JSONGetArgs{Indent: "-", Newline: `~`, Space: `!`}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal(`[~-{~--"a":!1,~--"b":!2~-}~]`))
|
||||
Expect(res).To(Equal(`{~-"a":!1,~-"b":!2~}`))
|
||||
})
|
||||
|
||||
It("should JSONMerge", Label("json.merge", "json"), func() {
|
||||
|
|
|
@ -2,6 +2,7 @@ package redis_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -12,13 +13,18 @@ import (
|
|||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// This test is for manual use and is not part of the CI of Go-Redis.
|
||||
var _ = Describe("Monitor command", Label("monitor"), func() {
|
||||
ctx := context.TODO()
|
||||
var client *redis.Client
|
||||
|
||||
BeforeEach(func() {
|
||||
if os.Getenv("RUN_MONITOR_TEST") != "true" {
|
||||
Skip("Skipping Monitor command test. Set RUN_MONITOR_TEST=true to run it.")
|
||||
}
|
||||
client = redis.NewClient(&redis.Options{Addr: ":6379"})
|
||||
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
|
||||
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
|
@ -50,6 +56,10 @@ var _ = Describe("Monitor command", Label("monitor"), func() {
|
|||
})
|
||||
|
||||
func TestMonitorCommand(t *testing.T) {
|
||||
if os.Getenv("RUN_MONITOR_TEST") != "true" {
|
||||
t.Skip("Skipping Monitor command test. Set RUN_MONITOR_TEST=true to run it.")
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
client := redis.NewClient(&redis.Options{Addr: ":6379"})
|
||||
if err := client.FlushDB(ctx).Err(); err != nil {
|
||||
|
|
|
@ -341,6 +341,8 @@ func (n *clusterNode) Close() error {
|
|||
return n.Client.Close()
|
||||
}
|
||||
|
||||
const maximumNodeLatency = 1 * time.Minute
|
||||
|
||||
func (n *clusterNode) updateLatency() {
|
||||
const numProbe = 10
|
||||
var dur uint64
|
||||
|
@ -361,7 +363,7 @@ func (n *clusterNode) updateLatency() {
|
|||
if successes == 0 {
|
||||
// If none of the pings worked, set latency to some arbitrarily high value so this node gets
|
||||
// least priority.
|
||||
latency = float64((1 * time.Minute) / time.Microsecond)
|
||||
latency = float64((maximumNodeLatency) / time.Microsecond)
|
||||
} else {
|
||||
latency = float64(dur) / float64(successes)
|
||||
}
|
||||
|
@ -735,20 +737,40 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
|
|||
return c.nodes.Random()
|
||||
}
|
||||
|
||||
var node *clusterNode
|
||||
var allNodesFailing = true
|
||||
var (
|
||||
closestNonFailingNode *clusterNode
|
||||
closestNode *clusterNode
|
||||
minLatency time.Duration
|
||||
)
|
||||
|
||||
// setting the max possible duration as zerovalue for minlatency
|
||||
minLatency = time.Duration(math.MaxInt64)
|
||||
|
||||
for _, n := range nodes {
|
||||
if n.Failing() {
|
||||
continue
|
||||
if closestNode == nil || n.Latency() < minLatency {
|
||||
closestNode = n
|
||||
minLatency = n.Latency()
|
||||
if !n.Failing() {
|
||||
closestNonFailingNode = n
|
||||
allNodesFailing = false
|
||||
}
|
||||
}
|
||||
if node == nil || n.Latency() < node.Latency() {
|
||||
node = n
|
||||
}
|
||||
}
|
||||
if node != nil {
|
||||
return node, nil
|
||||
}
|
||||
|
||||
// If all nodes are failing - return random node
|
||||
// pick the healthly node with the lowest latency
|
||||
if !allNodesFailing && closestNonFailingNode != nil {
|
||||
return closestNonFailingNode, nil
|
||||
}
|
||||
|
||||
// if all nodes are failing, we will pick the temporarily failing node with lowest latency
|
||||
if minLatency < maximumNodeLatency && closestNode != nil {
|
||||
internal.Logger.Printf(context.TODO(), "redis: all nodes are marked as failed, picking the temporarily failing node with lowest latency")
|
||||
return closestNode, nil
|
||||
}
|
||||
|
||||
// If all nodes are having the maximum latency(all pings are failing) - return a random node across the cluster
|
||||
internal.Logger.Printf(context.TODO(), "redis: pings to all nodes are failing, picking a random node across the cluster")
|
||||
return c.nodes.Random()
|
||||
}
|
||||
|
||||
|
@ -916,10 +938,13 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
|
|||
func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
|
||||
slot := c.cmdSlot(ctx, cmd)
|
||||
var node *clusterNode
|
||||
var moved bool
|
||||
var ask bool
|
||||
var lastErr error
|
||||
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
|
||||
if attempt > 0 {
|
||||
// MOVED and ASK responses are not transient errors that require retry delay; they
|
||||
// should be attempted immediately.
|
||||
if attempt > 0 && !moved && !ask {
|
||||
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -963,7 +988,6 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
|
|||
continue
|
||||
}
|
||||
|
||||
var moved bool
|
||||
var addr string
|
||||
moved, ask, addr = isMovedError(lastErr)
|
||||
if moved || ask {
|
||||
|
|
|
@ -653,6 +653,32 @@ var _ = Describe("ClusterClient", func() {
|
|||
Expect(client.Close()).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("follows node redirection immediately", func() {
|
||||
// Configure retry backoffs far in excess of the expected duration of redirection
|
||||
opt := redisClusterOptions()
|
||||
opt.MinRetryBackoff = 10 * time.Minute
|
||||
opt.MaxRetryBackoff = 20 * time.Minute
|
||||
client := cluster.newClusterClient(ctx, opt)
|
||||
|
||||
Eventually(func() error {
|
||||
return client.SwapNodes(ctx, "A")
|
||||
}, 30*time.Second).ShouldNot(HaveOccurred())
|
||||
|
||||
// Note that this context sets a deadline more aggressive than the lowest possible bound
|
||||
// of the retry backoff; this verifies that redirection completes immediately.
|
||||
redirCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err := client.Set(redirCtx, "A", "VALUE", 0).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
v, err := client.Get(redirCtx, "A").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(v).To(Equal("VALUE"))
|
||||
|
||||
Expect(client.Close()).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("calls fn for every master node", func() {
|
||||
for i := 0; i < 10; i++ {
|
||||
Expect(client.Set(ctx, strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())
|
||||
|
|
|
@ -84,7 +84,7 @@ func (c *PubSub) conn(ctx context.Context, newChannels []string) (*pool.Conn, er
|
|||
}
|
||||
|
||||
func (c *PubSub) writeCmd(ctx context.Context, cn *pool.Conn, cmd Cmder) error {
|
||||
return cn.WithWriter(context.Background(), c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
return cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
||||
return writeCmd(wr, cmd)
|
||||
})
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -137,10 +137,11 @@ type XReadArgs struct {
|
|||
Streams []string // list of streams and ids, e.g. stream1 stream2 id1 id2
|
||||
Count int64
|
||||
Block time.Duration
|
||||
ID string
|
||||
}
|
||||
|
||||
func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
|
||||
args := make([]interface{}, 0, 6+len(a.Streams))
|
||||
args := make([]interface{}, 0, 2*len(a.Streams)+6)
|
||||
args = append(args, "xread")
|
||||
|
||||
keyPos := int8(1)
|
||||
|
@ -159,6 +160,11 @@ func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
|
|||
for _, s := range a.Streams {
|
||||
args = append(args, s)
|
||||
}
|
||||
if a.ID != "" {
|
||||
for range a.Streams {
|
||||
args = append(args, a.ID)
|
||||
}
|
||||
}
|
||||
|
||||
cmd := NewXStreamSliceCmd(ctx, args...)
|
||||
if a.Block >= 0 {
|
||||
|
|
|
@ -40,25 +40,32 @@ type TimeseriesCmdable interface {
|
|||
}
|
||||
|
||||
type TSOptions struct {
|
||||
Retention int
|
||||
ChunkSize int
|
||||
Encoding string
|
||||
DuplicatePolicy string
|
||||
Labels map[string]string
|
||||
Retention int
|
||||
ChunkSize int
|
||||
Encoding string
|
||||
DuplicatePolicy string
|
||||
Labels map[string]string
|
||||
IgnoreMaxTimeDiff int64
|
||||
IgnoreMaxValDiff float64
|
||||
}
|
||||
type TSIncrDecrOptions struct {
|
||||
Timestamp int64
|
||||
Retention int
|
||||
ChunkSize int
|
||||
Uncompressed bool
|
||||
Labels map[string]string
|
||||
Timestamp int64
|
||||
Retention int
|
||||
ChunkSize int
|
||||
Uncompressed bool
|
||||
DuplicatePolicy string
|
||||
Labels map[string]string
|
||||
IgnoreMaxTimeDiff int64
|
||||
IgnoreMaxValDiff float64
|
||||
}
|
||||
|
||||
type TSAlterOptions struct {
|
||||
Retention int
|
||||
ChunkSize int
|
||||
DuplicatePolicy string
|
||||
Labels map[string]string
|
||||
Retention int
|
||||
ChunkSize int
|
||||
DuplicatePolicy string
|
||||
Labels map[string]string
|
||||
IgnoreMaxTimeDiff int64
|
||||
IgnoreMaxValDiff float64
|
||||
}
|
||||
|
||||
type TSCreateRuleOptions struct {
|
||||
|
@ -223,6 +230,9 @@ func (c cmdable) TSAddWithArgs(ctx context.Context, key string, timestamp interf
|
|||
args = append(args, label, value)
|
||||
}
|
||||
}
|
||||
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
|
||||
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
|
||||
}
|
||||
}
|
||||
cmd := NewIntCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
|
@ -264,6 +274,9 @@ func (c cmdable) TSCreateWithArgs(ctx context.Context, key string, options *TSOp
|
|||
args = append(args, label, value)
|
||||
}
|
||||
}
|
||||
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
|
||||
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
|
||||
}
|
||||
}
|
||||
cmd := NewStatusCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
|
@ -292,6 +305,9 @@ func (c cmdable) TSAlter(ctx context.Context, key string, options *TSAlterOption
|
|||
args = append(args, label, value)
|
||||
}
|
||||
}
|
||||
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
|
||||
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
|
||||
}
|
||||
}
|
||||
cmd := NewStatusCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
|
@ -351,12 +367,18 @@ func (c cmdable) TSIncrByWithArgs(ctx context.Context, key string, timestamp flo
|
|||
if options.Uncompressed {
|
||||
args = append(args, "UNCOMPRESSED")
|
||||
}
|
||||
if options.DuplicatePolicy != "" {
|
||||
args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
|
||||
}
|
||||
if options.Labels != nil {
|
||||
args = append(args, "LABELS")
|
||||
for label, value := range options.Labels {
|
||||
args = append(args, label, value)
|
||||
}
|
||||
}
|
||||
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
|
||||
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
|
||||
}
|
||||
}
|
||||
cmd := NewIntCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
|
@ -391,12 +413,18 @@ func (c cmdable) TSDecrByWithArgs(ctx context.Context, key string, timestamp flo
|
|||
if options.Uncompressed {
|
||||
args = append(args, "UNCOMPRESSED")
|
||||
}
|
||||
if options.DuplicatePolicy != "" {
|
||||
args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
|
||||
}
|
||||
if options.Labels != nil {
|
||||
args = append(args, "LABELS")
|
||||
for label, value := range options.Labels {
|
||||
args = append(args, label, value)
|
||||
}
|
||||
}
|
||||
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
|
||||
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
|
||||
}
|
||||
}
|
||||
cmd := NewIntCmd(ctx, args...)
|
||||
_ = c(ctx, cmd)
|
||||
|
|
|
@ -23,7 +23,7 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
Expect(client.Close()).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs"), func() {
|
||||
It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs", "NonRedisEnterprise"), func() {
|
||||
result, err := client.TSCreate(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo("OK"))
|
||||
|
@ -62,10 +62,60 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
resultInfo, err = client.TSInfo(ctx, keyName).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(strings.ToUpper(resultInfo["duplicatePolicy"].(string))).To(BeEquivalentTo(dup))
|
||||
|
||||
}
|
||||
// Test insertion filters
|
||||
opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, DuplicatePolicy: "LAST", IgnoreMaxValDiff: 10.0}
|
||||
result, err = client.TSCreateWithArgs(ctx, "ts-if-1", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo("OK"))
|
||||
resultAdd, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAdd).To(BeEquivalentTo(1000))
|
||||
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAdd).To(BeEquivalentTo(1010))
|
||||
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAdd).To(BeEquivalentTo(1010))
|
||||
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1020, 11.5).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAdd).To(BeEquivalentTo(1020))
|
||||
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1021, 22.0).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAdd).To(BeEquivalentTo(1021))
|
||||
|
||||
rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1021).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(rangePoints)).To(BeEquivalentTo(4))
|
||||
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{
|
||||
{Timestamp: 1000, Value: 1.0},
|
||||
{Timestamp: 1010, Value: 11.0},
|
||||
{Timestamp: 1020, Value: 11.5},
|
||||
{Timestamp: 1021, Value: 22.0}}))
|
||||
// Test insertion filters with other duplicate policy
|
||||
opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0}
|
||||
result, err = client.TSCreateWithArgs(ctx, "ts-if-2", opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo("OK"))
|
||||
resultAdd1, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAdd1).To(BeEquivalentTo(1000))
|
||||
resultAdd1, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAdd1).To(BeEquivalentTo(1010))
|
||||
resultAdd1, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAdd1).To(BeEquivalentTo(1013))
|
||||
|
||||
rangePoints, err = client.TSRange(ctx, "ts-if-1", 1000, 1013).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(rangePoints)).To(BeEquivalentTo(3))
|
||||
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{
|
||||
{Timestamp: 1000, Value: 1.0},
|
||||
{Timestamp: 1010, Value: 11.0},
|
||||
{Timestamp: 1013, Value: 10.0}}))
|
||||
})
|
||||
It("should TSAdd and TSAddWithArgs", Label("timeseries", "tsadd", "tsaddWithArgs"), func() {
|
||||
It("should TSAdd and TSAddWithArgs", Label("timeseries", "tsadd", "tsaddWithArgs", "NonRedisEnterprise"), func() {
|
||||
result, err := client.TSAdd(ctx, "1", 1, 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(1))
|
||||
|
@ -138,9 +188,23 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
resultGet, err = client.TSGet(ctx, "tsami-1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultGet.Value).To(BeEquivalentTo(5))
|
||||
// Insertion filters
|
||||
opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
|
||||
result, err = client.TSAddWithArgs(ctx, "ts-if-1", 1000, 1.0, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(1000))
|
||||
|
||||
result, err = client.TSAddWithArgs(ctx, "ts-if-1", 1004, 3.0, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo(1000))
|
||||
|
||||
rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1004).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(rangePoints)).To(BeEquivalentTo(1))
|
||||
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 1.0}}))
|
||||
})
|
||||
|
||||
It("should TSAlter", Label("timeseries", "tsalter"), func() {
|
||||
It("should TSAlter", Label("timeseries", "tsalter", "NonRedisEnterprise"), func() {
|
||||
result, err := client.TSCreate(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(result).To(BeEquivalentTo("OK"))
|
||||
|
@ -179,6 +243,33 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
resultInfo, err = client.TSInfo(ctx, "1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo("min"))
|
||||
// Test insertion filters
|
||||
resultAdd, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAdd).To(BeEquivalentTo(1000))
|
||||
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAdd).To(BeEquivalentTo(1010))
|
||||
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAdd).To(BeEquivalentTo(1013))
|
||||
|
||||
alterOpt := &redis.TSAlterOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
|
||||
resultAlter, err = client.TSAlter(ctx, "ts-if-1", alterOpt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAlter).To(BeEquivalentTo("OK"))
|
||||
|
||||
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1015, 11.5).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultAdd).To(BeEquivalentTo(1013))
|
||||
|
||||
rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1013).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(rangePoints)).To(BeEquivalentTo(3))
|
||||
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{
|
||||
{Timestamp: 1000, Value: 1.0},
|
||||
{Timestamp: 1010, Value: 11.0},
|
||||
{Timestamp: 1013, Value: 10.0}}))
|
||||
})
|
||||
|
||||
It("should TSCreateRule and TSDeleteRule", Label("timeseries", "tscreaterule", "tsdeleterule"), func() {
|
||||
|
@ -216,7 +307,7 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
Expect(resultInfo["rules"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
|
||||
})
|
||||
|
||||
It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs"), func() {
|
||||
It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs", "NonRedisEnterprise"), func() {
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := client.TSIncrBy(ctx, "1", 1).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
@ -277,6 +368,54 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
|
|||
resultInfo, err = client.TSInfo(ctx, "4").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(resultInfo["chunkSize"]).To(BeEquivalentTo(128))
|
||||
|
||||
// Test insertion filters INCRBY
|
||||
opt = &redis.TSIncrDecrOptions{Timestamp: 1000, IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
|
||||
res, err := client.TSIncrByWithArgs(ctx, "ts-if-1", 1.0, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(BeEquivalentTo(1000))
|
||||
|
||||
res, err = client.TSIncrByWithArgs(ctx, "ts-if-1", 3.0, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(BeEquivalentTo(1000))
|
||||
|
||||
rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1004).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(rangePoints)).To(BeEquivalentTo(1))
|
||||
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 1.0}}))
|
||||
|
||||
res, err = client.TSIncrByWithArgs(ctx, "ts-if-1", 10.1, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(BeEquivalentTo(1000))
|
||||
|
||||
rangePoints, err = client.TSRange(ctx, "ts-if-1", 1000, 1004).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(rangePoints)).To(BeEquivalentTo(1))
|
||||
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 11.1}}))
|
||||
|
||||
// Test insertion filters DECRBY
|
||||
opt = &redis.TSIncrDecrOptions{Timestamp: 1000, IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
|
||||
res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 1.0, opt).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(BeEquivalentTo(1000))
|
||||
|
||||
res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 3.0, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(BeEquivalentTo(1000))
|
||||
|
||||
rangePoints, err = client.TSRange(ctx, "ts-if-2", 1000, 1004).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(rangePoints)).To(BeEquivalentTo(1))
|
||||
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: -1.0}}))
|
||||
|
||||
res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 10.1, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(BeEquivalentTo(1000))
|
||||
|
||||
rangePoints, err = client.TSRange(ctx, "ts-if-2", 1000, 1004).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(rangePoints)).To(BeEquivalentTo(1))
|
||||
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: -11.1}}))
|
||||
})
|
||||
|
||||
It("should TSGet", Label("timeseries", "tsget"), func() {
|
||||
|
|
Loading…
Reference in New Issue