Merge pull request #1505 from go-redis/fix/refactor-xinfo-group

Refactor NewXInfoGroupsCmd
This commit is contained in:
Vladimir Mihailenco 2020-09-23 12:05:02 +03:00 committed by GitHub
commit 2b9cfd3cc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 59 additions and 46 deletions

View File

@ -1395,10 +1395,10 @@ func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error {
type XInfoGroupsCmd struct {
baseCmd
val []XInfoGroups
val []XInfoGroup
}
type XInfoGroups struct {
type XInfoGroup struct {
Name string
Consumers int64
Pending int64
@ -1416,11 +1416,11 @@ func NewXInfoGroupsCmd(ctx context.Context, stream string) *XInfoGroupsCmd {
}
}
func (cmd *XInfoGroupsCmd) Val() []XInfoGroups {
func (cmd *XInfoGroupsCmd) Val() []XInfoGroup {
return cmd.val
}
func (cmd *XInfoGroupsCmd) Result() ([]XInfoGroups, error) {
func (cmd *XInfoGroupsCmd) Result() ([]XInfoGroup, error) {
return cmd.val, cmd.err
}
@ -1429,58 +1429,66 @@ func (cmd *XInfoGroupsCmd) String() string {
}
func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error {
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
for i := int64(0); i < n; i++ {
v, err := rd.ReadReply(xGroupInfoParser)
n, err := rd.ReadArrayLen()
if err != nil {
return nil, err
}
cmd.val = append(cmd.val, v.(XInfoGroups))
}
return nil, nil
})
return err
}
cmd.val = make([]XInfoGroup, n)
for i := 0; i < n; i++ {
cmd.val[i], err = readXGroupInfo(rd)
if err != nil {
return err
}
}
return nil
}
func xGroupInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
if n != 8 {
return nil, fmt.Errorf("redis: got %d elements in XINFO GROUPS reply,"+
"wanted 8", n)
func readXGroupInfo(rd *proto.Reader) (XInfoGroup, error) {
var group XInfoGroup
n, err := rd.ReadArrayLen()
if err != nil {
return group, err
}
if n != 8 {
return group, fmt.Errorf("redis: got %d elements in XINFO GROUPS reply, wanted 8", n)
}
var (
err error
grp XInfoGroups
key string
val string
)
for i := 0; i < 4; i++ {
key, err = rd.ReadString()
key, err := rd.ReadString()
if err != nil {
return nil, err
return group, err
}
val, err = rd.ReadString()
val, err := rd.ReadString()
if err != nil {
return nil, err
return group, err
}
switch key {
case "name":
grp.Name = val
group.Name = val
case "consumers":
grp.Consumers, err = strconv.ParseInt(val, 0, 64)
case "pending":
grp.Pending, err = strconv.ParseInt(val, 0, 64)
case "last-delivered-id":
grp.LastDeliveredID = val
default:
return nil, fmt.Errorf("redis: unexpected content %s "+
"in XINFO GROUPS reply", key)
}
group.Consumers, err = strconv.ParseInt(val, 0, 64)
if err != nil {
return nil, err
return group, err
}
case "pending":
group.Pending, err = strconv.ParseInt(val, 0, 64)
if err != nil {
return group, err
}
case "last-delivered-id":
group.LastDeliveredID = val
default:
return group, fmt.Errorf("redis: unexpected content %s in XINFO GROUPS reply", key)
}
}
return grp, err
return group, nil
}
//------------------------------------------------------------------------------
@ -2257,7 +2265,7 @@ func (cmd *SlowLogCmd) readReply(rd *proto.Reader) error {
}
cmdString := make([]string, cmdLen)
for i := 0; i < int(cmdLen); i++ {
for i := 0; i < cmdLen; i++ {
cmdString[i], err = rd.ReadString()
if err != nil {
return nil, err
@ -2265,7 +2273,7 @@ func (cmd *SlowLogCmd) readReply(rd *proto.Reader) error {
}
var address, name string
for i := 4; i < int(n); i++ {
for i := 4; i < n; i++ {
str, err := rd.ReadString()
if err != nil {
return nil, err

View File

@ -1517,7 +1517,7 @@ func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
keyPos += 2
}
args = append(args, "streams")
keyPos += 1
keyPos++
for _, s := range a.Streams {
args = append(args, s)
}
@ -1592,10 +1592,10 @@ func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSlic
}
if a.NoAck {
args = append(args, "noack")
keyPos += 1
keyPos++
}
args = append(args, "streams")
keyPos += 1
keyPos++
for _, s := range a.Streams {
args = append(args, s)
}

View File

@ -181,7 +181,7 @@ func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
}
}
func (r *Reader) ReadArrayLen() (int64, error) {
func (r *Reader) ReadArrayLen() (int, error) {
line, err := r.ReadLine()
if err != nil {
return 0, err
@ -190,7 +190,11 @@ func (r *Reader) ReadArrayLen() (int64, error) {
case ErrorReply:
return 0, ParseErrorReply(line)
case ArrayReply:
return parseArrayLen(line)
n, err := parseArrayLen(line)
if err != nil {
return 0, err
}
return int(n), nil
default:
return 0, fmt.Errorf("redis: can't parse array reply: %.100q", line)
}
@ -216,7 +220,8 @@ func (r *Reader) ReadScanReply() ([]string, uint64, error) {
}
keys := make([]string, n)
for i := int64(0); i < n; i++ {
for i := 0; i < n; i++ {
key, err := r.ReadString()
if err != nil {
return nil, 0, err