diff --git a/commands.go b/commands.go index 6e925af..e9b4134 100644 --- a/commands.go +++ b/commands.go @@ -175,6 +175,7 @@ type Cmdable interface { XRead(a *XReadArgs) *XStreamSliceCmd XReadStreams(streams ...string) *XStreamSliceCmd XGroupCreate(stream, group, start string) *StatusCmd + XGroupCreateMkStream(stream, group, start string) *StatusCmd XGroupSetID(stream, group, start string) *StatusCmd XGroupDestroy(stream, group string) *IntCmd XGroupDelConsumer(stream, group, consumer string) *IntCmd @@ -1424,6 +1425,12 @@ func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd { return cmd } +func (c *cmdable) XGroupCreateMkStream(stream, group, start string) *StatusCmd { + cmd := NewStatusCmd("xgroup", "create", stream, group, start, "mkstream") + c.process(cmd) + return cmd +} + func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd { cmd := NewStatusCmd("xgroup", "setid", stream, group, start) c.process(cmd) diff --git a/commands_test.go b/commands_test.go index ea646be..cda4595 100644 --- a/commands_test.go +++ b/commands_test.go @@ -3569,6 +3569,22 @@ var _ = Describe("Commands", func() { Expect(n).To(Equal(int64(1))) }) + It("should XGroupCreateMkStream", func() { + err := client.XGroupCreateMkStream("stream2", "group", "0").Err() + Expect(err).NotTo(HaveOccurred()) + + err = client.XGroupCreateMkStream("stream2", "group", "0").Err() + Expect(err).To(Equal(proto.RedisError("BUSYGROUP Consumer Group name already exists"))) + + n, err := client.XGroupDestroy("stream2", "group").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + + n, err = client.Del("stream2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + }) + It("should XPending", func() { info, err := client.XPending("stream", "group").Result() Expect(err).NotTo(HaveOccurred())