mirror of https://github.com/go-redis/redis.git
Merge pull request #795 from go-redis/feature/streams
Add basic redis streams support
This commit is contained in:
commit
8a90ef696e
173
command.go
173
command.go
|
@ -714,6 +714,179 @@ func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error {
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type XStream struct {
|
||||||
|
Stream string
|
||||||
|
Messages []*XMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
type XMessage struct {
|
||||||
|
ID string
|
||||||
|
Values map[string]interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type XStreamSliceCmd struct {
|
||||||
|
baseCmd
|
||||||
|
|
||||||
|
val []*XStream
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Cmder = (*XStreamSliceCmd)(nil)
|
||||||
|
|
||||||
|
func NewXStreamSliceCmd(args ...interface{}) *XStreamSliceCmd {
|
||||||
|
return &XStreamSliceCmd{
|
||||||
|
baseCmd: baseCmd{_args: args},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XStreamSliceCmd) Val() []*XStream {
|
||||||
|
return cmd.val
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XStreamSliceCmd) Result() ([]*XStream, error) {
|
||||||
|
return cmd.val, cmd.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XStreamSliceCmd) String() string {
|
||||||
|
return cmdString(cmd, cmd.val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error {
|
||||||
|
var v interface{}
|
||||||
|
v, cmd.err = cn.Rd.ReadArrayReply(xStreamSliceParser)
|
||||||
|
if cmd.err != nil {
|
||||||
|
return cmd.err
|
||||||
|
}
|
||||||
|
cmd.val = v.([]*XStream)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
xx := make([]*XStream, n)
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
v, err := rd.ReadArrayReply(xStreamParser)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
xx[i] = v.(*XStream)
|
||||||
|
}
|
||||||
|
return xx, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func xStreamParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
if n != 2 {
|
||||||
|
return nil, fmt.Errorf("got %d, wanted 2", n)
|
||||||
|
}
|
||||||
|
|
||||||
|
stream, err := rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
v, err := rd.ReadArrayReply(xMessageSliceParser)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &XStream{
|
||||||
|
Stream: stream,
|
||||||
|
Messages: v.([]*XMessage),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type XMessageSliceCmd struct {
|
||||||
|
baseCmd
|
||||||
|
|
||||||
|
val []*XMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Cmder = (*XMessageSliceCmd)(nil)
|
||||||
|
|
||||||
|
func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd {
|
||||||
|
return &XMessageSliceCmd{
|
||||||
|
baseCmd: baseCmd{_args: args},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XMessageSliceCmd) Val() []*XMessage {
|
||||||
|
return cmd.val
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XMessageSliceCmd) Result() ([]*XMessage, error) {
|
||||||
|
return cmd.val, cmd.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XMessageSliceCmd) String() string {
|
||||||
|
return cmdString(cmd, cmd.val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error {
|
||||||
|
var v interface{}
|
||||||
|
v, cmd.err = cn.Rd.ReadArrayReply(xMessageSliceParser)
|
||||||
|
if cmd.err != nil {
|
||||||
|
return cmd.err
|
||||||
|
}
|
||||||
|
cmd.val = v.([]*XMessage)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
msgs := make([]*XMessage, n)
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
v, err := rd.ReadArrayReply(xMessageParser)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
msgs[i] = v.(*XMessage)
|
||||||
|
}
|
||||||
|
return msgs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func xMessageParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
id, err := rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
v, err := rd.ReadArrayReply(xKeyValueParser)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &XMessage{
|
||||||
|
ID: id,
|
||||||
|
Values: v.(map[string]interface{}),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func xKeyValueParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
values := make(map[string]interface{}, n)
|
||||||
|
for i := int64(0); i < n; i += 2 {
|
||||||
|
key, err := rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
value, err := rd.ReadStringReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
values[key] = value
|
||||||
|
}
|
||||||
|
return values, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type ZSliceCmd struct {
|
type ZSliceCmd struct {
|
||||||
baseCmd
|
baseCmd
|
||||||
|
|
||||||
|
|
131
commands.go
131
commands.go
|
@ -171,6 +171,16 @@ type Cmdable interface {
|
||||||
SRem(key string, members ...interface{}) *IntCmd
|
SRem(key string, members ...interface{}) *IntCmd
|
||||||
SUnion(keys ...string) *StringSliceCmd
|
SUnion(keys ...string) *StringSliceCmd
|
||||||
SUnionStore(destination string, keys ...string) *IntCmd
|
SUnionStore(destination string, keys ...string) *IntCmd
|
||||||
|
XAdd(stream, id string, els map[string]interface{}) *StringCmd
|
||||||
|
XAddExt(opt *XAddExt) *StringCmd
|
||||||
|
XLen(key string) *IntCmd
|
||||||
|
XRange(stream, start, stop string) *XMessageSliceCmd
|
||||||
|
XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd
|
||||||
|
XRevRange(stream string, start, stop string) *XMessageSliceCmd
|
||||||
|
XRevRangeN(stream string, start, stop string, count int64) *XMessageSliceCmd
|
||||||
|
XRead(streams ...string) *XStreamSliceCmd
|
||||||
|
XReadN(count int64, streams ...string) *XStreamSliceCmd
|
||||||
|
XReadExt(opt *XReadExt) *XStreamSliceCmd
|
||||||
ZAdd(key string, members ...Z) *IntCmd
|
ZAdd(key string, members ...Z) *IntCmd
|
||||||
ZAddNX(key string, members ...Z) *IntCmd
|
ZAddNX(key string, members ...Z) *IntCmd
|
||||||
ZAddXX(key string, members ...Z) *IntCmd
|
ZAddXX(key string, members ...Z) *IntCmd
|
||||||
|
@ -1282,6 +1292,127 @@ func (c *cmdable) SUnionStore(destination string, keys ...string) *IntCmd {
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type XAddExt struct {
|
||||||
|
Stream string
|
||||||
|
MaxLen int64 // MAXLEN N
|
||||||
|
MaxLenApprox int64 // MAXLEN ~ N
|
||||||
|
ID string
|
||||||
|
Values map[string]interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XAddExt(opt *XAddExt) *StringCmd {
|
||||||
|
a := make([]interface{}, 0, 6+len(opt.Values)*2)
|
||||||
|
a = append(a, "xadd")
|
||||||
|
a = append(a, opt.Stream)
|
||||||
|
if opt.MaxLen > 0 {
|
||||||
|
a = append(a, "maxlen", opt.MaxLen)
|
||||||
|
} else if opt.MaxLenApprox > 0 {
|
||||||
|
a = append(a, "maxlen", "~", opt.MaxLenApprox)
|
||||||
|
}
|
||||||
|
if opt.ID != "" {
|
||||||
|
a = append(a, opt.ID)
|
||||||
|
} else {
|
||||||
|
a = append(a, "*")
|
||||||
|
}
|
||||||
|
for k, v := range opt.Values {
|
||||||
|
a = append(a, k)
|
||||||
|
a = append(a, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd := NewStringCmd(a...)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XAdd(stream, id string, values map[string]interface{}) *StringCmd {
|
||||||
|
return c.XAddExt(&XAddExt{
|
||||||
|
Stream: stream,
|
||||||
|
ID: id,
|
||||||
|
Values: values,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XLen(key string) *IntCmd {
|
||||||
|
cmd := NewIntCmd("xlen", key)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XRange(stream, start, stop string) *XMessageSliceCmd {
|
||||||
|
cmd := NewXMessageSliceCmd("xrange", stream, start, stop)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd {
|
||||||
|
cmd := NewXMessageSliceCmd("xrange", stream, start, stop, "count", count)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XRevRange(stream, start, stop string) *XMessageSliceCmd {
|
||||||
|
cmd := NewXMessageSliceCmd("xrevrange", stream, start, stop)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XRevRangeN(stream, start, stop string, count int64) *XMessageSliceCmd {
|
||||||
|
cmd := NewXMessageSliceCmd("xrevrange", stream, start, stop, "count", count)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
type XReadExt struct {
|
||||||
|
Streams []string
|
||||||
|
Count int64
|
||||||
|
Block time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XReadExt(opt *XReadExt) *XStreamSliceCmd {
|
||||||
|
a := make([]interface{}, 0, 5+len(opt.Streams))
|
||||||
|
a = append(a, "xread")
|
||||||
|
if opt != nil {
|
||||||
|
if opt.Count > 0 {
|
||||||
|
a = append(a, "count")
|
||||||
|
a = append(a, opt.Count)
|
||||||
|
}
|
||||||
|
if opt.Block > 0 {
|
||||||
|
a = append(a, "block")
|
||||||
|
a = append(a, int64(opt.Block/time.Millisecond))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
a = append(a, "streams")
|
||||||
|
for _, s := range opt.Streams {
|
||||||
|
a = append(a, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd := NewXStreamSliceCmd(a...)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XRead(streams ...string) *XStreamSliceCmd {
|
||||||
|
return c.XReadExt(&XReadExt{
|
||||||
|
Streams: streams,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XReadN(count int64, streams ...string) *XStreamSliceCmd {
|
||||||
|
return c.XReadExt(&XReadExt{
|
||||||
|
Streams: streams,
|
||||||
|
Count: count,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmdable) XReadBlock(block time.Duration, streams ...string) *XStreamSliceCmd {
|
||||||
|
return c.XReadExt(&XReadExt{
|
||||||
|
Streams: streams,
|
||||||
|
Block: block,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
// Z represents sorted set member.
|
// Z represents sorted set member.
|
||||||
type Z struct {
|
type Z struct {
|
||||||
Score float64
|
Score float64
|
||||||
|
|
193
commands_test.go
193
commands_test.go
|
@ -3018,6 +3018,199 @@ var _ = Describe("Commands", func() {
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Describe("streams", func() {
|
||||||
|
createStream := func() {
|
||||||
|
id, err := client.XAdd("stream", "1-0", map[string]interface{}{
|
||||||
|
"uno": "un",
|
||||||
|
}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(id).To(Equal("1-0"))
|
||||||
|
|
||||||
|
id, err = client.XAdd("stream", "2-0", map[string]interface{}{
|
||||||
|
"dos": "deux",
|
||||||
|
}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(id).To(Equal("2-0"))
|
||||||
|
|
||||||
|
id, err = client.XAdd("stream", "3-0", map[string]interface{}{
|
||||||
|
"tres": "troix",
|
||||||
|
}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(id).To(Equal("3-0"))
|
||||||
|
}
|
||||||
|
|
||||||
|
It("should XAdd", func() {
|
||||||
|
createStream()
|
||||||
|
|
||||||
|
id, err := client.XAdd("stream", "*", map[string]interface{}{
|
||||||
|
"quatro": "quatre",
|
||||||
|
}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
vals, err := client.XRange("stream", "-", "+").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(vals).To(Equal([]*redis.XMessage{
|
||||||
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
|
{ID: id, Values: map[string]interface{}{"quatro": "quatre"}},
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XAddExt", func() {
|
||||||
|
createStream()
|
||||||
|
|
||||||
|
id, err := client.XAddExt(&redis.XAddExt{
|
||||||
|
Stream: "stream",
|
||||||
|
MaxLen: 1,
|
||||||
|
Values: map[string]interface{}{"quatro": "quatre"},
|
||||||
|
}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
vals, err := client.XRange("stream", "-", "+").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(vals).To(Equal([]*redis.XMessage{
|
||||||
|
{ID: id, Values: map[string]interface{}{"quatro": "quatre"}},
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XLen", func() {
|
||||||
|
createStream()
|
||||||
|
|
||||||
|
n, err := client.XLen("stream").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(Equal(int64(3)))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XRange", func() {
|
||||||
|
createStream()
|
||||||
|
|
||||||
|
msgs, err := client.XRange("stream", "-", "+").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(msgs).To(Equal([]*redis.XMessage{
|
||||||
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
|
}))
|
||||||
|
|
||||||
|
msgs, err = client.XRange("stream", "2", "+").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(msgs).To(Equal([]*redis.XMessage{
|
||||||
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
|
}))
|
||||||
|
|
||||||
|
msgs, err = client.XRange("stream", "-", "2").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(msgs).To(Equal([]*redis.XMessage{
|
||||||
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XRangeN", func() {
|
||||||
|
createStream()
|
||||||
|
|
||||||
|
msgs, err := client.XRangeN("stream", "-", "+", 2).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(msgs).To(Equal([]*redis.XMessage{
|
||||||
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
|
}))
|
||||||
|
|
||||||
|
msgs, err = client.XRangeN("stream", "2", "+", 1).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(msgs).To(Equal([]*redis.XMessage{
|
||||||
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
|
}))
|
||||||
|
|
||||||
|
msgs, err = client.XRangeN("stream", "-", "2", 1).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(msgs).To(Equal([]*redis.XMessage{
|
||||||
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XRevRange", func() {
|
||||||
|
createStream()
|
||||||
|
|
||||||
|
msgs, err := client.XRevRange("stream", "+", "-").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(msgs).To(Equal([]*redis.XMessage{
|
||||||
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
|
}))
|
||||||
|
|
||||||
|
msgs, err = client.XRevRange("stream", "+", "2").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(msgs).To(Equal([]*redis.XMessage{
|
||||||
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XRevRangeN", func() {
|
||||||
|
createStream()
|
||||||
|
|
||||||
|
msgs, err := client.XRevRangeN("stream", "+", "-", 2).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(msgs).To(Equal([]*redis.XMessage{
|
||||||
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
|
}))
|
||||||
|
|
||||||
|
msgs, err = client.XRevRangeN("stream", "+", "2", 1).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(msgs).To(Equal([]*redis.XMessage{
|
||||||
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XRead", func() {
|
||||||
|
createStream()
|
||||||
|
|
||||||
|
res, err := client.XRead("stream", "0").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(res).To(Equal([]*redis.XStream{{
|
||||||
|
Stream: "stream",
|
||||||
|
Messages: []*redis.XMessage{
|
||||||
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
|
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||||
|
}},
|
||||||
|
}))
|
||||||
|
|
||||||
|
_, err = client.XRead("stream", "3").Result()
|
||||||
|
Expect(err).To(Equal(redis.Nil))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XReadExt", func() {
|
||||||
|
createStream()
|
||||||
|
|
||||||
|
res, err := client.XReadExt(&redis.XReadExt{
|
||||||
|
Streams: []string{"stream", "0"},
|
||||||
|
Count: 2,
|
||||||
|
Block: 100 * time.Millisecond,
|
||||||
|
}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(res).To(Equal([]*redis.XStream{{
|
||||||
|
Stream: "stream",
|
||||||
|
Messages: []*redis.XMessage{
|
||||||
|
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||||
|
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||||
|
}},
|
||||||
|
}))
|
||||||
|
|
||||||
|
_, err = client.XReadExt(&redis.XReadExt{
|
||||||
|
Streams: []string{"stream", "3"},
|
||||||
|
Count: 1,
|
||||||
|
Block: 100 * time.Millisecond,
|
||||||
|
}).Result()
|
||||||
|
Expect(err).To(Equal(redis.Nil))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
Describe("Geo add and radius search", func() {
|
Describe("Geo add and radius search", func() {
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
geoAdd := client.GeoAdd(
|
geoAdd := client.GeoAdd(
|
||||||
|
|
Loading…
Reference in New Issue