xgroup: Add XGroupCreateMkStream

This commit is contained in:
Roman Volosatovs 2018-12-11 15:52:46 +01:00
parent 11cf9400d5
commit c9555c4277
No known key found for this signature in database
GPG Key ID: 3AC661943D80C89E
2 changed files with 23 additions and 0 deletions

View File

@ -175,6 +175,7 @@ type Cmdable interface {
XRead(a *XReadArgs) *XStreamSliceCmd XRead(a *XReadArgs) *XStreamSliceCmd
XReadStreams(streams ...string) *XStreamSliceCmd XReadStreams(streams ...string) *XStreamSliceCmd
XGroupCreate(stream, group, start string) *StatusCmd XGroupCreate(stream, group, start string) *StatusCmd
XGroupCreateMkStream(stream, group, start string) *StatusCmd
XGroupSetID(stream, group, start string) *StatusCmd XGroupSetID(stream, group, start string) *StatusCmd
XGroupDestroy(stream, group string) *IntCmd XGroupDestroy(stream, group string) *IntCmd
XGroupDelConsumer(stream, group, consumer string) *IntCmd XGroupDelConsumer(stream, group, consumer string) *IntCmd
@ -1424,6 +1425,12 @@ func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd {
return cmd return cmd
} }
func (c *cmdable) XGroupCreateMkStream(stream, group, start string) *StatusCmd {
cmd := NewStatusCmd("xgroup", "create", stream, group, start, "mkstream")
c.process(cmd)
return cmd
}
func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd { func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd {
cmd := NewStatusCmd("xgroup", "setid", stream, group, start) cmd := NewStatusCmd("xgroup", "setid", stream, group, start)
c.process(cmd) c.process(cmd)

View File

@ -3569,6 +3569,22 @@ var _ = Describe("Commands", func() {
Expect(n).To(Equal(int64(1))) Expect(n).To(Equal(int64(1)))
}) })
It("should XGroupCreateMkStream", func() {
err := client.XGroupCreateMkStream("stream2", "group", "0").Err()
Expect(err).NotTo(HaveOccurred())
err = client.XGroupCreateMkStream("stream2", "group", "0").Err()
Expect(err).To(Equal(proto.RedisError("BUSYGROUP Consumer Group name already exists")))
n, err := client.XGroupDestroy("stream2", "group").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(1)))
n, err = client.Del("stream2").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(1)))
})
It("should XPending", func() { It("should XPending", func() {
info, err := client.XPending("stream", "group").Result() info, err := client.XPending("stream", "group").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())