diff --git a/command.go b/command.go index dde513b..d7c8e0d 100644 --- a/command.go +++ b/command.go @@ -1000,10 +1000,14 @@ func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { } v, err := rd.ReadArrayReply(stringInterfaceMapParser) - if err != nil { + if err != nil && err != proto.Nil { return nil, err } + if v == nil || err == proto.Nil { + v = make(map[string]interface{}) + } + msgs = append(msgs, XMessage{ ID: id, Values: v.(map[string]interface{}), diff --git a/commands_test.go b/commands_test.go index 02fc7bf..3970808 100644 --- a/commands_test.go +++ b/commands_test.go @@ -3583,6 +3583,27 @@ var _ = Describe("Commands", func() { Expect(n).To(Equal(int64(1))) }) + It("should XReadGroup skip empty", func() { + n, err := client.XDel("stream", "2-0").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + + res, err := client.XReadGroup(&redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer", + Streams: []string{"stream", "0"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]redis.XStream{{ + Stream: "stream", + Messages: []redis.XMessage{ + {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, + {ID: "2-0", Values: map[string]interface{}{}}, + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + }}, + })) + }) + It("should XGroupCreateMkStream", func() { err := client.XGroupCreateMkStream("stream2", "group", "0").Err() Expect(err).NotTo(HaveOccurred())