feature: add XINFO STREAM support

This commit is contained in:
Ilia Choly 2020-09-22 13:45:23 -04:00
parent 2b9cfd3cc4
commit a2b0227421
2 changed files with 127 additions and 25 deletions

View File

@ -1081,34 +1081,43 @@ func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error {
return nil
}
func readXMessage(rd *proto.Reader) (XMessage, error) {
n, err := rd.ReadArrayLen()
if err != nil {
return XMessage{}, err
}
if n != 2 {
return XMessage{}, fmt.Errorf("got %d, wanted 2", n)
}
id, err := rd.ReadString()
if err != nil {
return XMessage{}, err
}
var values map[string]interface{}
v, err := rd.ReadArrayReply(stringInterfaceMapParser)
if err != nil {
if err != proto.Nil {
return XMessage{}, err
}
} else {
values = v.(map[string]interface{})
}
return XMessage{
ID: id,
Values: values,
}, nil
}
// xMessageSliceParser implements proto.MultiBulkParse.
func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
msgs := make([]XMessage, n)
for i := 0; i < len(msgs); i++ {
i := i
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
id, err := rd.ReadString()
if err != nil {
return nil, err
}
var values map[string]interface{}
v, err := rd.ReadArrayReply(stringInterfaceMapParser)
if err != nil {
if err != proto.Nil {
return nil, err
}
} else {
values = v.(map[string]interface{})
}
msgs[i] = XMessage{
ID: id,
Values: values,
}
return nil, nil
})
for i := int64(0); i < n; i++ {
var err error
msgs[i], err = readXMessage(rd)
if err != nil {
return nil, err
}
@ -1493,6 +1502,92 @@ func readXGroupInfo(rd *proto.Reader) (XInfoGroup, error) {
//------------------------------------------------------------------------------
type XInfoStreamCmd struct {
baseCmd
val *XInfoStream
}
type XInfoStream struct {
Length int64
RadixTreeKeys int64
RadixTreeNodes int64
Groups int64
LastGeneratedID string
FirstEntry XMessage
LastEntry XMessage
}
var _ Cmder = (*XInfoStreamCmd)(nil)
func NewXInfoStreamCmd(ctx context.Context, stream string) *XInfoStreamCmd {
return &XInfoStreamCmd{
baseCmd: baseCmd{
ctx: ctx,
args: []interface{}{"xinfo", "stream", stream},
},
}
}
func (cmd *XInfoStreamCmd) Val() *XInfoStream {
return cmd.val
}
func (cmd *XInfoStreamCmd) Result() (*XInfoStream, error) {
return cmd.val, cmd.err
}
func (cmd *XInfoStreamCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *XInfoStreamCmd) readReply(rd *proto.Reader) error {
v, err := rd.ReadReply(xStreamInfoParser)
if err != nil {
return err
}
cmd.val = v.(*XInfoStream)
return nil
}
func xStreamInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
if n != 14 {
return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM reply,"+
"wanted 14", n)
}
var info XInfoStream
for i := 0; i < 7; i++ {
key, err := rd.ReadString()
if err != nil {
return nil, err
}
switch key {
case "length":
info.Length, err = rd.ReadIntReply()
case "radix-tree-keys":
info.RadixTreeKeys, err = rd.ReadIntReply()
case "radix-tree-nodes":
info.RadixTreeNodes, err = rd.ReadIntReply()
case "groups":
info.Groups, err = rd.ReadIntReply()
case "last-generated-id":
info.LastGeneratedID, err = rd.ReadString()
case "first-entry":
info.FirstEntry, err = readXMessage(rd)
case "last-entry":
info.LastEntry, err = readXMessage(rd)
default:
return nil, fmt.Errorf("redis: unexpected content %s "+
"in XINFO STREAM reply", key)
}
if err != nil {
return nil, err
}
}
return &info, nil
}
//------------------------------------------------------------------------------
type ZSliceCmd struct {
baseCmd

View File

@ -219,6 +219,7 @@ type Cmdable interface {
XTrim(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd
XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd
XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
BZPopMax(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd
BZPopMin(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd
@ -1699,6 +1700,12 @@ func (c cmdable) XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd {
return cmd
}
func (c cmdable) XInfoStream(ctx context.Context, key string) *XInfoStreamCmd {
cmd := NewXInfoStreamCmd(ctx, key)
_ = c(ctx, cmd)
return cmd
}
//------------------------------------------------------------------------------
// Z represents sorted set member.