From a2b022742196c7c9e0c756c75752ff303482f527 Mon Sep 17 00:00:00 2001 From: Ilia Choly Date: Tue, 22 Sep 2020 13:45:23 -0400 Subject: [PATCH] feature: add XINFO STREAM support --- command.go | 145 +++++++++++++++++++++++++++++++++++++++++++--------- commands.go | 7 +++ 2 files changed, 127 insertions(+), 25 deletions(-) diff --git a/command.go b/command.go index b202de97..5451ba2f 100644 --- a/command.go +++ b/command.go @@ -1081,34 +1081,43 @@ func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error { return nil } +func readXMessage(rd *proto.Reader) (XMessage, error) { + n, err := rd.ReadArrayLen() + if err != nil { + return XMessage{}, err + } + if n != 2 { + return XMessage{}, fmt.Errorf("got %d, wanted 2", n) + } + + id, err := rd.ReadString() + if err != nil { + return XMessage{}, err + } + + var values map[string]interface{} + + v, err := rd.ReadArrayReply(stringInterfaceMapParser) + if err != nil { + if err != proto.Nil { + return XMessage{}, err + } + } else { + values = v.(map[string]interface{}) + } + + return XMessage{ + ID: id, + Values: values, + }, nil +} + // xMessageSliceParser implements proto.MultiBulkParse. func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { msgs := make([]XMessage, n) - for i := 0; i < len(msgs); i++ { - i := i - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - id, err := rd.ReadString() - if err != nil { - return nil, err - } - - var values map[string]interface{} - - v, err := rd.ReadArrayReply(stringInterfaceMapParser) - if err != nil { - if err != proto.Nil { - return nil, err - } - } else { - values = v.(map[string]interface{}) - } - - msgs[i] = XMessage{ - ID: id, - Values: values, - } - return nil, nil - }) + for i := int64(0); i < n; i++ { + var err error + msgs[i], err = readXMessage(rd) if err != nil { return nil, err } @@ -1493,6 +1502,92 @@ func readXGroupInfo(rd *proto.Reader) (XInfoGroup, error) { //------------------------------------------------------------------------------ +type XInfoStreamCmd struct { + baseCmd + val *XInfoStream +} + +type XInfoStream struct { + Length int64 + RadixTreeKeys int64 + RadixTreeNodes int64 + Groups int64 + LastGeneratedID string + FirstEntry XMessage + LastEntry XMessage +} + +var _ Cmder = (*XInfoStreamCmd)(nil) + +func NewXInfoStreamCmd(ctx context.Context, stream string) *XInfoStreamCmd { + return &XInfoStreamCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: []interface{}{"xinfo", "stream", stream}, + }, + } +} + +func (cmd *XInfoStreamCmd) Val() *XInfoStream { + return cmd.val +} + +func (cmd *XInfoStreamCmd) Result() (*XInfoStream, error) { + return cmd.val, cmd.err +} + +func (cmd *XInfoStreamCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XInfoStreamCmd) readReply(rd *proto.Reader) error { + v, err := rd.ReadReply(xStreamInfoParser) + if err != nil { + return err + } + cmd.val = v.(*XInfoStream) + return nil +} + +func xStreamInfoParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 14 { + return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM reply,"+ + "wanted 14", n) + } + var info XInfoStream + for i := 0; i < 7; i++ { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + switch key { + case "length": + info.Length, err = rd.ReadIntReply() + case "radix-tree-keys": + info.RadixTreeKeys, err = rd.ReadIntReply() + case "radix-tree-nodes": + info.RadixTreeNodes, err = rd.ReadIntReply() + case "groups": + info.Groups, err = rd.ReadIntReply() + case "last-generated-id": + info.LastGeneratedID, err = rd.ReadString() + case "first-entry": + info.FirstEntry, err = readXMessage(rd) + case "last-entry": + info.LastEntry, err = readXMessage(rd) + default: + return nil, fmt.Errorf("redis: unexpected content %s "+ + "in XINFO STREAM reply", key) + } + if err != nil { + return nil, err + } + } + return &info, nil +} + +//------------------------------------------------------------------------------ + type ZSliceCmd struct { baseCmd diff --git a/commands.go b/commands.go index 973aaf5f..f4e6afc1 100644 --- a/commands.go +++ b/commands.go @@ -219,6 +219,7 @@ type Cmdable interface { XTrim(ctx context.Context, key string, maxLen int64) *IntCmd XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd + XInfoStream(ctx context.Context, key string) *XInfoStreamCmd BZPopMax(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd BZPopMin(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd @@ -1699,6 +1700,12 @@ func (c cmdable) XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd { return cmd } +func (c cmdable) XInfoStream(ctx context.Context, key string) *XInfoStreamCmd { + cmd := NewXInfoStreamCmd(ctx, key) + _ = c(ctx, cmd) + return cmd +} + //------------------------------------------------------------------------------ // Z represents sorted set member.