diff --git a/command.go b/command.go index 7f49b78..b202de9 100644 --- a/command.go +++ b/command.go @@ -1395,10 +1395,10 @@ func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error { type XInfoGroupsCmd struct { baseCmd - val []XInfoGroups + val []XInfoGroup } -type XInfoGroups struct { +type XInfoGroup struct { Name string Consumers int64 Pending int64 @@ -1416,11 +1416,11 @@ func NewXInfoGroupsCmd(ctx context.Context, stream string) *XInfoGroupsCmd { } } -func (cmd *XInfoGroupsCmd) Val() []XInfoGroups { +func (cmd *XInfoGroupsCmd) Val() []XInfoGroup { return cmd.val } -func (cmd *XInfoGroupsCmd) Result() ([]XInfoGroups, error) { +func (cmd *XInfoGroupsCmd) Result() ([]XInfoGroup, error) { return cmd.val, cmd.err } @@ -1429,58 +1429,66 @@ func (cmd *XInfoGroupsCmd) String() string { } func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(xGroupInfoParser) - if err != nil { - return nil, err - } - cmd.val = append(cmd.val, v.(XInfoGroups)) + n, err := rd.ReadArrayLen() + if err != nil { + return err + } + + cmd.val = make([]XInfoGroup, n) + + for i := 0; i < n; i++ { + cmd.val[i], err = readXGroupInfo(rd) + if err != nil { + return err } - return nil, nil - }) - return err + } + + return nil } -func xGroupInfoParser(rd *proto.Reader, n int64) (interface{}, error) { - if n != 8 { - return nil, fmt.Errorf("redis: got %d elements in XINFO GROUPS reply,"+ - "wanted 8", n) +func readXGroupInfo(rd *proto.Reader) (XInfoGroup, error) { + var group XInfoGroup + + n, err := rd.ReadArrayLen() + if err != nil { + return group, err + } + if n != 8 { + return group, fmt.Errorf("redis: got %d elements in XINFO GROUPS reply, wanted 8", n) } - var ( - err error - grp XInfoGroups - key string - val string - ) for i := 0; i < 4; i++ { - key, err = rd.ReadString() + key, err := rd.ReadString() if err != nil { - return nil, err + return group, err } - val, err = rd.ReadString() + + val, err := rd.ReadString() if err != nil { - return nil, err + return group, err } + switch key { case "name": - grp.Name = val + group.Name = val case "consumers": - grp.Consumers, err = strconv.ParseInt(val, 0, 64) + group.Consumers, err = strconv.ParseInt(val, 0, 64) + if err != nil { + return group, err + } case "pending": - grp.Pending, err = strconv.ParseInt(val, 0, 64) + group.Pending, err = strconv.ParseInt(val, 0, 64) + if err != nil { + return group, err + } case "last-delivered-id": - grp.LastDeliveredID = val + group.LastDeliveredID = val default: - return nil, fmt.Errorf("redis: unexpected content %s "+ - "in XINFO GROUPS reply", key) - } - if err != nil { - return nil, err + return group, fmt.Errorf("redis: unexpected content %s in XINFO GROUPS reply", key) } } - return grp, err + + return group, nil } //------------------------------------------------------------------------------ @@ -2257,7 +2265,7 @@ func (cmd *SlowLogCmd) readReply(rd *proto.Reader) error { } cmdString := make([]string, cmdLen) - for i := 0; i < int(cmdLen); i++ { + for i := 0; i < cmdLen; i++ { cmdString[i], err = rd.ReadString() if err != nil { return nil, err @@ -2265,7 +2273,7 @@ func (cmd *SlowLogCmd) readReply(rd *proto.Reader) error { } var address, name string - for i := 4; i < int(n); i++ { + for i := 4; i < n; i++ { str, err := rd.ReadString() if err != nil { return nil, err diff --git a/commands.go b/commands.go index a6c9eee..973aaf5 100644 --- a/commands.go +++ b/commands.go @@ -1517,7 +1517,7 @@ func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd { keyPos += 2 } args = append(args, "streams") - keyPos += 1 + keyPos++ for _, s := range a.Streams { args = append(args, s) } @@ -1592,10 +1592,10 @@ func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSlic } if a.NoAck { args = append(args, "noack") - keyPos += 1 + keyPos++ } args = append(args, "streams") - keyPos += 1 + keyPos++ for _, s := range a.Streams { args = append(args, s) } diff --git a/internal/proto/reader.go b/internal/proto/reader.go index 4a16349..d9e6c12 100644 --- a/internal/proto/reader.go +++ b/internal/proto/reader.go @@ -181,7 +181,7 @@ func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) { } } -func (r *Reader) ReadArrayLen() (int64, error) { +func (r *Reader) ReadArrayLen() (int, error) { line, err := r.ReadLine() if err != nil { return 0, err @@ -190,7 +190,11 @@ func (r *Reader) ReadArrayLen() (int64, error) { case ErrorReply: return 0, ParseErrorReply(line) case ArrayReply: - return parseArrayLen(line) + n, err := parseArrayLen(line) + if err != nil { + return 0, err + } + return int(n), nil default: return 0, fmt.Errorf("redis: can't parse array reply: %.100q", line) } @@ -216,7 +220,8 @@ func (r *Reader) ReadScanReply() ([]string, uint64, error) { } keys := make([]string, n) - for i := int64(0); i < n; i++ { + + for i := 0; i < n; i++ { key, err := r.ReadString() if err != nil { return nil, 0, err