mirror of https://github.com/go-redis/redis.git
Merge pull request #830 from go-redis/feature/x-group
Add streams group related commands
This commit is contained in:
commit
ad7024da36
717
command.go
717
command.go
|
@ -3,6 +3,7 @@ package redis
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
@ -181,6 +182,33 @@ func (cmd *Cmd) readReply(cn *pool.Conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func sliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
vals := make([]interface{}, 0, n)
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
v, err := rd.ReadReply(sliceParser)
|
||||||
|
if err != nil {
|
||||||
|
if err == Nil {
|
||||||
|
vals = append(vals, nil)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err, ok := err.(proto.RedisError); ok {
|
||||||
|
vals = append(vals, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch v := v.(type) {
|
||||||
|
case []byte:
|
||||||
|
vals = append(vals, string(v))
|
||||||
|
default:
|
||||||
|
vals = append(vals, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return vals, nil
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type SliceCmd struct {
|
type SliceCmd struct {
|
||||||
|
@ -363,6 +391,25 @@ func (cmd *TimeCmd) readReply(cn *pool.Conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func timeParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
if n != 2 {
|
||||||
|
return nil, fmt.Errorf("got %d elements, expected 2", n)
|
||||||
|
}
|
||||||
|
|
||||||
|
sec, err := rd.ReadInt()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
microsec, err := rd.ReadInt()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return time.Unix(sec, microsec*1000), nil
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type BoolCmd struct {
|
type BoolCmd struct {
|
||||||
|
@ -560,6 +607,22 @@ func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
ss := make([]string, 0, n)
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
s, err := rd.ReadStringReply()
|
||||||
|
if err == Nil {
|
||||||
|
ss = append(ss, "")
|
||||||
|
} else if err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else {
|
||||||
|
ss = append(ss, s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ss, nil
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type BoolSliceCmd struct {
|
type BoolSliceCmd struct {
|
||||||
|
@ -598,6 +661,19 @@ func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
bools := make([]bool, 0, n)
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
n, err := rd.ReadIntReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
bools = append(bools, n == 1)
|
||||||
|
}
|
||||||
|
return bools, nil
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type StringStringMapCmd struct {
|
type StringStringMapCmd struct {
|
||||||
|
@ -636,6 +712,25 @@ func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
m := make(map[string]string, n/2)
|
||||||
|
for i := int64(0); i < n; i += 2 {
|
||||||
|
key, err := rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
value, err := rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
m[key] = value
|
||||||
|
}
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type StringIntMapCmd struct {
|
type StringIntMapCmd struct {
|
||||||
|
@ -674,6 +769,25 @@ func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
m := make(map[string]int64, n/2)
|
||||||
|
for i := int64(0); i < n; i += 2 {
|
||||||
|
key, err := rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := rd.ReadIntReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
m[key] = n
|
||||||
|
}
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type StringStructMapCmd struct {
|
type StringStructMapCmd struct {
|
||||||
|
@ -712,24 +826,121 @@ func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
// Implements proto.MultiBulkParse
|
||||||
|
func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
m := make(map[string]struct{}, n)
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
key, err := rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
type XStream struct {
|
m[key] = struct{}{}
|
||||||
Stream string
|
}
|
||||||
Messages []*XMessage
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type XMessage struct {
|
type XMessage struct {
|
||||||
ID string
|
ID string
|
||||||
Values map[string]interface{}
|
Values map[string]interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type XMessageSliceCmd struct {
|
||||||
|
baseCmd
|
||||||
|
|
||||||
|
val []XMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Cmder = (*XMessageSliceCmd)(nil)
|
||||||
|
|
||||||
|
func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd {
|
||||||
|
return &XMessageSliceCmd{
|
||||||
|
baseCmd: baseCmd{_args: args},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XMessageSliceCmd) Val() []XMessage {
|
||||||
|
return cmd.val
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XMessageSliceCmd) Result() ([]XMessage, error) {
|
||||||
|
return cmd.val, cmd.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XMessageSliceCmd) String() string {
|
||||||
|
return cmdString(cmd, cmd.val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error {
|
||||||
|
var v interface{}
|
||||||
|
v, cmd.err = cn.Rd.ReadArrayReply(xMessageSliceParser)
|
||||||
|
if cmd.err != nil {
|
||||||
|
return cmd.err
|
||||||
|
}
|
||||||
|
cmd.val = v.([]XMessage)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
msgs := make([]XMessage, 0, n)
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
id, err := rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
v, err := rd.ReadArrayReply(stringInterfaceMapParser)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs = append(msgs, XMessage{
|
||||||
|
ID: id,
|
||||||
|
Values: v.(map[string]interface{}),
|
||||||
|
})
|
||||||
|
return nil, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return msgs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func stringInterfaceMapParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
m := make(map[string]interface{}, n/2)
|
||||||
|
for i := int64(0); i < n; i += 2 {
|
||||||
|
key, err := rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
value, err := rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
m[key] = value
|
||||||
|
}
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type XStream struct {
|
||||||
|
Stream string
|
||||||
|
Messages []XMessage
|
||||||
|
}
|
||||||
|
|
||||||
type XStreamSliceCmd struct {
|
type XStreamSliceCmd struct {
|
||||||
baseCmd
|
baseCmd
|
||||||
|
|
||||||
val []*XStream
|
val []XStream
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Cmder = (*XStreamSliceCmd)(nil)
|
var _ Cmder = (*XStreamSliceCmd)(nil)
|
||||||
|
@ -740,11 +951,11 @@ func NewXStreamSliceCmd(args ...interface{}) *XStreamSliceCmd {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *XStreamSliceCmd) Val() []*XStream {
|
func (cmd *XStreamSliceCmd) Val() []XStream {
|
||||||
return cmd.val
|
return cmd.val
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *XStreamSliceCmd) Result() ([]*XStream, error) {
|
func (cmd *XStreamSliceCmd) Result() ([]XStream, error) {
|
||||||
return cmd.val, cmd.err
|
return cmd.val, cmd.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -758,25 +969,15 @@ func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error {
|
||||||
if cmd.err != nil {
|
if cmd.err != nil {
|
||||||
return cmd.err
|
return cmd.err
|
||||||
}
|
}
|
||||||
cmd.val = v.([]*XStream)
|
cmd.val = v.([]XStream)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements proto.MultiBulkParse
|
// Implements proto.MultiBulkParse
|
||||||
func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
xx := make([]*XStream, n)
|
ret := make([]XStream, 0, n)
|
||||||
for i := int64(0); i < n; i++ {
|
for i := int64(0); i < n; i++ {
|
||||||
v, err := rd.ReadArrayReply(xStreamParser)
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
xx[i] = v.(*XStream)
|
|
||||||
}
|
|
||||||
return xx, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements proto.MultiBulkParse
|
|
||||||
func xStreamParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
if n != 2 {
|
if n != 2 {
|
||||||
return nil, fmt.Errorf("got %d, wanted 2", n)
|
return nil, fmt.Errorf("got %d, wanted 2", n)
|
||||||
}
|
}
|
||||||
|
@ -791,99 +992,218 @@ func xStreamParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &XStream{
|
ret = append(ret, XStream{
|
||||||
Stream: stream,
|
Stream: stream,
|
||||||
Messages: v.([]*XMessage),
|
Messages: v.([]XMessage),
|
||||||
}, nil
|
})
|
||||||
|
return nil, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type XMessageSliceCmd struct {
|
type XPending struct {
|
||||||
baseCmd
|
Count int64
|
||||||
|
Lower string
|
||||||
val []*XMessage
|
Higher string
|
||||||
|
Consumers map[string]int64
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Cmder = (*XMessageSliceCmd)(nil)
|
type XPendingCmd struct {
|
||||||
|
baseCmd
|
||||||
|
val *XPending
|
||||||
|
}
|
||||||
|
|
||||||
func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd {
|
var _ Cmder = (*XPendingCmd)(nil)
|
||||||
return &XMessageSliceCmd{
|
|
||||||
|
func NewXPendingCmd(args ...interface{}) *XPendingCmd {
|
||||||
|
return &XPendingCmd{
|
||||||
baseCmd: baseCmd{_args: args},
|
baseCmd: baseCmd{_args: args},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *XMessageSliceCmd) Val() []*XMessage {
|
func (cmd *XPendingCmd) Val() *XPending {
|
||||||
return cmd.val
|
return cmd.val
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *XMessageSliceCmd) Result() ([]*XMessage, error) {
|
func (cmd *XPendingCmd) Result() (*XPending, error) {
|
||||||
return cmd.val, cmd.err
|
return cmd.val, cmd.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *XMessageSliceCmd) String() string {
|
func (cmd *XPendingCmd) String() string {
|
||||||
return cmdString(cmd, cmd.val)
|
return cmdString(cmd, cmd.val)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error {
|
func (cmd *XPendingCmd) readReply(cn *pool.Conn) error {
|
||||||
var v interface{}
|
var info interface{}
|
||||||
v, cmd.err = cn.Rd.ReadArrayReply(xMessageSliceParser)
|
info, cmd.err = cn.Rd.ReadArrayReply(xPendingParser)
|
||||||
if cmd.err != nil {
|
if cmd.err != nil {
|
||||||
return cmd.err
|
return cmd.err
|
||||||
}
|
}
|
||||||
cmd.val = v.([]*XMessage)
|
cmd.val = info.(*XPending)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements proto.MultiBulkParse
|
func xPendingParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
if n != 4 {
|
||||||
msgs := make([]*XMessage, n)
|
return nil, fmt.Errorf("got %d, wanted 4", n)
|
||||||
for i := int64(0); i < n; i++ {
|
}
|
||||||
v, err := rd.ReadArrayReply(xMessageParser)
|
|
||||||
|
count, err := rd.ReadIntReply()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
msgs[i] = v.(*XMessage)
|
|
||||||
|
lower, err := rd.ReadStringReply()
|
||||||
|
if err != nil && err != Nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
return msgs, nil
|
|
||||||
|
higher, err := rd.ReadStringReply()
|
||||||
|
if err != nil && err != Nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
pending := &XPending{
|
||||||
|
Count: count,
|
||||||
|
Lower: lower,
|
||||||
|
Higher: higher,
|
||||||
|
}
|
||||||
|
_, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
_, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
if n != 2 {
|
||||||
|
return nil, fmt.Errorf("got %d, wanted 2", n)
|
||||||
|
}
|
||||||
|
|
||||||
|
consumerName, err := rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
consumerPendingStr, err := rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
consumerPending, err := strconv.ParseInt(consumerPendingStr, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if pending.Consumers == nil {
|
||||||
|
pending.Consumers = make(map[string]int64)
|
||||||
|
}
|
||||||
|
pending.Consumers[consumerName] = consumerPending
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
})
|
||||||
|
if err != nil && err != Nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return pending, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements proto.MultiBulkParse
|
//------------------------------------------------------------------------------
|
||||||
func xMessageParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
|
type XPendingExt struct {
|
||||||
|
Id string
|
||||||
|
Consumer string
|
||||||
|
Idle time.Duration
|
||||||
|
RetryCount int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type XPendingExtCmd struct {
|
||||||
|
baseCmd
|
||||||
|
val []XPendingExt
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Cmder = (*XPendingExtCmd)(nil)
|
||||||
|
|
||||||
|
func NewXPendingExtCmd(args ...interface{}) *XPendingExtCmd {
|
||||||
|
return &XPendingExtCmd{
|
||||||
|
baseCmd: baseCmd{_args: args},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XPendingExtCmd) Val() []XPendingExt {
|
||||||
|
return cmd.val
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XPendingExtCmd) Result() ([]XPendingExt, error) {
|
||||||
|
return cmd.val, cmd.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XPendingExtCmd) String() string {
|
||||||
|
return cmdString(cmd, cmd.val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XPendingExtCmd) readReply(cn *pool.Conn) error {
|
||||||
|
var info interface{}
|
||||||
|
info, cmd.err = cn.Rd.ReadArrayReply(xPendingExtSliceParser)
|
||||||
|
if cmd.err != nil {
|
||||||
|
return cmd.err
|
||||||
|
}
|
||||||
|
cmd.val = info.([]XPendingExt)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func xPendingExtSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
ret := make([]XPendingExt, 0, n)
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
if n != 4 {
|
||||||
|
return nil, fmt.Errorf("got %d, wanted 4", n)
|
||||||
|
}
|
||||||
|
|
||||||
id, err := rd.ReadStringReply()
|
id, err := rd.ReadStringReply()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
v, err := rd.ReadArrayReply(xKeyValueParser)
|
consumer, err := rd.ReadStringReply()
|
||||||
if err != nil {
|
if err != nil && err != Nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &XMessage{
|
idle, err := rd.ReadIntReply()
|
||||||
ID: id,
|
if err != nil && err != Nil {
|
||||||
Values: v.(map[string]interface{}),
|
return nil, err
|
||||||
}, nil
|
}
|
||||||
|
|
||||||
|
retryCount, err := rd.ReadIntReply()
|
||||||
|
if err != nil && err != Nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = append(ret, XPendingExt{
|
||||||
|
Id: id,
|
||||||
|
Consumer: consumer,
|
||||||
|
Idle: time.Duration(idle) * time.Millisecond,
|
||||||
|
RetryCount: retryCount,
|
||||||
|
})
|
||||||
|
return nil, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements proto.MultiBulkParse
|
//------------------------------------------------------------------------------
|
||||||
func xKeyValueParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
values := make(map[string]interface{}, n)
|
|
||||||
for i := int64(0); i < n; i += 2 {
|
|
||||||
key, err := rd.ReadStringReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
value, err := rd.ReadStringReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
values[key] = value
|
|
||||||
}
|
|
||||||
return values, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@ -923,6 +1243,27 @@ func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
zz := make([]Z, n/2)
|
||||||
|
for i := int64(0); i < n; i += 2 {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
z := &zz[i/2]
|
||||||
|
|
||||||
|
z.Member, err = rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
z.Score, err = rd.ReadFloatReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return zz, nil
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type ScanCmd struct {
|
type ScanCmd struct {
|
||||||
|
@ -1016,6 +1357,69 @@ func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
slots := make([]ClusterSlot, n)
|
||||||
|
for i := 0; i < len(slots); i++ {
|
||||||
|
n, err := rd.ReadArrayLen()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if n < 2 {
|
||||||
|
err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
start, err := rd.ReadIntReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
end, err := rd.ReadIntReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes := make([]ClusterNode, n-2)
|
||||||
|
for j := 0; j < len(nodes); j++ {
|
||||||
|
n, err := rd.ReadArrayLen()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if n != 2 && n != 3 {
|
||||||
|
err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ip, err := rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
port, err := rd.ReadIntReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
nodes[j].Addr = net.JoinHostPort(ip, strconv.FormatInt(port, 10))
|
||||||
|
|
||||||
|
if n == 3 {
|
||||||
|
id, err := rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
nodes[j].Id = id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
slots[i] = ClusterSlot{
|
||||||
|
Start: int(start),
|
||||||
|
End: int(end),
|
||||||
|
Nodes: nodes,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return slots, nil
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
// GeoLocation is used with GeoAdd to add geospatial location.
|
// GeoLocation is used with GeoAdd to add geospatial location.
|
||||||
|
@ -1107,6 +1511,73 @@ func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse {
|
||||||
|
return func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
var loc GeoLocation
|
||||||
|
var err error
|
||||||
|
|
||||||
|
loc.Name, err = rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if q.WithDist {
|
||||||
|
loc.Dist, err = rd.ReadFloatReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if q.WithGeoHash {
|
||||||
|
loc.GeoHash, err = rd.ReadIntReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if q.WithCoord {
|
||||||
|
n, err := rd.ReadArrayLen()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if n != 2 {
|
||||||
|
return nil, fmt.Errorf("got %d coordinates, expected 2", n)
|
||||||
|
}
|
||||||
|
|
||||||
|
loc.Longitude, err = rd.ReadFloatReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
loc.Latitude, err = rd.ReadFloatReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &loc, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse {
|
||||||
|
return func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
locs := make([]GeoLocation, 0, n)
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
v, err := rd.ReadReply(newGeoLocationParser(q))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
switch vv := v.(type) {
|
||||||
|
case []byte:
|
||||||
|
locs = append(locs, GeoLocation{
|
||||||
|
Name: string(vv),
|
||||||
|
})
|
||||||
|
case *GeoLocation:
|
||||||
|
locs = append(locs, *vv)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return locs, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type GeoPos struct {
|
type GeoPos struct {
|
||||||
|
@ -1149,6 +1620,44 @@ func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
positions := make([]*GeoPos, 0, n)
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
v, err := rd.ReadReply(geoPosParser)
|
||||||
|
if err != nil {
|
||||||
|
if err == Nil {
|
||||||
|
positions = append(positions, nil)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
switch v := v.(type) {
|
||||||
|
case *GeoPos:
|
||||||
|
positions = append(positions, v)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("got %T, expected *GeoPos", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return positions, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
var pos GeoPos
|
||||||
|
var err error
|
||||||
|
|
||||||
|
pos.Longitude, err = rd.ReadFloatReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
pos.Latitude, err = rd.ReadFloatReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pos, nil
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type CommandInfo struct {
|
type CommandInfo struct {
|
||||||
|
@ -1197,6 +1706,74 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
m := make(map[string]*CommandInfo, n)
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
v, err := rd.ReadReply(commandInfoParser)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
vv := v.(*CommandInfo)
|
||||||
|
m[vv.Name] = vv
|
||||||
|
|
||||||
|
}
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
var cmd CommandInfo
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if n != 6 {
|
||||||
|
return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n)
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd.Name, err = rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
arity, err := rd.ReadIntReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cmd.Arity = int8(arity)
|
||||||
|
|
||||||
|
flags, err := rd.ReadReply(stringSliceParser)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cmd.Flags = flags.([]string)
|
||||||
|
|
||||||
|
firstKeyPos, err := rd.ReadIntReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cmd.FirstKeyPos = int8(firstKeyPos)
|
||||||
|
|
||||||
|
lastKeyPos, err := rd.ReadIntReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cmd.LastKeyPos = int8(lastKeyPos)
|
||||||
|
|
||||||
|
stepCount, err := rd.ReadIntReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cmd.StepCount = int8(stepCount)
|
||||||
|
|
||||||
|
for _, flag := range cmd.Flags {
|
||||||
|
if flag == "readonly" {
|
||||||
|
cmd.ReadOnly = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &cmd, nil
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type cmdsInfoCache struct {
|
type cmdsInfoCache struct {
|
||||||
|
|
242
commands.go
242
commands.go
|
@ -172,16 +172,26 @@ type Cmdable interface {
|
||||||
SRem(key string, members ...interface{}) *IntCmd
|
SRem(key string, members ...interface{}) *IntCmd
|
||||||
SUnion(keys ...string) *StringSliceCmd
|
SUnion(keys ...string) *StringSliceCmd
|
||||||
SUnionStore(destination string, keys ...string) *IntCmd
|
SUnionStore(destination string, keys ...string) *IntCmd
|
||||||
XAdd(stream, id string, els map[string]interface{}) *StringCmd
|
XAdd(a *XAddArgs) *StringCmd
|
||||||
XAddExt(opt *XAddExt) *StringCmd
|
XLen(stream string) *IntCmd
|
||||||
XLen(key string) *IntCmd
|
|
||||||
XRange(stream, start, stop string) *XMessageSliceCmd
|
XRange(stream, start, stop string) *XMessageSliceCmd
|
||||||
XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd
|
XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd
|
||||||
XRevRange(stream string, start, stop string) *XMessageSliceCmd
|
XRevRange(stream string, start, stop string) *XMessageSliceCmd
|
||||||
XRevRangeN(stream string, start, stop string, count int64) *XMessageSliceCmd
|
XRevRangeN(stream string, start, stop string, count int64) *XMessageSliceCmd
|
||||||
XRead(streams ...string) *XStreamSliceCmd
|
XRead(a *XReadArgs) *XStreamSliceCmd
|
||||||
XReadN(count int64, streams ...string) *XStreamSliceCmd
|
XReadStreams(streams ...string) *XStreamSliceCmd
|
||||||
XReadExt(opt *XReadExt) *XStreamSliceCmd
|
XGroupCreate(stream, group, start string) *StatusCmd
|
||||||
|
XGroupSetID(stream, group, start string) *StatusCmd
|
||||||
|
XGroupDestroy(stream, group string) *IntCmd
|
||||||
|
XGroupDelConsumer(stream, group, consumer string) *IntCmd
|
||||||
|
XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd
|
||||||
|
XAck(stream, group string, ids ...string) *IntCmd
|
||||||
|
XPending(stream, group string) *XPendingCmd
|
||||||
|
XPendingExt(a *XPendingExtArgs) *XPendingExtCmd
|
||||||
|
XClaim(a *XClaimArgs) *XMessageSliceCmd
|
||||||
|
XClaimJustID(a *XClaimArgs) *StringSliceCmd
|
||||||
|
XTrim(key string, maxLen int64) *IntCmd
|
||||||
|
XTrimApprox(key string, maxLen int64) *IntCmd
|
||||||
ZAdd(key string, members ...Z) *IntCmd
|
ZAdd(key string, members ...Z) *IntCmd
|
||||||
ZAddNX(key string, members ...Z) *IntCmd
|
ZAddNX(key string, members ...Z) *IntCmd
|
||||||
ZAddXX(key string, members ...Z) *IntCmd
|
ZAddXX(key string, members ...Z) *IntCmd
|
||||||
|
@ -1300,7 +1310,7 @@ func (c *cmdable) SUnionStore(destination string, keys ...string) *IntCmd {
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type XAddExt struct {
|
type XAddArgs struct {
|
||||||
Stream string
|
Stream string
|
||||||
MaxLen int64 // MAXLEN N
|
MaxLen int64 // MAXLEN N
|
||||||
MaxLenApprox int64 // MAXLEN ~ N
|
MaxLenApprox int64 // MAXLEN ~ N
|
||||||
|
@ -1308,40 +1318,32 @@ type XAddExt struct {
|
||||||
Values map[string]interface{}
|
Values map[string]interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cmdable) XAddExt(opt *XAddExt) *StringCmd {
|
func (c *cmdable) XAdd(a *XAddArgs) *StringCmd {
|
||||||
a := make([]interface{}, 0, 6+len(opt.Values)*2)
|
args := make([]interface{}, 0, 6+len(a.Values)*2)
|
||||||
a = append(a, "xadd")
|
args = append(args, "xadd")
|
||||||
a = append(a, opt.Stream)
|
args = append(args, a.Stream)
|
||||||
if opt.MaxLen > 0 {
|
if a.MaxLen > 0 {
|
||||||
a = append(a, "maxlen", opt.MaxLen)
|
args = append(args, "maxlen", a.MaxLen)
|
||||||
} else if opt.MaxLenApprox > 0 {
|
} else if a.MaxLenApprox > 0 {
|
||||||
a = append(a, "maxlen", "~", opt.MaxLenApprox)
|
args = append(args, "maxlen", "~", a.MaxLenApprox)
|
||||||
}
|
}
|
||||||
if opt.ID != "" {
|
if a.ID != "" {
|
||||||
a = append(a, opt.ID)
|
args = append(args, a.ID)
|
||||||
} else {
|
} else {
|
||||||
a = append(a, "*")
|
args = append(args, "*")
|
||||||
}
|
}
|
||||||
for k, v := range opt.Values {
|
for k, v := range a.Values {
|
||||||
a = append(a, k)
|
args = append(args, k)
|
||||||
a = append(a, v)
|
args = append(args, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd := NewStringCmd(a...)
|
cmd := NewStringCmd(args...)
|
||||||
c.process(cmd)
|
c.process(cmd)
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cmdable) XAdd(stream, id string, values map[string]interface{}) *StringCmd {
|
func (c *cmdable) XLen(stream string) *IntCmd {
|
||||||
return c.XAddExt(&XAddExt{
|
cmd := NewIntCmd("xlen", stream)
|
||||||
Stream: stream,
|
|
||||||
ID: id,
|
|
||||||
Values: values,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cmdable) XLen(key string) *IntCmd {
|
|
||||||
cmd := NewIntCmd("xlen", key)
|
|
||||||
c.process(cmd)
|
c.process(cmd)
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
@ -1370,55 +1372,173 @@ func (c *cmdable) XRevRangeN(stream, start, stop string, count int64) *XMessageS
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
type XReadExt struct {
|
type XReadArgs struct {
|
||||||
Streams []string
|
Streams []string
|
||||||
Count int64
|
Count int64
|
||||||
Block time.Duration
|
Block time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cmdable) XReadExt(opt *XReadExt) *XStreamSliceCmd {
|
func (c *cmdable) XRead(a *XReadArgs) *XStreamSliceCmd {
|
||||||
a := make([]interface{}, 0, 5+len(opt.Streams))
|
args := make([]interface{}, 0, 5+len(a.Streams))
|
||||||
a = append(a, "xread")
|
args = append(args, "xread")
|
||||||
if opt != nil {
|
if a.Count > 0 {
|
||||||
if opt.Count > 0 {
|
args = append(args, "count")
|
||||||
a = append(a, "count")
|
args = append(args, a.Count)
|
||||||
a = append(a, opt.Count)
|
|
||||||
}
|
}
|
||||||
if opt.Block >= 0 {
|
if a.Block >= 0 {
|
||||||
a = append(a, "block")
|
args = append(args, "block")
|
||||||
a = append(a, int64(opt.Block/time.Millisecond))
|
args = append(args, int64(a.Block/time.Millisecond))
|
||||||
}
|
}
|
||||||
}
|
args = append(args, "streams")
|
||||||
a = append(a, "streams")
|
for _, s := range a.Streams {
|
||||||
for _, s := range opt.Streams {
|
args = append(args, s)
|
||||||
a = append(a, s)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd := NewXStreamSliceCmd(a...)
|
cmd := NewXStreamSliceCmd(args...)
|
||||||
c.process(cmd)
|
c.process(cmd)
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cmdable) XRead(streams ...string) *XStreamSliceCmd {
|
func (c *cmdable) XReadStreams(streams ...string) *XStreamSliceCmd {
|
||||||
return c.XReadExt(&XReadExt{
|
return c.XRead(&XReadArgs{
|
||||||
Streams: streams,
|
Streams: streams,
|
||||||
Block: -1,
|
Block: -1,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cmdable) XReadN(count int64, streams ...string) *XStreamSliceCmd {
|
func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd {
|
||||||
return c.XReadExt(&XReadExt{
|
cmd := NewStatusCmd("xgroup", "create", stream, group, start)
|
||||||
Streams: streams,
|
c.process(cmd)
|
||||||
Count: count,
|
return cmd
|
||||||
Block: -1,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cmdable) XReadBlock(block time.Duration, streams ...string) *XStreamSliceCmd {
|
func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd {
|
||||||
return c.XReadExt(&XReadExt{
|
cmd := NewStatusCmd("xgroup", "setid", stream, group, start)
|
||||||
Streams: streams,
|
c.process(cmd)
|
||||||
Block: block,
|
return cmd
|
||||||
})
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XGroupDestroy(stream, group string) *IntCmd {
|
||||||
|
cmd := NewIntCmd("xgroup", "destroy", stream, group)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XGroupDelConsumer(stream, group, consumer string) *IntCmd {
|
||||||
|
cmd := NewIntCmd("xgroup", "delconsumer", stream, group, consumer)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
type XReadGroupArgs struct {
|
||||||
|
Group string
|
||||||
|
Consumer string
|
||||||
|
Streams []string
|
||||||
|
Count int64
|
||||||
|
Block time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd {
|
||||||
|
args := make([]interface{}, 0, 8+len(a.Streams))
|
||||||
|
args = append(args, "xreadgroup", "group", a.Group, a.Consumer)
|
||||||
|
if a.Count > 0 {
|
||||||
|
args = append(args, "count", a.Count)
|
||||||
|
}
|
||||||
|
if a.Block >= 0 {
|
||||||
|
args = append(args, "block", int64(a.Block/time.Millisecond))
|
||||||
|
}
|
||||||
|
args = append(args, "streams")
|
||||||
|
for _, s := range a.Streams {
|
||||||
|
args = append(args, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd := NewXStreamSliceCmd(args...)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XAck(stream, group string, ids ...string) *IntCmd {
|
||||||
|
args := []interface{}{"xack", stream, group}
|
||||||
|
for _, id := range ids {
|
||||||
|
args = append(args, id)
|
||||||
|
}
|
||||||
|
cmd := NewIntCmd(args...)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XPending(stream, group string) *XPendingCmd {
|
||||||
|
cmd := NewXPendingCmd("xpending", stream, group)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
type XPendingExtArgs struct {
|
||||||
|
Stream string
|
||||||
|
Group string
|
||||||
|
Start string
|
||||||
|
End string
|
||||||
|
Count int64
|
||||||
|
Consumer string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XPendingExt(a *XPendingExtArgs) *XPendingExtCmd {
|
||||||
|
args := make([]interface{}, 0, 7)
|
||||||
|
args = append(args, "xpending", a.Stream, a.Group, a.Start, a.End, a.Count)
|
||||||
|
if a.Consumer != "" {
|
||||||
|
args = append(args, a.Consumer)
|
||||||
|
}
|
||||||
|
cmd := NewXPendingExtCmd(args...)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
type XClaimArgs struct {
|
||||||
|
Stream string
|
||||||
|
Group string
|
||||||
|
Consumer string
|
||||||
|
MinIdle time.Duration
|
||||||
|
Messages []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XClaim(a *XClaimArgs) *XMessageSliceCmd {
|
||||||
|
args := xClaimArgs(a)
|
||||||
|
cmd := NewXMessageSliceCmd(args...)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XClaimJustID(a *XClaimArgs) *StringSliceCmd {
|
||||||
|
args := xClaimArgs(a)
|
||||||
|
args = append(args, "justid")
|
||||||
|
cmd := NewStringSliceCmd(args...)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func xClaimArgs(a *XClaimArgs) []interface{} {
|
||||||
|
args := make([]interface{}, 0, 4+len(a.Messages))
|
||||||
|
args = append(args,
|
||||||
|
"xclaim",
|
||||||
|
a.Stream,
|
||||||
|
a.Group, a.Consumer,
|
||||||
|
int64(a.MinIdle/time.Millisecond))
|
||||||
|
for _, id := range a.Messages {
|
||||||
|
args = append(args, id)
|
||||||
|
}
|
||||||
|
return args
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XTrim(key string, maxLen int64) *IntCmd {
|
||||||
|
cmd := NewIntCmd("xtrim", key, "maxlen", maxLen)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XTrimApprox(key string, maxLen int64) *IntCmd {
|
||||||
|
cmd := NewIntCmd("xtrim", key, "maxlen", "~", maxLen)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
206
commands_test.go
206
commands_test.go
|
@ -3042,37 +3042,54 @@ var _ = Describe("Commands", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
Describe("streams", func() {
|
Describe("streams", func() {
|
||||||
createStream := func() {
|
BeforeEach(func() {
|
||||||
id, err := client.XAdd("stream", "1-0", map[string]interface{}{
|
id, err := client.XAdd(&redis.XAddArgs{
|
||||||
"uno": "un",
|
Stream: "stream",
|
||||||
|
ID: "1-0",
|
||||||
|
Values: map[string]interface{}{"uno": "un"},
|
||||||
}).Result()
|
}).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(id).To(Equal("1-0"))
|
Expect(id).To(Equal("1-0"))
|
||||||
|
|
||||||
id, err = client.XAdd("stream", "2-0", map[string]interface{}{
|
id, err = client.XAdd(&redis.XAddArgs{
|
||||||
"dos": "deux",
|
Stream: "stream",
|
||||||
|
ID: "2-0",
|
||||||
|
Values: map[string]interface{}{"dos": "deux"},
|
||||||
}).Result()
|
}).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(id).To(Equal("2-0"))
|
Expect(id).To(Equal("2-0"))
|
||||||
|
|
||||||
id, err = client.XAdd("stream", "3-0", map[string]interface{}{
|
id, err = client.XAdd(&redis.XAddArgs{
|
||||||
"tres": "troix",
|
Stream: "stream",
|
||||||
|
ID: "3-0",
|
||||||
|
Values: map[string]interface{}{"tres": "troix"},
|
||||||
}).Result()
|
}).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(id).To(Equal("3-0"))
|
Expect(id).To(Equal("3-0"))
|
||||||
}
|
})
|
||||||
|
|
||||||
|
It("should XTrim", func() {
|
||||||
|
n, err := client.XTrim("stream", 0).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(Equal(int64(3)))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XTrimApprox", func() {
|
||||||
|
n, err := client.XTrimApprox("stream", 0).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(Equal(int64(3)))
|
||||||
|
})
|
||||||
|
|
||||||
It("should XAdd", func() {
|
It("should XAdd", func() {
|
||||||
createStream()
|
id, err := client.XAdd(&redis.XAddArgs{
|
||||||
|
Stream: "stream",
|
||||||
id, err := client.XAdd("stream", "*", map[string]interface{}{
|
Values: map[string]interface{}{"quatro": "quatre"},
|
||||||
"quatro": "quatre",
|
|
||||||
}).Result()
|
}).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
vals, err := client.XRange("stream", "-", "+").Result()
|
vals, err := client.XRange("stream", "-", "+").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(vals).To(Equal([]*redis.XMessage{
|
Expect(vals).To(Equal([]redis.XMessage{
|
||||||
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
|
@ -3080,10 +3097,8 @@ var _ = Describe("Commands", func() {
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should XAddExt", func() {
|
It("should XAdd with MaxLen", func() {
|
||||||
createStream()
|
id, err := client.XAdd(&redis.XAddArgs{
|
||||||
|
|
||||||
id, err := client.XAddExt(&redis.XAddExt{
|
|
||||||
Stream: "stream",
|
Stream: "stream",
|
||||||
MaxLen: 1,
|
MaxLen: 1,
|
||||||
Values: map[string]interface{}{"quatro": "quatre"},
|
Values: map[string]interface{}{"quatro": "quatre"},
|
||||||
|
@ -3092,25 +3107,21 @@ var _ = Describe("Commands", func() {
|
||||||
|
|
||||||
vals, err := client.XRange("stream", "-", "+").Result()
|
vals, err := client.XRange("stream", "-", "+").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(vals).To(Equal([]*redis.XMessage{
|
Expect(vals).To(Equal([]redis.XMessage{
|
||||||
{ID: id, Values: map[string]interface{}{"quatro": "quatre"}},
|
{ID: id, Values: map[string]interface{}{"quatro": "quatre"}},
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should XLen", func() {
|
It("should XLen", func() {
|
||||||
createStream()
|
|
||||||
|
|
||||||
n, err := client.XLen("stream").Result()
|
n, err := client.XLen("stream").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(n).To(Equal(int64(3)))
|
Expect(n).To(Equal(int64(3)))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should XRange", func() {
|
It("should XRange", func() {
|
||||||
createStream()
|
|
||||||
|
|
||||||
msgs, err := client.XRange("stream", "-", "+").Result()
|
msgs, err := client.XRange("stream", "-", "+").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(msgs).To(Equal([]*redis.XMessage{
|
Expect(msgs).To(Equal([]redis.XMessage{
|
||||||
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
|
@ -3118,48 +3129,44 @@ var _ = Describe("Commands", func() {
|
||||||
|
|
||||||
msgs, err = client.XRange("stream", "2", "+").Result()
|
msgs, err = client.XRange("stream", "2", "+").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(msgs).To(Equal([]*redis.XMessage{
|
Expect(msgs).To(Equal([]redis.XMessage{
|
||||||
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
msgs, err = client.XRange("stream", "-", "2").Result()
|
msgs, err = client.XRange("stream", "-", "2").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(msgs).To(Equal([]*redis.XMessage{
|
Expect(msgs).To(Equal([]redis.XMessage{
|
||||||
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should XRangeN", func() {
|
It("should XRangeN", func() {
|
||||||
createStream()
|
|
||||||
|
|
||||||
msgs, err := client.XRangeN("stream", "-", "+", 2).Result()
|
msgs, err := client.XRangeN("stream", "-", "+", 2).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(msgs).To(Equal([]*redis.XMessage{
|
Expect(msgs).To(Equal([]redis.XMessage{
|
||||||
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
msgs, err = client.XRangeN("stream", "2", "+", 1).Result()
|
msgs, err = client.XRangeN("stream", "2", "+", 1).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(msgs).To(Equal([]*redis.XMessage{
|
Expect(msgs).To(Equal([]redis.XMessage{
|
||||||
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
msgs, err = client.XRangeN("stream", "-", "2", 1).Result()
|
msgs, err = client.XRangeN("stream", "-", "2", 1).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(msgs).To(Equal([]*redis.XMessage{
|
Expect(msgs).To(Equal([]redis.XMessage{
|
||||||
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should XRevRange", func() {
|
It("should XRevRange", func() {
|
||||||
createStream()
|
|
||||||
|
|
||||||
msgs, err := client.XRevRange("stream", "+", "-").Result()
|
msgs, err := client.XRevRange("stream", "+", "-").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(msgs).To(Equal([]*redis.XMessage{
|
Expect(msgs).To(Equal([]redis.XMessage{
|
||||||
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
|
@ -3167,82 +3174,171 @@ var _ = Describe("Commands", func() {
|
||||||
|
|
||||||
msgs, err = client.XRevRange("stream", "+", "2").Result()
|
msgs, err = client.XRevRange("stream", "+", "2").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(msgs).To(Equal([]*redis.XMessage{
|
Expect(msgs).To(Equal([]redis.XMessage{
|
||||||
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should XRevRangeN", func() {
|
It("should XRevRangeN", func() {
|
||||||
createStream()
|
|
||||||
|
|
||||||
msgs, err := client.XRevRangeN("stream", "+", "-", 2).Result()
|
msgs, err := client.XRevRangeN("stream", "+", "-", 2).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(msgs).To(Equal([]*redis.XMessage{
|
Expect(msgs).To(Equal([]redis.XMessage{
|
||||||
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
msgs, err = client.XRevRangeN("stream", "+", "2", 1).Result()
|
msgs, err = client.XRevRangeN("stream", "+", "2", 1).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(msgs).To(Equal([]*redis.XMessage{
|
Expect(msgs).To(Equal([]redis.XMessage{
|
||||||
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should XRead", func() {
|
It("should XRead", func() {
|
||||||
createStream()
|
res, err := client.XReadStreams("stream", "0").Result()
|
||||||
|
|
||||||
res, err := client.XRead("stream", "0").Result()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(res).To(Equal([]*redis.XStream{{
|
Expect(res).To(Equal([]redis.XStream{{
|
||||||
Stream: "stream",
|
Stream: "stream",
|
||||||
Messages: []*redis.XMessage{
|
Messages: []redis.XMessage{
|
||||||
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
}},
|
}},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
_, err = client.XRead("stream", "3").Result()
|
_, err = client.XReadStreams("stream", "3").Result()
|
||||||
Expect(err).To(Equal(redis.Nil))
|
Expect(err).To(Equal(redis.Nil))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should XReadExt", func() {
|
It("should XRead", func() {
|
||||||
createStream()
|
res, err := client.XRead(&redis.XReadArgs{
|
||||||
|
|
||||||
res, err := client.XReadExt(&redis.XReadExt{
|
|
||||||
Streams: []string{"stream", "0"},
|
Streams: []string{"stream", "0"},
|
||||||
Count: 2,
|
Count: 2,
|
||||||
Block: 100 * time.Millisecond,
|
Block: 100 * time.Millisecond,
|
||||||
}).Result()
|
}).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(res).To(Equal([]*redis.XStream{{
|
Expect(res).To(Equal([]redis.XStream{{
|
||||||
Stream: "stream",
|
Stream: "stream",
|
||||||
Messages: []*redis.XMessage{
|
Messages: []redis.XMessage{
|
||||||
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
}},
|
}},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
_, err = client.XReadExt(&redis.XReadExt{
|
_, err = client.XRead(&redis.XReadArgs{
|
||||||
Streams: []string{"stream", "3"},
|
Streams: []string{"stream", "3"},
|
||||||
Count: 1,
|
Count: 1,
|
||||||
Block: 100 * time.Millisecond,
|
Block: 100 * time.Millisecond,
|
||||||
}).Result()
|
}).Result()
|
||||||
Expect(err).To(Equal(redis.Nil))
|
Expect(err).To(Equal(redis.Nil))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Describe("group", func() {
|
||||||
|
BeforeEach(func() {
|
||||||
|
err := client.XGroupCreate("stream", "group", "0").Err()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
res, err := client.XReadGroup(&redis.XReadGroupArgs{
|
||||||
|
Group: "group",
|
||||||
|
Consumer: "consumer",
|
||||||
|
Streams: []string{"stream", "0"},
|
||||||
|
}).Result()
|
||||||
|
Expect(res).To(Equal([]redis.XStream{{
|
||||||
|
Stream: "stream",
|
||||||
|
Messages: []redis.XMessage{
|
||||||
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
|
}},
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
n, err := client.XGroupDestroy("stream", "group").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(Equal(int64(1)))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XPending", func() {
|
||||||
|
info, err := client.XPending("stream", "group").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(info).To(Equal(&redis.XPending{
|
||||||
|
Count: 3,
|
||||||
|
Lower: "1-0",
|
||||||
|
Higher: "3-0",
|
||||||
|
Consumers: map[string]int64{"consumer": 3},
|
||||||
|
}))
|
||||||
|
|
||||||
|
infoExt, err := client.XPendingExt(&redis.XPendingExtArgs{
|
||||||
|
Stream: "stream",
|
||||||
|
Group: "group",
|
||||||
|
Start: "-",
|
||||||
|
End: "+",
|
||||||
|
Count: 10,
|
||||||
|
Consumer: "consumer",
|
||||||
|
}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
for i := range infoExt {
|
||||||
|
infoExt[i].Idle = 0
|
||||||
|
}
|
||||||
|
Expect(infoExt).To(Equal([]redis.XPendingExt{
|
||||||
|
{Id: "1-0", Consumer: "consumer", Idle: 0, RetryCount: 1},
|
||||||
|
{Id: "2-0", Consumer: "consumer", Idle: 0, RetryCount: 1},
|
||||||
|
{Id: "3-0", Consumer: "consumer", Idle: 0, RetryCount: 1},
|
||||||
|
}))
|
||||||
|
|
||||||
|
n, err := client.XGroupDelConsumer("stream", "group", "consumer").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(Equal(int64(3)))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XClaim", func() {
|
||||||
|
msgs, err := client.XClaim(&redis.XClaimArgs{
|
||||||
|
Stream: "stream",
|
||||||
|
Group: "group",
|
||||||
|
Consumer: "consumer",
|
||||||
|
Messages: []string{"1-0", "2-0", "3-0"},
|
||||||
|
}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(msgs).To(Equal([]redis.XMessage{{
|
||||||
|
ID: "1-0",
|
||||||
|
Values: map[string]interface{}{"uno": "un"},
|
||||||
|
}, {
|
||||||
|
ID: "2-0",
|
||||||
|
Values: map[string]interface{}{"dos": "deux"},
|
||||||
|
}, {
|
||||||
|
ID: "3-0",
|
||||||
|
Values: map[string]interface{}{"tres": "troix"},
|
||||||
|
}}))
|
||||||
|
|
||||||
|
ids, err := client.XClaimJustID(&redis.XClaimArgs{
|
||||||
|
Stream: "stream",
|
||||||
|
Group: "group",
|
||||||
|
Consumer: "consumer",
|
||||||
|
Messages: []string{"1-0", "2-0", "3-0"},
|
||||||
|
}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(ids).To(Equal([]string{"1-0", "2-0", "3-0"}))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XAck", func() {
|
||||||
|
n, err := client.XAck("stream", "group", "1-0", "2-0", "4-0").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(Equal(int64(2)))
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
Describe("Geo add and radius search", func() {
|
Describe("Geo add and radius search", func() {
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
geoAdd := client.GeoAdd(
|
n, err := client.GeoAdd(
|
||||||
"Sicily",
|
"Sicily",
|
||||||
&redis.GeoLocation{Longitude: 13.361389, Latitude: 38.115556, Name: "Palermo"},
|
&redis.GeoLocation{Longitude: 13.361389, Latitude: 38.115556, Name: "Palermo"},
|
||||||
&redis.GeoLocation{Longitude: 15.087269, Latitude: 37.502669, Name: "Catania"},
|
&redis.GeoLocation{Longitude: 15.087269, Latitude: 37.502669, Name: "Catania"},
|
||||||
)
|
).Result()
|
||||||
Expect(geoAdd.Err()).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(geoAdd.Val()).To(Equal(int64(2)))
|
Expect(n).To(Equal(int64(2)))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should not add same geo location", func() {
|
It("should not add same geo location", func() {
|
||||||
|
|
394
parser.go
394
parser.go
|
@ -1,394 +0,0 @@
|
||||||
package redis
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"strconv"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/go-redis/redis/internal/proto"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Implements proto.MultiBulkParse
|
|
||||||
func sliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
vals := make([]interface{}, 0, n)
|
|
||||||
for i := int64(0); i < n; i++ {
|
|
||||||
v, err := rd.ReadReply(sliceParser)
|
|
||||||
if err != nil {
|
|
||||||
if err == Nil {
|
|
||||||
vals = append(vals, nil)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err, ok := err.(proto.RedisError); ok {
|
|
||||||
vals = append(vals, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch v := v.(type) {
|
|
||||||
case []byte:
|
|
||||||
vals = append(vals, string(v))
|
|
||||||
default:
|
|
||||||
vals = append(vals, v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return vals, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements proto.MultiBulkParse
|
|
||||||
func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
bools := make([]bool, 0, n)
|
|
||||||
for i := int64(0); i < n; i++ {
|
|
||||||
n, err := rd.ReadIntReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
bools = append(bools, n == 1)
|
|
||||||
}
|
|
||||||
return bools, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements proto.MultiBulkParse
|
|
||||||
func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
ss := make([]string, 0, n)
|
|
||||||
for i := int64(0); i < n; i++ {
|
|
||||||
s, err := rd.ReadStringReply()
|
|
||||||
if err == Nil {
|
|
||||||
ss = append(ss, "")
|
|
||||||
} else if err != nil {
|
|
||||||
return nil, err
|
|
||||||
} else {
|
|
||||||
ss = append(ss, s)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ss, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements proto.MultiBulkParse
|
|
||||||
func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
m := make(map[string]string, n/2)
|
|
||||||
for i := int64(0); i < n; i += 2 {
|
|
||||||
key, err := rd.ReadStringReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
value, err := rd.ReadStringReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
m[key] = value
|
|
||||||
}
|
|
||||||
return m, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements proto.MultiBulkParse
|
|
||||||
func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
m := make(map[string]int64, n/2)
|
|
||||||
for i := int64(0); i < n; i += 2 {
|
|
||||||
key, err := rd.ReadStringReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err := rd.ReadIntReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
m[key] = n
|
|
||||||
}
|
|
||||||
return m, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements proto.MultiBulkParse
|
|
||||||
func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
m := make(map[string]struct{}, n)
|
|
||||||
for i := int64(0); i < n; i++ {
|
|
||||||
key, err := rd.ReadStringReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
m[key] = struct{}{}
|
|
||||||
}
|
|
||||||
return m, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements proto.MultiBulkParse
|
|
||||||
func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
zz := make([]Z, n/2)
|
|
||||||
for i := int64(0); i < n; i += 2 {
|
|
||||||
var err error
|
|
||||||
|
|
||||||
z := &zz[i/2]
|
|
||||||
|
|
||||||
z.Member, err = rd.ReadStringReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
z.Score, err = rd.ReadFloatReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return zz, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements proto.MultiBulkParse
|
|
||||||
func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
slots := make([]ClusterSlot, n)
|
|
||||||
for i := 0; i < len(slots); i++ {
|
|
||||||
n, err := rd.ReadArrayLen()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if n < 2 {
|
|
||||||
err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
start, err := rd.ReadIntReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
end, err := rd.ReadIntReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
nodes := make([]ClusterNode, n-2)
|
|
||||||
for j := 0; j < len(nodes); j++ {
|
|
||||||
n, err := rd.ReadArrayLen()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if n != 2 && n != 3 {
|
|
||||||
err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
ip, err := rd.ReadStringReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
port, err := rd.ReadIntReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
nodes[j].Addr = net.JoinHostPort(ip, strconv.FormatInt(port, 10))
|
|
||||||
|
|
||||||
if n == 3 {
|
|
||||||
id, err := rd.ReadStringReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
nodes[j].Id = id
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
slots[i] = ClusterSlot{
|
|
||||||
Start: int(start),
|
|
||||||
End: int(end),
|
|
||||||
Nodes: nodes,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return slots, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse {
|
|
||||||
return func(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
var loc GeoLocation
|
|
||||||
var err error
|
|
||||||
|
|
||||||
loc.Name, err = rd.ReadStringReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if q.WithDist {
|
|
||||||
loc.Dist, err = rd.ReadFloatReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if q.WithGeoHash {
|
|
||||||
loc.GeoHash, err = rd.ReadIntReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if q.WithCoord {
|
|
||||||
n, err := rd.ReadArrayLen()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if n != 2 {
|
|
||||||
return nil, fmt.Errorf("got %d coordinates, expected 2", n)
|
|
||||||
}
|
|
||||||
|
|
||||||
loc.Longitude, err = rd.ReadFloatReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
loc.Latitude, err = rd.ReadFloatReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &loc, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse {
|
|
||||||
return func(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
locs := make([]GeoLocation, 0, n)
|
|
||||||
for i := int64(0); i < n; i++ {
|
|
||||||
v, err := rd.ReadReply(newGeoLocationParser(q))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
switch vv := v.(type) {
|
|
||||||
case []byte:
|
|
||||||
locs = append(locs, GeoLocation{
|
|
||||||
Name: string(vv),
|
|
||||||
})
|
|
||||||
case *GeoLocation:
|
|
||||||
locs = append(locs, *vv)
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return locs, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
var pos GeoPos
|
|
||||||
var err error
|
|
||||||
|
|
||||||
pos.Longitude, err = rd.ReadFloatReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
pos.Latitude, err = rd.ReadFloatReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &pos, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
positions := make([]*GeoPos, 0, n)
|
|
||||||
for i := int64(0); i < n; i++ {
|
|
||||||
v, err := rd.ReadReply(geoPosParser)
|
|
||||||
if err != nil {
|
|
||||||
if err == Nil {
|
|
||||||
positions = append(positions, nil)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
switch v := v.(type) {
|
|
||||||
case *GeoPos:
|
|
||||||
positions = append(positions, v)
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("got %T, expected *GeoPos", v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return positions, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
var cmd CommandInfo
|
|
||||||
var err error
|
|
||||||
|
|
||||||
if n != 6 {
|
|
||||||
return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n)
|
|
||||||
}
|
|
||||||
|
|
||||||
cmd.Name, err = rd.ReadStringReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
arity, err := rd.ReadIntReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
cmd.Arity = int8(arity)
|
|
||||||
|
|
||||||
flags, err := rd.ReadReply(stringSliceParser)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
cmd.Flags = flags.([]string)
|
|
||||||
|
|
||||||
firstKeyPos, err := rd.ReadIntReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
cmd.FirstKeyPos = int8(firstKeyPos)
|
|
||||||
|
|
||||||
lastKeyPos, err := rd.ReadIntReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
cmd.LastKeyPos = int8(lastKeyPos)
|
|
||||||
|
|
||||||
stepCount, err := rd.ReadIntReply()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
cmd.StepCount = int8(stepCount)
|
|
||||||
|
|
||||||
for _, flag := range cmd.Flags {
|
|
||||||
if flag == "readonly" {
|
|
||||||
cmd.ReadOnly = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &cmd, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements proto.MultiBulkParse
|
|
||||||
func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
m := make(map[string]*CommandInfo, n)
|
|
||||||
for i := int64(0); i < n; i++ {
|
|
||||||
v, err := rd.ReadReply(commandInfoParser)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
vv := v.(*CommandInfo)
|
|
||||||
m[vv.Name] = vv
|
|
||||||
|
|
||||||
}
|
|
||||||
return m, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements proto.MultiBulkParse
|
|
||||||
func timeParser(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
if n != 2 {
|
|
||||||
return nil, fmt.Errorf("got %d elements, expected 2", n)
|
|
||||||
}
|
|
||||||
|
|
||||||
sec, err := rd.ReadInt()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
microsec, err := rd.ReadInt()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return time.Unix(sec, microsec*1000), nil
|
|
||||||
}
|
|
Loading…
Reference in New Issue