From 0fdd200bc73d0033e6742edd51979bc81cda2d52 Mon Sep 17 00:00:00 2001 From: yeplato <55429479+yeplato@users.noreply.github.com> Date: Sun, 2 Feb 2020 00:49:32 -0800 Subject: [PATCH] let XReadGroup skip empty message and process next message (#1244) * let XReadGroup skip empty message and process next message --- command.go | 6 +++++- commands_test.go | 21 +++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/command.go b/command.go index dde513b..d7c8e0d 100644 --- a/command.go +++ b/command.go @@ -1000,10 +1000,14 @@ func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { } v, err := rd.ReadArrayReply(stringInterfaceMapParser) - if err != nil { + if err != nil && err != proto.Nil { return nil, err } + if v == nil || err == proto.Nil { + v = make(map[string]interface{}) + } + msgs = append(msgs, XMessage{ ID: id, Values: v.(map[string]interface{}), diff --git a/commands_test.go b/commands_test.go index 02fc7bf..3970808 100644 --- a/commands_test.go +++ b/commands_test.go @@ -3583,6 +3583,27 @@ var _ = Describe("Commands", func() { Expect(n).To(Equal(int64(1))) }) + It("should XReadGroup skip empty", func() { + n, err := client.XDel("stream", "2-0").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + + res, err := client.XReadGroup(&redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer", + Streams: []string{"stream", "0"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]redis.XStream{{ + Stream: "stream", + Messages: []redis.XMessage{ + {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, + {ID: "2-0", Values: map[string]interface{}{}}, + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + }}, + })) + }) + It("should XGroupCreateMkStream", func() { err := client.XGroupCreateMkStream("stream2", "group", "0").Err() Expect(err).NotTo(HaveOccurred())