forked from mirror/redis
let XReadGroup skip empty message and process next message (#1244)
* let XReadGroup skip empty message and process next message
This commit is contained in:
parent
7f69d5e320
commit
0fdd200bc7
|
@ -1000,10 +1000,14 @@ func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
v, err := rd.ReadArrayReply(stringInterfaceMapParser)
|
v, err := rd.ReadArrayReply(stringInterfaceMapParser)
|
||||||
if err != nil {
|
if err != nil && err != proto.Nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if v == nil || err == proto.Nil {
|
||||||
|
v = make(map[string]interface{})
|
||||||
|
}
|
||||||
|
|
||||||
msgs = append(msgs, XMessage{
|
msgs = append(msgs, XMessage{
|
||||||
ID: id,
|
ID: id,
|
||||||
Values: v.(map[string]interface{}),
|
Values: v.(map[string]interface{}),
|
||||||
|
|
|
@ -3583,6 +3583,27 @@ var _ = Describe("Commands", func() {
|
||||||
Expect(n).To(Equal(int64(1)))
|
Expect(n).To(Equal(int64(1)))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should XReadGroup skip empty", func() {
|
||||||
|
n, err := client.XDel("stream", "2-0").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(Equal(int64(1)))
|
||||||
|
|
||||||
|
res, err := client.XReadGroup(&redis.XReadGroupArgs{
|
||||||
|
Group: "group",
|
||||||
|
Consumer: "consumer",
|
||||||
|
Streams: []string{"stream", "0"},
|
||||||
|
}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(res).To(Equal([]redis.XStream{{
|
||||||
|
Stream: "stream",
|
||||||
|
Messages: []redis.XMessage{
|
||||||
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
|
{ID: "2-0", Values: map[string]interface{}{}},
|
||||||
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
|
}},
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
It("should XGroupCreateMkStream", func() {
|
It("should XGroupCreateMkStream", func() {
|
||||||
err := client.XGroupCreateMkStream("stream2", "group", "0").Err()
|
err := client.XGroupCreateMkStream("stream2", "group", "0").Err()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
Loading…
Reference in New Issue