From 34916092ba3cc7249c6d3c4c47db8b820122b908 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Thu, 2 Aug 2018 14:48:46 +0300 Subject: [PATCH] Add streams group related commands --- command.go | 793 ++++++++++++++++++++++++++++++++++++++++------- commands.go | 244 +++++++++++---- commands_test.go | 206 ++++++++---- parser.go | 394 ----------------------- 4 files changed, 1018 insertions(+), 619 deletions(-) delete mode 100644 parser.go diff --git a/command.go b/command.go index 11472be..992e614 100644 --- a/command.go +++ b/command.go @@ -3,6 +3,7 @@ package redis import ( "bytes" "fmt" + "net" "strconv" "strings" "time" @@ -181,6 +182,33 @@ func (cmd *Cmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func sliceParser(rd *proto.Reader, n int64) (interface{}, error) { + vals := make([]interface{}, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(sliceParser) + if err != nil { + if err == Nil { + vals = append(vals, nil) + continue + } + if err, ok := err.(proto.RedisError); ok { + vals = append(vals, err) + continue + } + return nil, err + } + + switch v := v.(type) { + case []byte: + vals = append(vals, string(v)) + default: + vals = append(vals, v) + } + } + return vals, nil +} + //------------------------------------------------------------------------------ type SliceCmd struct { @@ -363,6 +391,25 @@ func (cmd *TimeCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func timeParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d elements, expected 2", n) + } + + sec, err := rd.ReadInt() + if err != nil { + return nil, err + } + + microsec, err := rd.ReadInt() + if err != nil { + return nil, err + } + + return time.Unix(sec, microsec*1000), nil +} + //------------------------------------------------------------------------------ type BoolCmd struct { @@ -560,6 +607,22 @@ func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + ss := make([]string, 0, n) + for i := int64(0); i < n; i++ { + s, err := rd.ReadStringReply() + if err == Nil { + ss = append(ss, "") + } else if err != nil { + return nil, err + } else { + ss = append(ss, s) + } + } + return ss, nil +} + //------------------------------------------------------------------------------ type BoolSliceCmd struct { @@ -598,6 +661,19 @@ func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + bools := make([]bool, 0, n) + for i := int64(0); i < n; i++ { + n, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + bools = append(bools, n == 1) + } + return bools, nil +} + //------------------------------------------------------------------------------ type StringStringMapCmd struct { @@ -636,6 +712,25 @@ func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]string, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadStringReply() + if err != nil { + return nil, err + } + + value, err := rd.ReadStringReply() + if err != nil { + return nil, err + } + + m[key] = value + } + return m, nil +} + //------------------------------------------------------------------------------ type StringIntMapCmd struct { @@ -674,6 +769,25 @@ func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]int64, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadStringReply() + if err != nil { + return nil, err + } + + n, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + m[key] = n + } + return m, nil +} + //------------------------------------------------------------------------------ type StringStructMapCmd struct { @@ -712,97 +826,31 @@ func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error { return nil } -//------------------------------------------------------------------------------ +// Implements proto.MultiBulkParse +func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]struct{}, n) + for i := int64(0); i < n; i++ { + key, err := rd.ReadStringReply() + if err != nil { + return nil, err + } -type XStream struct { - Stream string - Messages []*XMessage + m[key] = struct{}{} + } + return m, nil } +//------------------------------------------------------------------------------ + type XMessage struct { ID string Values map[string]interface{} } -//------------------------------------------------------------------------------ - -type XStreamSliceCmd struct { - baseCmd - - val []*XStream -} - -var _ Cmder = (*XStreamSliceCmd)(nil) - -func NewXStreamSliceCmd(args ...interface{}) *XStreamSliceCmd { - return &XStreamSliceCmd{ - baseCmd: baseCmd{_args: args}, - } -} - -func (cmd *XStreamSliceCmd) Val() []*XStream { - return cmd.val -} - -func (cmd *XStreamSliceCmd) Result() ([]*XStream, error) { - return cmd.val, cmd.err -} - -func (cmd *XStreamSliceCmd) String() string { - return cmdString(cmd, cmd.val) -} - -func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error { - var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(xStreamSliceParser) - if cmd.err != nil { - return cmd.err - } - cmd.val = v.([]*XStream) - return nil -} - -// Implements proto.MultiBulkParse -func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - xx := make([]*XStream, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadArrayReply(xStreamParser) - if err != nil { - return nil, err - } - xx[i] = v.(*XStream) - } - return xx, nil -} - -// Implements proto.MultiBulkParse -func xStreamParser(rd *proto.Reader, n int64) (interface{}, error) { - if n != 2 { - return nil, fmt.Errorf("got %d, wanted 2", n) - } - - stream, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - v, err := rd.ReadArrayReply(xMessageSliceParser) - if err != nil { - return nil, err - } - - return &XStream{ - Stream: stream, - Messages: v.([]*XMessage), - }, nil -} - -//------------------------------------------------------------------------------ - type XMessageSliceCmd struct { baseCmd - val []*XMessage + val []XMessage } var _ Cmder = (*XMessageSliceCmd)(nil) @@ -813,11 +861,11 @@ func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd { } } -func (cmd *XMessageSliceCmd) Val() []*XMessage { +func (cmd *XMessageSliceCmd) Val() []XMessage { return cmd.val } -func (cmd *XMessageSliceCmd) Result() ([]*XMessage, error) { +func (cmd *XMessageSliceCmd) Result() ([]XMessage, error) { return cmd.val, cmd.err } @@ -831,44 +879,41 @@ func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error { if cmd.err != nil { return cmd.err } - cmd.val = v.([]*XMessage) + cmd.val = v.([]XMessage) return nil } // Implements proto.MultiBulkParse func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - msgs := make([]*XMessage, n) + msgs := make([]XMessage, 0, n) for i := int64(0); i < n; i++ { - v, err := rd.ReadArrayReply(xMessageParser) + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + id, err := rd.ReadStringReply() + if err != nil { + return nil, err + } + + v, err := rd.ReadArrayReply(stringInterfaceMapParser) + if err != nil { + return nil, err + } + + msgs = append(msgs, XMessage{ + ID: id, + Values: v.(map[string]interface{}), + }) + return nil, nil + }) if err != nil { return nil, err } - msgs[i] = v.(*XMessage) } return msgs, nil } // Implements proto.MultiBulkParse -func xMessageParser(rd *proto.Reader, n int64) (interface{}, error) { - id, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - v, err := rd.ReadArrayReply(xKeyValueParser) - if err != nil { - return nil, err - } - - return &XMessage{ - ID: id, - Values: v.(map[string]interface{}), - }, nil -} - -// Implements proto.MultiBulkParse -func xKeyValueParser(rd *proto.Reader, n int64) (interface{}, error) { - values := make(map[string]interface{}, n) +func stringInterfaceMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]interface{}, n/2) for i := int64(0); i < n; i += 2 { key, err := rd.ReadStringReply() if err != nil { @@ -880,13 +925,288 @@ func xKeyValueParser(rd *proto.Reader, n int64) (interface{}, error) { return nil, err } - values[key] = value + m[key] = value } - return values, nil + return m, nil } //------------------------------------------------------------------------------ +type XStream struct { + Stream string + Messages []XMessage +} + +type XStreamSliceCmd struct { + baseCmd + + val []XStream +} + +var _ Cmder = (*XStreamSliceCmd)(nil) + +func NewXStreamSliceCmd(args ...interface{}) *XStreamSliceCmd { + return &XStreamSliceCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XStreamSliceCmd) Val() []XStream { + return cmd.val +} + +func (cmd *XStreamSliceCmd) Result() ([]XStream, error) { + return cmd.val, cmd.err +} + +func (cmd *XStreamSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error { + var v interface{} + v, cmd.err = cn.Rd.ReadArrayReply(xStreamSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]XStream) + return nil +} + +// Implements proto.MultiBulkParse +func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + ret := make([]XStream, 0, n) + for i := int64(0); i < n; i++ { + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d, wanted 2", n) + } + + stream, err := rd.ReadStringReply() + if err != nil { + return nil, err + } + + v, err := rd.ReadArrayReply(xMessageSliceParser) + if err != nil { + return nil, err + } + + ret = append(ret, XStream{ + Stream: stream, + Messages: v.([]XMessage), + }) + return nil, nil + }) + if err != nil { + return nil, err + } + } + return ret, nil +} + +//------------------------------------------------------------------------------ + +type XPending struct { + Count int64 + Lower string + Higher string + Consumers map[string]int64 +} + +type XPendingCmd struct { + baseCmd + val *XPending +} + +var _ Cmder = (*XPendingCmd)(nil) + +func NewXPendingCmd(args ...interface{}) *XPendingCmd { + return &XPendingCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XPendingCmd) Val() *XPending { + return cmd.val +} + +func (cmd *XPendingCmd) Result() (*XPending, error) { + return cmd.val, cmd.err +} + +func (cmd *XPendingCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XPendingCmd) readReply(cn *pool.Conn) error { + var info interface{} + info, cmd.err = cn.Rd.ReadArrayReply(xPendingParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = info.(*XPending) + return nil +} + +func xPendingParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 4 { + return nil, fmt.Errorf("got %d, wanted 4", n) + } + + count, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + lower, err := rd.ReadStringReply() + if err != nil && err != Nil { + return nil, err + } + + higher, err := rd.ReadStringReply() + if err != nil && err != Nil { + return nil, err + } + + pending := &XPending{ + Count: count, + Lower: lower, + Higher: higher, + } + _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + for i := int64(0); i < n; i++ { + _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d, wanted 2", n) + } + + consumerName, err := rd.ReadStringReply() + if err != nil { + return nil, err + } + + consumerPendingStr, err := rd.ReadStringReply() + if err != nil { + return nil, err + } + + consumerPending, err := strconv.ParseInt(consumerPendingStr, 10, 64) + if err != nil { + return nil, err + } + + if pending.Consumers == nil { + pending.Consumers = make(map[string]int64) + } + pending.Consumers[consumerName] = consumerPending + + return nil, nil + }) + if err != nil { + return nil, err + } + } + return nil, nil + }) + if err != nil && err != Nil { + return nil, err + } + + return pending, nil +} + +//------------------------------------------------------------------------------ + +type XPendingExt struct { + Id string + Consumer string + Idle time.Duration + RetryCount int64 +} + +type XPendingExtCmd struct { + baseCmd + val []XPendingExt +} + +var _ Cmder = (*XPendingExtCmd)(nil) + +func NewXPendingExtCmd(args ...interface{}) *XPendingExtCmd { + return &XPendingExtCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XPendingExtCmd) Val() []XPendingExt { + return cmd.val +} + +func (cmd *XPendingExtCmd) Result() ([]XPendingExt, error) { + return cmd.val, cmd.err +} + +func (cmd *XPendingExtCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XPendingExtCmd) readReply(cn *pool.Conn) error { + var info interface{} + info, cmd.err = cn.Rd.ReadArrayReply(xPendingExtSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = info.([]XPendingExt) + return nil +} + +func xPendingExtSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + ret := make([]XPendingExt, 0, n) + for i := int64(0); i < n; i++ { + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 4 { + return nil, fmt.Errorf("got %d, wanted 4", n) + } + + id, err := rd.ReadStringReply() + if err != nil { + return nil, err + } + + consumer, err := rd.ReadStringReply() + if err != nil && err != Nil { + return nil, err + } + + idle, err := rd.ReadIntReply() + if err != nil && err != Nil { + return nil, err + } + + retryCount, err := rd.ReadIntReply() + if err != nil && err != Nil { + return nil, err + } + + ret = append(ret, XPendingExt{ + Id: id, + Consumer: consumer, + Idle: time.Duration(idle) * time.Millisecond, + RetryCount: retryCount, + }) + return nil, nil + }) + if err != nil { + return nil, err + } + } + return ret, nil +} + +//------------------------------------------------------------------------------ + +//------------------------------------------------------------------------------ + type ZSliceCmd struct { baseCmd @@ -923,6 +1243,27 @@ func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + zz := make([]Z, n/2) + for i := int64(0); i < n; i += 2 { + var err error + + z := &zz[i/2] + + z.Member, err = rd.ReadStringReply() + if err != nil { + return nil, err + } + + z.Score, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + } + return zz, nil +} + //------------------------------------------------------------------------------ type ScanCmd struct { @@ -1016,6 +1357,69 @@ func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) { + slots := make([]ClusterSlot, n) + for i := 0; i < len(slots); i++ { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n < 2 { + err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n) + return nil, err + } + + start, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + end, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + nodes := make([]ClusterNode, n-2) + for j := 0; j < len(nodes); j++ { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n != 2 && n != 3 { + err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n) + return nil, err + } + + ip, err := rd.ReadStringReply() + if err != nil { + return nil, err + } + + port, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + nodes[j].Addr = net.JoinHostPort(ip, strconv.FormatInt(port, 10)) + + if n == 3 { + id, err := rd.ReadStringReply() + if err != nil { + return nil, err + } + nodes[j].Id = id + } + } + + slots[i] = ClusterSlot{ + Start: int(start), + End: int(end), + Nodes: nodes, + } + } + return slots, nil +} + //------------------------------------------------------------------------------ // GeoLocation is used with GeoAdd to add geospatial location. @@ -1107,6 +1511,73 @@ func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error { return nil } +func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse { + return func(rd *proto.Reader, n int64) (interface{}, error) { + var loc GeoLocation + var err error + + loc.Name, err = rd.ReadStringReply() + if err != nil { + return nil, err + } + if q.WithDist { + loc.Dist, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + } + if q.WithGeoHash { + loc.GeoHash, err = rd.ReadIntReply() + if err != nil { + return nil, err + } + } + if q.WithCoord { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n != 2 { + return nil, fmt.Errorf("got %d coordinates, expected 2", n) + } + + loc.Longitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + loc.Latitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + } + + return &loc, nil + } +} + +func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse { + return func(rd *proto.Reader, n int64) (interface{}, error) { + locs := make([]GeoLocation, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(newGeoLocationParser(q)) + if err != nil { + return nil, err + } + switch vv := v.(type) { + case []byte: + locs = append(locs, GeoLocation{ + Name: string(vv), + }) + case *GeoLocation: + locs = append(locs, *vv) + default: + return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v) + } + } + return locs, nil + } +} + //------------------------------------------------------------------------------ type GeoPos struct { @@ -1149,6 +1620,44 @@ func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error { return nil } +func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + positions := make([]*GeoPos, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(geoPosParser) + if err != nil { + if err == Nil { + positions = append(positions, nil) + continue + } + return nil, err + } + switch v := v.(type) { + case *GeoPos: + positions = append(positions, v) + default: + return nil, fmt.Errorf("got %T, expected *GeoPos", v) + } + } + return positions, nil +} + +func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) { + var pos GeoPos + var err error + + pos.Longitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + + pos.Latitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + + return &pos, nil +} + //------------------------------------------------------------------------------ type CommandInfo struct { @@ -1197,6 +1706,74 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]*CommandInfo, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(commandInfoParser) + if err != nil { + return nil, err + } + vv := v.(*CommandInfo) + m[vv.Name] = vv + + } + return m, nil +} + +func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) { + var cmd CommandInfo + var err error + + if n != 6 { + return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n) + } + + cmd.Name, err = rd.ReadStringReply() + if err != nil { + return nil, err + } + + arity, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.Arity = int8(arity) + + flags, err := rd.ReadReply(stringSliceParser) + if err != nil { + return nil, err + } + cmd.Flags = flags.([]string) + + firstKeyPos, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.FirstKeyPos = int8(firstKeyPos) + + lastKeyPos, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.LastKeyPos = int8(lastKeyPos) + + stepCount, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.StepCount = int8(stepCount) + + for _, flag := range cmd.Flags { + if flag == "readonly" { + cmd.ReadOnly = true + break + } + } + + return &cmd, nil +} + //------------------------------------------------------------------------------ type cmdsInfoCache struct { diff --git a/commands.go b/commands.go index dddf8ac..b259e3a 100644 --- a/commands.go +++ b/commands.go @@ -172,16 +172,26 @@ type Cmdable interface { SRem(key string, members ...interface{}) *IntCmd SUnion(keys ...string) *StringSliceCmd SUnionStore(destination string, keys ...string) *IntCmd - XAdd(stream, id string, els map[string]interface{}) *StringCmd - XAddExt(opt *XAddExt) *StringCmd - XLen(key string) *IntCmd + XAdd(a *XAddArgs) *StringCmd + XLen(stream string) *IntCmd XRange(stream, start, stop string) *XMessageSliceCmd XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd XRevRange(stream string, start, stop string) *XMessageSliceCmd XRevRangeN(stream string, start, stop string, count int64) *XMessageSliceCmd - XRead(streams ...string) *XStreamSliceCmd - XReadN(count int64, streams ...string) *XStreamSliceCmd - XReadExt(opt *XReadExt) *XStreamSliceCmd + XRead(a *XReadArgs) *XStreamSliceCmd + XReadStreams(streams ...string) *XStreamSliceCmd + XGroupCreate(stream, group, start string) *StatusCmd + XGroupSetID(stream, group, start string) *StatusCmd + XGroupDestroy(stream, group string) *IntCmd + XGroupDelConsumer(stream, group, consumer string) *IntCmd + XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd + XAck(stream, group string, ids ...string) *IntCmd + XPending(stream, group string) *XPendingCmd + XPendingExt(a *XPendingExtArgs) *XPendingExtCmd + XClaim(a *XClaimArgs) *XMessageSliceCmd + XClaimJustID(a *XClaimArgs) *StringSliceCmd + XTrim(key string, maxLen int64) *IntCmd + XTrimApprox(key string, maxLen int64) *IntCmd ZAdd(key string, members ...Z) *IntCmd ZAddNX(key string, members ...Z) *IntCmd ZAddXX(key string, members ...Z) *IntCmd @@ -1300,7 +1310,7 @@ func (c *cmdable) SUnionStore(destination string, keys ...string) *IntCmd { //------------------------------------------------------------------------------ -type XAddExt struct { +type XAddArgs struct { Stream string MaxLen int64 // MAXLEN N MaxLenApprox int64 // MAXLEN ~ N @@ -1308,40 +1318,32 @@ type XAddExt struct { Values map[string]interface{} } -func (c *cmdable) XAddExt(opt *XAddExt) *StringCmd { - a := make([]interface{}, 0, 6+len(opt.Values)*2) - a = append(a, "xadd") - a = append(a, opt.Stream) - if opt.MaxLen > 0 { - a = append(a, "maxlen", opt.MaxLen) - } else if opt.MaxLenApprox > 0 { - a = append(a, "maxlen", "~", opt.MaxLenApprox) +func (c *cmdable) XAdd(a *XAddArgs) *StringCmd { + args := make([]interface{}, 0, 6+len(a.Values)*2) + args = append(args, "xadd") + args = append(args, a.Stream) + if a.MaxLen > 0 { + args = append(args, "maxlen", a.MaxLen) + } else if a.MaxLenApprox > 0 { + args = append(args, "maxlen", "~", a.MaxLenApprox) } - if opt.ID != "" { - a = append(a, opt.ID) + if a.ID != "" { + args = append(args, a.ID) } else { - a = append(a, "*") + args = append(args, "*") } - for k, v := range opt.Values { - a = append(a, k) - a = append(a, v) + for k, v := range a.Values { + args = append(args, k) + args = append(args, v) } - cmd := NewStringCmd(a...) + cmd := NewStringCmd(args...) c.process(cmd) return cmd } -func (c *cmdable) XAdd(stream, id string, values map[string]interface{}) *StringCmd { - return c.XAddExt(&XAddExt{ - Stream: stream, - ID: id, - Values: values, - }) -} - -func (c *cmdable) XLen(key string) *IntCmd { - cmd := NewIntCmd("xlen", key) +func (c *cmdable) XLen(stream string) *IntCmd { + cmd := NewIntCmd("xlen", stream) c.process(cmd) return cmd } @@ -1370,55 +1372,173 @@ func (c *cmdable) XRevRangeN(stream, start, stop string, count int64) *XMessageS return cmd } -type XReadExt struct { +type XReadArgs struct { Streams []string Count int64 Block time.Duration } -func (c *cmdable) XReadExt(opt *XReadExt) *XStreamSliceCmd { - a := make([]interface{}, 0, 5+len(opt.Streams)) - a = append(a, "xread") - if opt != nil { - if opt.Count > 0 { - a = append(a, "count") - a = append(a, opt.Count) - } - if opt.Block >= 0 { - a = append(a, "block") - a = append(a, int64(opt.Block/time.Millisecond)) - } +func (c *cmdable) XRead(a *XReadArgs) *XStreamSliceCmd { + args := make([]interface{}, 0, 5+len(a.Streams)) + args = append(args, "xread") + if a.Count > 0 { + args = append(args, "count") + args = append(args, a.Count) } - a = append(a, "streams") - for _, s := range opt.Streams { - a = append(a, s) + if a.Block >= 0 { + args = append(args, "block") + args = append(args, int64(a.Block/time.Millisecond)) + } + args = append(args, "streams") + for _, s := range a.Streams { + args = append(args, s) } - cmd := NewXStreamSliceCmd(a...) + cmd := NewXStreamSliceCmd(args...) c.process(cmd) return cmd } -func (c *cmdable) XRead(streams ...string) *XStreamSliceCmd { - return c.XReadExt(&XReadExt{ +func (c *cmdable) XReadStreams(streams ...string) *XStreamSliceCmd { + return c.XRead(&XReadArgs{ Streams: streams, Block: -1, }) } -func (c *cmdable) XReadN(count int64, streams ...string) *XStreamSliceCmd { - return c.XReadExt(&XReadExt{ - Streams: streams, - Count: count, - Block: -1, - }) +func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd { + cmd := NewStatusCmd("xgroup", "create", stream, group, start) + c.process(cmd) + return cmd } -func (c *cmdable) XReadBlock(block time.Duration, streams ...string) *XStreamSliceCmd { - return c.XReadExt(&XReadExt{ - Streams: streams, - Block: block, - }) +func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd { + cmd := NewStatusCmd("xgroup", "setid", stream, group, start) + c.process(cmd) + return cmd +} + +func (c *cmdable) XGroupDestroy(stream, group string) *IntCmd { + cmd := NewIntCmd("xgroup", "destroy", stream, group) + c.process(cmd) + return cmd +} + +func (c *cmdable) XGroupDelConsumer(stream, group, consumer string) *IntCmd { + cmd := NewIntCmd("xgroup", "delconsumer", stream, group, consumer) + c.process(cmd) + return cmd +} + +type XReadGroupArgs struct { + Group string + Consumer string + Streams []string + Count int64 + Block time.Duration +} + +func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd { + args := make([]interface{}, 0, 8+len(a.Streams)) + args = append(args, "xreadgroup", "group", a.Group, a.Consumer) + if a.Count > 0 { + args = append(args, "count", a.Count) + } + if a.Block >= 0 { + args = append(args, "block", int64(a.Block/time.Millisecond)) + } + args = append(args, "streams") + for _, s := range a.Streams { + args = append(args, s) + } + + cmd := NewXStreamSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XAck(stream, group string, ids ...string) *IntCmd { + args := []interface{}{"xack", stream, group} + for _, id := range ids { + args = append(args, id) + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XPending(stream, group string) *XPendingCmd { + cmd := NewXPendingCmd("xpending", stream, group) + c.process(cmd) + return cmd +} + +type XPendingExtArgs struct { + Stream string + Group string + Start string + End string + Count int64 + Consumer string +} + +func (c *cmdable) XPendingExt(a *XPendingExtArgs) *XPendingExtCmd { + args := make([]interface{}, 0, 7) + args = append(args, "xpending", a.Stream, a.Group, a.Start, a.End, a.Count) + if a.Consumer != "" { + args = append(args, a.Consumer) + } + cmd := NewXPendingExtCmd(args...) + c.process(cmd) + return cmd +} + +type XClaimArgs struct { + Stream string + Group string + Consumer string + MinIdle time.Duration + Messages []string +} + +func (c *cmdable) XClaim(a *XClaimArgs) *XMessageSliceCmd { + args := xClaimArgs(a) + cmd := NewXMessageSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XClaimJustID(a *XClaimArgs) *StringSliceCmd { + args := xClaimArgs(a) + args = append(args, "justid") + cmd := NewStringSliceCmd(args...) + c.process(cmd) + return cmd +} + +func xClaimArgs(a *XClaimArgs) []interface{} { + args := make([]interface{}, 0, 4+len(a.Messages)) + args = append(args, + "xclaim", + a.Stream, + a.Group, a.Consumer, + int64(a.MinIdle/time.Millisecond)) + for _, id := range a.Messages { + args = append(args, id) + } + return args +} + +func (c *cmdable) XTrim(key string, maxLen int64) *IntCmd { + cmd := NewIntCmd("xtrim", key, "maxlen", maxLen) + c.process(cmd) + return cmd +} + +func (c *cmdable) XTrimApprox(key string, maxLen int64) *IntCmd { + cmd := NewIntCmd("xtrim", key, "maxlen", "~", maxLen) + c.process(cmd) + return cmd } //------------------------------------------------------------------------------ diff --git a/commands_test.go b/commands_test.go index b29b4f7..9b3847e 100644 --- a/commands_test.go +++ b/commands_test.go @@ -3042,37 +3042,54 @@ var _ = Describe("Commands", func() { }) Describe("streams", func() { - createStream := func() { - id, err := client.XAdd("stream", "1-0", map[string]interface{}{ - "uno": "un", + BeforeEach(func() { + id, err := client.XAdd(&redis.XAddArgs{ + Stream: "stream", + ID: "1-0", + Values: map[string]interface{}{"uno": "un"}, }).Result() Expect(err).NotTo(HaveOccurred()) Expect(id).To(Equal("1-0")) - id, err = client.XAdd("stream", "2-0", map[string]interface{}{ - "dos": "deux", + id, err = client.XAdd(&redis.XAddArgs{ + Stream: "stream", + ID: "2-0", + Values: map[string]interface{}{"dos": "deux"}, }).Result() Expect(err).NotTo(HaveOccurred()) Expect(id).To(Equal("2-0")) - id, err = client.XAdd("stream", "3-0", map[string]interface{}{ - "tres": "troix", + id, err = client.XAdd(&redis.XAddArgs{ + Stream: "stream", + ID: "3-0", + Values: map[string]interface{}{"tres": "troix"}, }).Result() Expect(err).NotTo(HaveOccurred()) Expect(id).To(Equal("3-0")) - } + }) + + It("should XTrim", func() { + n, err := client.XTrim("stream", 0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(3))) + }) + + It("should XTrimApprox", func() { + n, err := client.XTrimApprox("stream", 0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(3))) + }) It("should XAdd", func() { - createStream() - - id, err := client.XAdd("stream", "*", map[string]interface{}{ - "quatro": "quatre", + id, err := client.XAdd(&redis.XAddArgs{ + Stream: "stream", + Values: map[string]interface{}{"quatro": "quatre"}, }).Result() Expect(err).NotTo(HaveOccurred()) vals, err := client.XRange("stream", "-", "+").Result() Expect(err).NotTo(HaveOccurred()) - Expect(vals).To(Equal([]*redis.XMessage{ + Expect(vals).To(Equal([]redis.XMessage{ {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, @@ -3080,10 +3097,8 @@ var _ = Describe("Commands", func() { })) }) - It("should XAddExt", func() { - createStream() - - id, err := client.XAddExt(&redis.XAddExt{ + It("should XAdd with MaxLen", func() { + id, err := client.XAdd(&redis.XAddArgs{ Stream: "stream", MaxLen: 1, Values: map[string]interface{}{"quatro": "quatre"}, @@ -3092,25 +3107,21 @@ var _ = Describe("Commands", func() { vals, err := client.XRange("stream", "-", "+").Result() Expect(err).NotTo(HaveOccurred()) - Expect(vals).To(Equal([]*redis.XMessage{ + Expect(vals).To(Equal([]redis.XMessage{ {ID: id, Values: map[string]interface{}{"quatro": "quatre"}}, })) }) It("should XLen", func() { - createStream() - n, err := client.XLen("stream").Result() Expect(err).NotTo(HaveOccurred()) Expect(n).To(Equal(int64(3))) }) It("should XRange", func() { - createStream() - msgs, err := client.XRange("stream", "-", "+").Result() Expect(err).NotTo(HaveOccurred()) - Expect(msgs).To(Equal([]*redis.XMessage{ + Expect(msgs).To(Equal([]redis.XMessage{ {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, @@ -3118,48 +3129,44 @@ var _ = Describe("Commands", func() { msgs, err = client.XRange("stream", "2", "+").Result() Expect(err).NotTo(HaveOccurred()) - Expect(msgs).To(Equal([]*redis.XMessage{ + Expect(msgs).To(Equal([]redis.XMessage{ {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, })) msgs, err = client.XRange("stream", "-", "2").Result() Expect(err).NotTo(HaveOccurred()) - Expect(msgs).To(Equal([]*redis.XMessage{ + Expect(msgs).To(Equal([]redis.XMessage{ {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, })) }) It("should XRangeN", func() { - createStream() - msgs, err := client.XRangeN("stream", "-", "+", 2).Result() Expect(err).NotTo(HaveOccurred()) - Expect(msgs).To(Equal([]*redis.XMessage{ + Expect(msgs).To(Equal([]redis.XMessage{ {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, })) msgs, err = client.XRangeN("stream", "2", "+", 1).Result() Expect(err).NotTo(HaveOccurred()) - Expect(msgs).To(Equal([]*redis.XMessage{ + Expect(msgs).To(Equal([]redis.XMessage{ {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, })) msgs, err = client.XRangeN("stream", "-", "2", 1).Result() Expect(err).NotTo(HaveOccurred()) - Expect(msgs).To(Equal([]*redis.XMessage{ + Expect(msgs).To(Equal([]redis.XMessage{ {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, })) }) It("should XRevRange", func() { - createStream() - msgs, err := client.XRevRange("stream", "+", "-").Result() Expect(err).NotTo(HaveOccurred()) - Expect(msgs).To(Equal([]*redis.XMessage{ + Expect(msgs).To(Equal([]redis.XMessage{ {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, @@ -3167,82 +3174,171 @@ var _ = Describe("Commands", func() { msgs, err = client.XRevRange("stream", "+", "2").Result() Expect(err).NotTo(HaveOccurred()) - Expect(msgs).To(Equal([]*redis.XMessage{ + Expect(msgs).To(Equal([]redis.XMessage{ {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, })) }) It("should XRevRangeN", func() { - createStream() - msgs, err := client.XRevRangeN("stream", "+", "-", 2).Result() Expect(err).NotTo(HaveOccurred()) - Expect(msgs).To(Equal([]*redis.XMessage{ + Expect(msgs).To(Equal([]redis.XMessage{ {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, })) msgs, err = client.XRevRangeN("stream", "+", "2", 1).Result() Expect(err).NotTo(HaveOccurred()) - Expect(msgs).To(Equal([]*redis.XMessage{ + Expect(msgs).To(Equal([]redis.XMessage{ {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, })) }) It("should XRead", func() { - createStream() - - res, err := client.XRead("stream", "0").Result() + res, err := client.XReadStreams("stream", "0").Result() Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal([]*redis.XStream{{ + Expect(res).To(Equal([]redis.XStream{{ Stream: "stream", - Messages: []*redis.XMessage{ + Messages: []redis.XMessage{ {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, }}, })) - _, err = client.XRead("stream", "3").Result() + _, err = client.XReadStreams("stream", "3").Result() Expect(err).To(Equal(redis.Nil)) }) - It("should XReadExt", func() { - createStream() - - res, err := client.XReadExt(&redis.XReadExt{ + It("should XRead", func() { + res, err := client.XRead(&redis.XReadArgs{ Streams: []string{"stream", "0"}, Count: 2, Block: 100 * time.Millisecond, }).Result() Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal([]*redis.XStream{{ + Expect(res).To(Equal([]redis.XStream{{ Stream: "stream", - Messages: []*redis.XMessage{ + Messages: []redis.XMessage{ {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, }}, })) - _, err = client.XReadExt(&redis.XReadExt{ + _, err = client.XRead(&redis.XReadArgs{ Streams: []string{"stream", "3"}, Count: 1, Block: 100 * time.Millisecond, }).Result() Expect(err).To(Equal(redis.Nil)) }) + + Describe("group", func() { + BeforeEach(func() { + err := client.XGroupCreate("stream", "group", "0").Err() + Expect(err).NotTo(HaveOccurred()) + + res, err := client.XReadGroup(&redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer", + Streams: []string{"stream", "0"}, + }).Result() + Expect(res).To(Equal([]redis.XStream{{ + Stream: "stream", + Messages: []redis.XMessage{ + {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + }}, + })) + }) + + AfterEach(func() { + n, err := client.XGroupDestroy("stream", "group").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + }) + + It("should XPending", func() { + info, err := client.XPending("stream", "group").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(info).To(Equal(&redis.XPending{ + Count: 3, + Lower: "1-0", + Higher: "3-0", + Consumers: map[string]int64{"consumer": 3}, + })) + + infoExt, err := client.XPendingExt(&redis.XPendingExtArgs{ + Stream: "stream", + Group: "group", + Start: "-", + End: "+", + Count: 10, + Consumer: "consumer", + }).Result() + Expect(err).NotTo(HaveOccurred()) + for i := range infoExt { + infoExt[i].Idle = 0 + } + Expect(infoExt).To(Equal([]redis.XPendingExt{ + {Id: "1-0", Consumer: "consumer", Idle: 0, RetryCount: 1}, + {Id: "2-0", Consumer: "consumer", Idle: 0, RetryCount: 1}, + {Id: "3-0", Consumer: "consumer", Idle: 0, RetryCount: 1}, + })) + + n, err := client.XGroupDelConsumer("stream", "group", "consumer").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(3))) + }) + + It("should XClaim", func() { + msgs, err := client.XClaim(&redis.XClaimArgs{ + Stream: "stream", + Group: "group", + Consumer: "consumer", + Messages: []string{"1-0", "2-0", "3-0"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(msgs).To(Equal([]redis.XMessage{{ + ID: "1-0", + Values: map[string]interface{}{"uno": "un"}, + }, { + ID: "2-0", + Values: map[string]interface{}{"dos": "deux"}, + }, { + ID: "3-0", + Values: map[string]interface{}{"tres": "troix"}, + }})) + + ids, err := client.XClaimJustID(&redis.XClaimArgs{ + Stream: "stream", + Group: "group", + Consumer: "consumer", + Messages: []string{"1-0", "2-0", "3-0"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(ids).To(Equal([]string{"1-0", "2-0", "3-0"})) + }) + + It("should XAck", func() { + n, err := client.XAck("stream", "group", "1-0", "2-0", "4-0").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(2))) + }) + }) }) Describe("Geo add and radius search", func() { BeforeEach(func() { - geoAdd := client.GeoAdd( + n, err := client.GeoAdd( "Sicily", &redis.GeoLocation{Longitude: 13.361389, Latitude: 38.115556, Name: "Palermo"}, &redis.GeoLocation{Longitude: 15.087269, Latitude: 37.502669, Name: "Catania"}, - ) - Expect(geoAdd.Err()).NotTo(HaveOccurred()) - Expect(geoAdd.Val()).To(Equal(int64(2))) + ).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(2))) }) It("should not add same geo location", func() { diff --git a/parser.go b/parser.go deleted file mode 100644 index f0dc67f..0000000 --- a/parser.go +++ /dev/null @@ -1,394 +0,0 @@ -package redis - -import ( - "fmt" - "net" - "strconv" - "time" - - "github.com/go-redis/redis/internal/proto" -) - -// Implements proto.MultiBulkParse -func sliceParser(rd *proto.Reader, n int64) (interface{}, error) { - vals := make([]interface{}, 0, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(sliceParser) - if err != nil { - if err == Nil { - vals = append(vals, nil) - continue - } - if err, ok := err.(proto.RedisError); ok { - vals = append(vals, err) - continue - } - return nil, err - } - - switch v := v.(type) { - case []byte: - vals = append(vals, string(v)) - default: - vals = append(vals, v) - } - } - return vals, nil -} - -// Implements proto.MultiBulkParse -func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - bools := make([]bool, 0, n) - for i := int64(0); i < n; i++ { - n, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - bools = append(bools, n == 1) - } - return bools, nil -} - -// Implements proto.MultiBulkParse -func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - ss := make([]string, 0, n) - for i := int64(0); i < n; i++ { - s, err := rd.ReadStringReply() - if err == Nil { - ss = append(ss, "") - } else if err != nil { - return nil, err - } else { - ss = append(ss, s) - } - } - return ss, nil -} - -// Implements proto.MultiBulkParse -func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) { - m := make(map[string]string, n/2) - for i := int64(0); i < n; i += 2 { - key, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - value, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - m[key] = value - } - return m, nil -} - -// Implements proto.MultiBulkParse -func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) { - m := make(map[string]int64, n/2) - for i := int64(0); i < n; i += 2 { - key, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - n, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - - m[key] = n - } - return m, nil -} - -// Implements proto.MultiBulkParse -func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) { - m := make(map[string]struct{}, n) - for i := int64(0); i < n; i++ { - key, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - m[key] = struct{}{} - } - return m, nil -} - -// Implements proto.MultiBulkParse -func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - zz := make([]Z, n/2) - for i := int64(0); i < n; i += 2 { - var err error - - z := &zz[i/2] - - z.Member, err = rd.ReadStringReply() - if err != nil { - return nil, err - } - - z.Score, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - } - return zz, nil -} - -// Implements proto.MultiBulkParse -func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) { - slots := make([]ClusterSlot, n) - for i := 0; i < len(slots); i++ { - n, err := rd.ReadArrayLen() - if err != nil { - return nil, err - } - if n < 2 { - err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n) - return nil, err - } - - start, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - - end, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - - nodes := make([]ClusterNode, n-2) - for j := 0; j < len(nodes); j++ { - n, err := rd.ReadArrayLen() - if err != nil { - return nil, err - } - if n != 2 && n != 3 { - err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n) - return nil, err - } - - ip, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - port, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - nodes[j].Addr = net.JoinHostPort(ip, strconv.FormatInt(port, 10)) - - if n == 3 { - id, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - nodes[j].Id = id - } - } - - slots[i] = ClusterSlot{ - Start: int(start), - End: int(end), - Nodes: nodes, - } - } - return slots, nil -} - -func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse { - return func(rd *proto.Reader, n int64) (interface{}, error) { - var loc GeoLocation - var err error - - loc.Name, err = rd.ReadStringReply() - if err != nil { - return nil, err - } - if q.WithDist { - loc.Dist, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - } - if q.WithGeoHash { - loc.GeoHash, err = rd.ReadIntReply() - if err != nil { - return nil, err - } - } - if q.WithCoord { - n, err := rd.ReadArrayLen() - if err != nil { - return nil, err - } - if n != 2 { - return nil, fmt.Errorf("got %d coordinates, expected 2", n) - } - - loc.Longitude, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - loc.Latitude, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - } - - return &loc, nil - } -} - -func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse { - return func(rd *proto.Reader, n int64) (interface{}, error) { - locs := make([]GeoLocation, 0, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(newGeoLocationParser(q)) - if err != nil { - return nil, err - } - switch vv := v.(type) { - case []byte: - locs = append(locs, GeoLocation{ - Name: string(vv), - }) - case *GeoLocation: - locs = append(locs, *vv) - default: - return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v) - } - } - return locs, nil - } -} - -func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) { - var pos GeoPos - var err error - - pos.Longitude, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - - pos.Latitude, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - - return &pos, nil -} - -func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - positions := make([]*GeoPos, 0, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(geoPosParser) - if err != nil { - if err == Nil { - positions = append(positions, nil) - continue - } - return nil, err - } - switch v := v.(type) { - case *GeoPos: - positions = append(positions, v) - default: - return nil, fmt.Errorf("got %T, expected *GeoPos", v) - } - } - return positions, nil -} - -func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) { - var cmd CommandInfo - var err error - - if n != 6 { - return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n) - } - - cmd.Name, err = rd.ReadStringReply() - if err != nil { - return nil, err - } - - arity, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.Arity = int8(arity) - - flags, err := rd.ReadReply(stringSliceParser) - if err != nil { - return nil, err - } - cmd.Flags = flags.([]string) - - firstKeyPos, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.FirstKeyPos = int8(firstKeyPos) - - lastKeyPos, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.LastKeyPos = int8(lastKeyPos) - - stepCount, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.StepCount = int8(stepCount) - - for _, flag := range cmd.Flags { - if flag == "readonly" { - cmd.ReadOnly = true - break - } - } - - return &cmd, nil -} - -// Implements proto.MultiBulkParse -func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - m := make(map[string]*CommandInfo, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(commandInfoParser) - if err != nil { - return nil, err - } - vv := v.(*CommandInfo) - m[vv.Name] = vv - - } - return m, nil -} - -// Implements proto.MultiBulkParse -func timeParser(rd *proto.Reader, n int64) (interface{}, error) { - if n != 2 { - return nil, fmt.Errorf("got %d elements, expected 2", n) - } - - sec, err := rd.ReadInt() - if err != nil { - return nil, err - } - - microsec, err := rd.ReadInt() - if err != nil { - return nil, err - } - - return time.Unix(sec, microsec*1000), nil -}