From f594401261ea6c736e4be11d0e6296713c991527 Mon Sep 17 00:00:00 2001 From: kkkbird Date: Tue, 9 Feb 2021 20:46:26 +0800 Subject: [PATCH] support "XINFO CONSUMERS" (#1649) * support "XINFO CONSUMERS" * add "xinfo" test --- command.go | 97 +++++++++++++++++++++++++++++++++++++++++++ commands.go | 6 +++ commands_test.go | 105 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 208 insertions(+) diff --git a/command.go b/command.go index 2932035..6ca0634 100644 --- a/command.go +++ b/command.go @@ -1445,6 +1445,103 @@ func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error { //------------------------------------------------------------------------------ +type XInfoConsumersCmd struct { + baseCmd + val []XInfoConsumer +} + +type XInfoConsumer struct { + Name string + Pending int64 + Idle int64 +} + +var _ Cmder = (*XInfoGroupsCmd)(nil) + +func NewXInfoConsumersCmd(ctx context.Context, stream string, group string) *XInfoConsumersCmd { + return &XInfoConsumersCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: []interface{}{"xinfo", "consumers", stream, group}, + }, + } +} + +func (cmd *XInfoConsumersCmd) Val() []XInfoConsumer { + return cmd.val +} + +func (cmd *XInfoConsumersCmd) Result() ([]XInfoConsumer, error) { + return cmd.val, cmd.err +} + +func (cmd *XInfoConsumersCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XInfoConsumersCmd) readReply(rd *proto.Reader) error { + n, err := rd.ReadArrayLen() + if err != nil { + return err + } + + cmd.val = make([]XInfoConsumer, n) + + for i := 0; i < n; i++ { + cmd.val[i], err = readXConsumerInfo(rd) + if err != nil { + return err + } + } + + return nil +} + +func readXConsumerInfo(rd *proto.Reader) (XInfoConsumer, error) { + var consumer XInfoConsumer + + n, err := rd.ReadArrayLen() + if err != nil { + return consumer, err + } + if n != 6 { + return consumer, fmt.Errorf("redis: got %d elements in XINFO CONSUMERS reply, wanted 6", n) + } + + for i := 0; i < 3; i++ { + key, err := rd.ReadString() + if err != nil { + return consumer, err + } + + val, err := rd.ReadString() + if err != nil { + return consumer, err + } + + switch key { + case "name": + consumer.Name = val + case "pending": + consumer.Pending, err = strconv.ParseInt(val, 0, 64) + if err != nil { + return consumer, err + } + case "idle": + consumer.Idle, err = strconv.ParseInt(val, 0, 64) + if err != nil { + return consumer, err + } + default: + return consumer, fmt.Errorf("redis: unexpected content %s in XINFO CONSUMERS reply", key) + } + } + + return consumer, nil +} + +//------------------------------------------------------------------------------ + type XInfoGroupsCmd struct { baseCmd val []XInfoGroup diff --git a/commands.go b/commands.go index 422a7c5..6f7e73c 100644 --- a/commands.go +++ b/commands.go @@ -1752,6 +1752,12 @@ func (c cmdable) XTrimApprox(ctx context.Context, key string, maxLen int64) *Int return cmd } +func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd { + cmd := NewXInfoConsumersCmd(ctx, key, group) + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd { cmd := NewXInfoGroupsCmd(ctx, key) _ = c(ctx, cmd) diff --git a/commands_test.go b/commands_test.go index a73f4f8..f015541 100644 --- a/commands_test.go +++ b/commands_test.go @@ -3891,6 +3891,111 @@ var _ = Describe("Commands", func() { Expect(n).To(Equal(int64(2))) }) }) + + Describe("xinfo", func() { + BeforeEach(func() { + err := client.XGroupCreate(ctx, "stream", "group1", "0").Err() + Expect(err).NotTo(HaveOccurred()) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group1", + Consumer: "consumer1", + Streams: []string{"stream", ">"}, + Count: 2, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]redis.XStream{ + { + Stream: "stream", + Messages: []redis.XMessage{ + {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + }, + }, + })) + + res, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group1", + Consumer: "consumer2", + Streams: []string{"stream", ">"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]redis.XStream{ + { + Stream: "stream", + Messages: []redis.XMessage{ + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + }, + }, + })) + + err = client.XGroupCreate(ctx, "stream", "group2", "1-0").Err() + Expect(err).NotTo(HaveOccurred()) + + res, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group2", + Consumer: "consumer1", + Streams: []string{"stream", ">"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]redis.XStream{ + { + Stream: "stream", + Messages: []redis.XMessage{ + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + }, + }, + })) + }) + + AfterEach(func() { + n, err := client.XGroupDestroy(ctx, "stream", "group1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + n, err = client.XGroupDestroy(ctx, "stream", "group2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + }) + + It("should XINFO STREAM", func() { + res, err := client.XInfoStream(ctx, "stream").Result() + Expect(err).NotTo(HaveOccurred()) + res.RadixTreeKeys = 0 + res.RadixTreeNodes = 0 + + Expect(res).To(Equal(&redis.XInfoStream{ + Length: 3, + RadixTreeKeys: 0, + RadixTreeNodes: 0, + Groups: 2, + LastGeneratedID: "3-0", + FirstEntry: redis.XMessage{ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, + LastEntry: redis.XMessage{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + })) + }) + + It("should XINFO GROUPS", func() { + res, err := client.XInfoGroups(ctx, "stream").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]redis.XInfoGroup{ + {Name: "group1", Consumers: 2, Pending: 3, LastDeliveredID: "3-0"}, + {Name: "group2", Consumers: 1, Pending: 2, LastDeliveredID: "3-0"}, + })) + }) + + It("should XINFO CONSUMERS", func() { + res, err := client.XInfoConsumers(ctx, "stream", "group1").Result() + Expect(err).NotTo(HaveOccurred()) + for i := range res { + res[i].Idle = 0 + } + Expect(res).To(Equal([]redis.XInfoConsumer{ + {Name: "consumer1", Pending: 2, Idle: 0}, + {Name: "consumer2", Pending: 1, Idle: 0}, + })) + }) + }) }) Describe("Geo add and radius search", func() {