mirror of https://github.com/go-redis/redis.git
Add support for XREAD last entry (#3005)
* add support for XREAD last entry * handle reading from multiple streams * add test to ensure we block for empty stream * small tweak * add an option to XReadArgs instead * modify test comment * small preallocation optimization * Changed argument to generic ID, skip tests on Enterprise * Fix test case * Updated expiration command --------- Co-authored-by: Vladyslav Vildanov <117659936+vladvildanov@users.noreply.github.com> Co-authored-by: vladvildanov <divinez122@outlook.com>
This commit is contained in:
parent
445d2667eb
commit
3975cd5380
|
@ -2588,13 +2588,14 @@ var _ = Describe("Commands", func() {
|
||||||
Expect(sadd.Err()).NotTo(HaveOccurred())
|
Expect(sadd.Err()).NotTo(HaveOccurred())
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result()
|
expireAt := time.Now().Add(10 * time.Second)
|
||||||
|
res, err := client.HPExpireAt(ctx, "myhash", expireAt, "key1", "key200").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(res).To(Equal([]int64{1, -2}))
|
Expect(res).To(Equal([]int64{1, -2}))
|
||||||
|
|
||||||
res, err = client.HPExpireTime(ctx, "myhash", "key1", "key2", "key200").Result()
|
res, err = client.HPExpireTime(ctx, "myhash", "key1", "key2", "key200").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(res).To(BeEquivalentTo([]int64{time.Now().Add(10 * time.Second).UnixMilli(), -1, -2}))
|
Expect(res).To(BeEquivalentTo([]int64{expireAt.UnixMilli(), -1, -2}))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should HTTL", Label("hash-expiration", "NonRedisEnterprise"), func() {
|
It("should HTTL", Label("hash-expiration", "NonRedisEnterprise"), func() {
|
||||||
|
@ -5888,6 +5889,78 @@ var _ = Describe("Commands", func() {
|
||||||
Expect(err).To(Equal(redis.Nil))
|
Expect(err).To(Equal(redis.Nil))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should XRead LastEntry", Label("NonRedisEnterprise"), func() {
|
||||||
|
res, err := client.XRead(ctx, &redis.XReadArgs{
|
||||||
|
Streams: []string{"stream"},
|
||||||
|
Count: 2, // we expect 1 message
|
||||||
|
ID: "+",
|
||||||
|
}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(res).To(Equal([]redis.XStream{
|
||||||
|
{
|
||||||
|
Stream: "stream",
|
||||||
|
Messages: []redis.XMessage{
|
||||||
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XRead LastEntry from two streams", Label("NonRedisEnterprise"), func() {
|
||||||
|
res, err := client.XRead(ctx, &redis.XReadArgs{
|
||||||
|
Streams: []string{"stream", "stream"},
|
||||||
|
ID: "+",
|
||||||
|
}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(res).To(Equal([]redis.XStream{
|
||||||
|
{
|
||||||
|
Stream: "stream",
|
||||||
|
Messages: []redis.XMessage{
|
||||||
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Stream: "stream",
|
||||||
|
Messages: []redis.XMessage{
|
||||||
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XRead LastEntry blocks", Label("NonRedisEnterprise"), func() {
|
||||||
|
start := time.Now()
|
||||||
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
id, err := client.XAdd(ctx, &redis.XAddArgs{
|
||||||
|
Stream: "empty",
|
||||||
|
ID: "4-0",
|
||||||
|
Values: map[string]interface{}{"quatro": "quatre"},
|
||||||
|
}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(id).To(Equal("4-0"))
|
||||||
|
}()
|
||||||
|
|
||||||
|
res, err := client.XRead(ctx, &redis.XReadArgs{
|
||||||
|
Streams: []string{"empty"},
|
||||||
|
Block: 500 * time.Millisecond,
|
||||||
|
ID: "+",
|
||||||
|
}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
// Ensure that the XRead call with LastEntry option blocked for at least 100ms.
|
||||||
|
Expect(time.Since(start)).To(BeNumerically(">=", 100*time.Millisecond))
|
||||||
|
Expect(res).To(Equal([]redis.XStream{
|
||||||
|
{
|
||||||
|
Stream: "empty",
|
||||||
|
Messages: []redis.XMessage{
|
||||||
|
{ID: "4-0", Values: map[string]interface{}{"quatro": "quatre"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
Describe("group", func() {
|
Describe("group", func() {
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
err := client.XGroupCreate(ctx, "stream", "group", "0").Err()
|
err := client.XGroupCreate(ctx, "stream", "group", "0").Err()
|
||||||
|
|
|
@ -137,10 +137,11 @@ type XReadArgs struct {
|
||||||
Streams []string // list of streams and ids, e.g. stream1 stream2 id1 id2
|
Streams []string // list of streams and ids, e.g. stream1 stream2 id1 id2
|
||||||
Count int64
|
Count int64
|
||||||
Block time.Duration
|
Block time.Duration
|
||||||
|
ID string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
|
func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
|
||||||
args := make([]interface{}, 0, 6+len(a.Streams))
|
args := make([]interface{}, 0, 2*len(a.Streams)+6)
|
||||||
args = append(args, "xread")
|
args = append(args, "xread")
|
||||||
|
|
||||||
keyPos := int8(1)
|
keyPos := int8(1)
|
||||||
|
@ -159,6 +160,11 @@ func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
|
||||||
for _, s := range a.Streams {
|
for _, s := range a.Streams {
|
||||||
args = append(args, s)
|
args = append(args, s)
|
||||||
}
|
}
|
||||||
|
if a.ID != "" {
|
||||||
|
for range a.Streams {
|
||||||
|
args = append(args, a.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
cmd := NewXStreamSliceCmd(ctx, args...)
|
cmd := NewXStreamSliceCmd(ctx, args...)
|
||||||
if a.Block >= 0 {
|
if a.Block >= 0 {
|
||||||
|
|
Loading…
Reference in New Issue