mirror of https://github.com/go-redis/redis.git
Add XAutoClaim command (#1780)
This commit is contained in:
parent
6e4eb2e3ac
commit
237bad5284
116
command.go
116
command.go
|
@ -1501,6 +1501,122 @@ func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error {
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type XAutoClaimCmd struct {
|
||||||
|
baseCmd
|
||||||
|
|
||||||
|
start string
|
||||||
|
val []XMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Cmder = (*XAutoClaimCmd)(nil)
|
||||||
|
|
||||||
|
func NewXAutoClaimCmd(ctx context.Context, args ...interface{}) *XAutoClaimCmd {
|
||||||
|
return &XAutoClaimCmd{
|
||||||
|
baseCmd: baseCmd{
|
||||||
|
ctx: ctx,
|
||||||
|
args: args,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XAutoClaimCmd) Val() (messages []XMessage, start string) {
|
||||||
|
return cmd.val, cmd.start
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XAutoClaimCmd) Result() (messages []XMessage, start string, err error) {
|
||||||
|
return cmd.val, cmd.start, cmd.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XAutoClaimCmd) String() string {
|
||||||
|
return cmdString(cmd, cmd.val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XAutoClaimCmd) readReply(rd *proto.Reader) error {
|
||||||
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
if n != 2 {
|
||||||
|
return nil, fmt.Errorf("got %d, wanted 2", n)
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
|
||||||
|
cmd.start, err = rd.ReadString()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd.val, err = readXMessageSlice(rd)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type XAutoClaimJustIDCmd struct {
|
||||||
|
baseCmd
|
||||||
|
|
||||||
|
start string
|
||||||
|
val []string
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Cmder = (*XAutoClaimJustIDCmd)(nil)
|
||||||
|
|
||||||
|
func NewXAutoClaimJustIDCmd(ctx context.Context, args ...interface{}) *XAutoClaimJustIDCmd {
|
||||||
|
return &XAutoClaimJustIDCmd{
|
||||||
|
baseCmd: baseCmd{
|
||||||
|
ctx: ctx,
|
||||||
|
args: args,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XAutoClaimJustIDCmd) Val() (ids []string, start string) {
|
||||||
|
return cmd.val, cmd.start
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XAutoClaimJustIDCmd) Result() (ids []string, start string, err error) {
|
||||||
|
return cmd.val, cmd.start, cmd.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XAutoClaimJustIDCmd) String() string {
|
||||||
|
return cmdString(cmd, cmd.val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XAutoClaimJustIDCmd) readReply(rd *proto.Reader) error {
|
||||||
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
if n != 2 {
|
||||||
|
return nil, fmt.Errorf("got %d, wanted 2", n)
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
|
||||||
|
cmd.start, err = rd.ReadString()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
nn, err := rd.ReadArrayLen()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd.val = make([]string, nn)
|
||||||
|
for i := 0; i < nn; i++ {
|
||||||
|
cmd.val[i], err = rd.ReadString()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type XInfoConsumersCmd struct {
|
type XInfoConsumersCmd struct {
|
||||||
baseCmd
|
baseCmd
|
||||||
val []XInfoConsumer
|
val []XInfoConsumer
|
||||||
|
|
33
commands.go
33
commands.go
|
@ -1845,6 +1845,39 @@ func (c cmdable) XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingE
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type XAutoClaimArgs struct {
|
||||||
|
Stream string
|
||||||
|
Group string
|
||||||
|
MinIdle time.Duration
|
||||||
|
Start string
|
||||||
|
Count int64
|
||||||
|
Consumer string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c cmdable) XAutoClaim(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimCmd {
|
||||||
|
args := xAutoClaimArgs(ctx, a)
|
||||||
|
cmd := NewXAutoClaimCmd(ctx, args...)
|
||||||
|
_ = c(ctx, cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c cmdable) XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd {
|
||||||
|
args := xAutoClaimArgs(ctx, a)
|
||||||
|
args = append(args, "justid")
|
||||||
|
cmd := NewXAutoClaimJustIDCmd(ctx, args...)
|
||||||
|
_ = c(ctx, cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func xAutoClaimArgs(ctx context.Context, a *XAutoClaimArgs) []interface{} {
|
||||||
|
args := make([]interface{}, 0, 9)
|
||||||
|
args = append(args, "xautoclaim", a.Stream, a.Group, a.Consumer, formatMs(ctx, a.MinIdle), a.Start)
|
||||||
|
if a.Count > 0 {
|
||||||
|
args = append(args, "count", a.Count)
|
||||||
|
}
|
||||||
|
return args
|
||||||
|
}
|
||||||
|
|
||||||
type XClaimArgs struct {
|
type XClaimArgs struct {
|
||||||
Stream string
|
Stream string
|
||||||
Group string
|
Group string
|
||||||
|
|
|
@ -4386,6 +4386,43 @@ var _ = Describe("Commands", func() {
|
||||||
Expect(n).To(Equal(int64(3)))
|
Expect(n).To(Equal(int64(3)))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should XAutoClaim", func() {
|
||||||
|
xca := &redis.XAutoClaimArgs{
|
||||||
|
Stream: "stream",
|
||||||
|
Group: "group",
|
||||||
|
Consumer: "consumer",
|
||||||
|
Start: "-",
|
||||||
|
Count: 2,
|
||||||
|
}
|
||||||
|
msgs, start, err := client.XAutoClaim(ctx, xca).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(start).To(Equal("2-0"))
|
||||||
|
Expect(msgs).To(Equal([]redis.XMessage{{
|
||||||
|
ID: "1-0",
|
||||||
|
Values: map[string]interface{}{"uno": "un"},
|
||||||
|
}, {
|
||||||
|
ID: "2-0",
|
||||||
|
Values: map[string]interface{}{"dos": "deux"},
|
||||||
|
}}))
|
||||||
|
|
||||||
|
xca.Start = start
|
||||||
|
msgs, start, err = client.XAutoClaim(ctx, xca).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(start).To(Equal("3-0"))
|
||||||
|
Expect(msgs).To(Equal([]redis.XMessage{{
|
||||||
|
ID: "2-0",
|
||||||
|
Values: map[string]interface{}{"dos": "deux"},
|
||||||
|
}, {
|
||||||
|
ID: "3-0",
|
||||||
|
Values: map[string]interface{}{"tres": "troix"},
|
||||||
|
}}))
|
||||||
|
|
||||||
|
ids, start, err := client.XAutoClaimJustID(ctx, xca).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(start).To(Equal("3-0"))
|
||||||
|
Expect(ids).To(Equal([]string{"2-0", "3-0"}))
|
||||||
|
})
|
||||||
|
|
||||||
It("should XClaim", func() {
|
It("should XClaim", func() {
|
||||||
msgs, err := client.XClaim(ctx, &redis.XClaimArgs{
|
msgs, err := client.XClaim(ctx, &redis.XClaimArgs{
|
||||||
Stream: "stream",
|
Stream: "stream",
|
||||||
|
|
Loading…
Reference in New Issue