diff --git a/command.go b/command.go index 9ff8a260..2edba95d 100644 --- a/command.go +++ b/command.go @@ -1879,11 +1879,7 @@ func (cmd *XInfoStreamFullCmd) readReply(rd *proto.Reader) error { case "entries": cmd.val.Entries, err = readXMessageSlice(rd) case "groups": - groups, err := rd.ReadReply(readStreamGroups) - if err != nil { - return err - } - cmd.val.Groups = groups.([]XInfoStreamGroup) + cmd.val.Groups, err = readStreamGroups(rd) default: return fmt.Errorf("redis: unexpected content %s "+ "in XINFO STREAM reply", key) @@ -1895,9 +1891,13 @@ func (cmd *XInfoStreamFullCmd) readReply(rd *proto.Reader) error { return nil } -func readStreamGroups(rd *proto.Reader, n int64) (interface{}, error) { +func readStreamGroups(rd *proto.Reader) ([]XInfoStreamGroup, error) { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } groups := make([]XInfoStreamGroup, 0, n) - for i := int64(0); i < n; i++ { + for i := 0; i < n; i++ { nn, err := rd.ReadArrayLen() if err != nil { return nil, err @@ -1906,31 +1906,34 @@ func readStreamGroups(rd *proto.Reader, n int64) (interface{}, error) { return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM FULL reply,"+ "wanted 10", nn) } - key, err := rd.ReadString() - if err != nil { - return nil, err - } group := XInfoStreamGroup{} - switch key { - case "name": - group.Name, err = rd.ReadString() - case "last-delivered-id": - group.LastDeliveredID, err = rd.ReadString() - case "pel-count": - group.PelCount, err = rd.ReadIntReply() - case "pending": - group.Pending, err = readXInfoStreamGroupPending(rd) - case "consumers": - group.Consumers, err = readXInfoStreamConsumers(rd) - default: - return nil, fmt.Errorf("redis: unexpected content %s "+ - "in XINFO STREAM reply", key) - } + for f := 0; f < 5; f++ { + key, err := rd.ReadString() + if err != nil { + return nil, err + } - if err != nil { - return nil, err + switch key { + case "name": + group.Name, err = rd.ReadString() + case "last-delivered-id": + group.LastDeliveredID, err = rd.ReadString() + case "pel-count": + group.PelCount, err = rd.ReadIntReply() + case "pending": + group.Pending, err = readXInfoStreamGroupPending(rd) + case "consumers": + group.Consumers, err = readXInfoStreamConsumers(rd) + default: + return nil, fmt.Errorf("redis: unexpected content %s "+ + "in XINFO STREAM reply", key) + } + + if err != nil { + return nil, err + } } groups = append(groups, group) @@ -2004,71 +2007,71 @@ func readXInfoStreamConsumers(rd *proto.Reader) ([]XInfoStreamConsumer, error) { "wanted 8", nn) } - cKey, err := rd.ReadString() - if err != nil { - return nil, err - } - c := XInfoStreamConsumer{} - switch cKey { - case "name": - c.Name, err = rd.ReadString() - case "seen-time": - seen, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - c.SeenTime = time.Unix(seen/1000, seen%1000*int64(time.Millisecond)) - case "pel-count": - c.PelCount, err = rd.ReadIntReply() - case "pending": - pendingNumber, err := rd.ReadArrayLen() + for f := 0; f < 4; f++ { + cKey, err := rd.ReadString() if err != nil { return nil, err } - c.Pending = make([]XInfoStreamConsumerPending, 0, pendingNumber) - - for f := 0; f < pendingNumber; f++ { - nn, err := rd.ReadArrayLen() + switch cKey { + case "name": + c.Name, err = rd.ReadString() + case "seen-time": + seen, err := rd.ReadIntReply() if err != nil { return nil, err } - if nn != 3 { - return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM reply,"+ - "wanted 3", nn) - } - - p := XInfoStreamConsumerPending{} - - p.ID, err = rd.ReadString() + c.SeenTime = time.Unix(seen/1000, seen%1000*int64(time.Millisecond)) + case "pel-count": + c.PelCount, err = rd.ReadIntReply() + case "pending": + pendingNumber, err := rd.ReadArrayLen() if err != nil { return nil, err } - delivery, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - p.DeliveryTime = time.Unix(delivery/1000, delivery%1000*int64(time.Millisecond)) + c.Pending = make([]XInfoStreamConsumerPending, 0, pendingNumber) - p.DeliveryCount, err = rd.ReadIntReply() - if err != nil { - return nil, err - } + for f := 0; f < pendingNumber; f++ { + nn, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if nn != 3 { + return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM reply,"+ + "wanted 3", nn) + } - c.Pending = append(c.Pending, p) + p := XInfoStreamConsumerPending{} + + p.ID, err = rd.ReadString() + if err != nil { + return nil, err + } + + delivery, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + p.DeliveryTime = time.Unix(delivery/1000, delivery%1000*int64(time.Millisecond)) + + p.DeliveryCount, err = rd.ReadIntReply() + if err != nil { + return nil, err + } + + c.Pending = append(c.Pending, p) + } + default: + return nil, fmt.Errorf("redis: unexpected content %s "+ + "in XINFO STREAM reply", cKey) + } + if err != nil { + return nil, err } - default: - return nil, fmt.Errorf("redis: unexpected content %s "+ - "in XINFO STREAM reply", cKey) } - - if err != nil { - return nil, err - } - consumers = append(consumers, c) }