diff --git a/command.go b/command.go index 552c897b..11472bec 100644 --- a/command.go +++ b/command.go @@ -714,6 +714,179 @@ func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error { //------------------------------------------------------------------------------ +type XStream struct { + Stream string + Messages []*XMessage +} + +type XMessage struct { + ID string + Values map[string]interface{} +} + +//------------------------------------------------------------------------------ + +type XStreamSliceCmd struct { + baseCmd + + val []*XStream +} + +var _ Cmder = (*XStreamSliceCmd)(nil) + +func NewXStreamSliceCmd(args ...interface{}) *XStreamSliceCmd { + return &XStreamSliceCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XStreamSliceCmd) Val() []*XStream { + return cmd.val +} + +func (cmd *XStreamSliceCmd) Result() ([]*XStream, error) { + return cmd.val, cmd.err +} + +func (cmd *XStreamSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error { + var v interface{} + v, cmd.err = cn.Rd.ReadArrayReply(xStreamSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]*XStream) + return nil +} + +// Implements proto.MultiBulkParse +func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + xx := make([]*XStream, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadArrayReply(xStreamParser) + if err != nil { + return nil, err + } + xx[i] = v.(*XStream) + } + return xx, nil +} + +// Implements proto.MultiBulkParse +func xStreamParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d, wanted 2", n) + } + + stream, err := rd.ReadStringReply() + if err != nil { + return nil, err + } + + v, err := rd.ReadArrayReply(xMessageSliceParser) + if err != nil { + return nil, err + } + + return &XStream{ + Stream: stream, + Messages: v.([]*XMessage), + }, nil +} + +//------------------------------------------------------------------------------ + +type XMessageSliceCmd struct { + baseCmd + + val []*XMessage +} + +var _ Cmder = (*XMessageSliceCmd)(nil) + +func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd { + return &XMessageSliceCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XMessageSliceCmd) Val() []*XMessage { + return cmd.val +} + +func (cmd *XMessageSliceCmd) Result() ([]*XMessage, error) { + return cmd.val, cmd.err +} + +func (cmd *XMessageSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error { + var v interface{} + v, cmd.err = cn.Rd.ReadArrayReply(xMessageSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]*XMessage) + return nil +} + +// Implements proto.MultiBulkParse +func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + msgs := make([]*XMessage, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadArrayReply(xMessageParser) + if err != nil { + return nil, err + } + msgs[i] = v.(*XMessage) + } + return msgs, nil +} + +// Implements proto.MultiBulkParse +func xMessageParser(rd *proto.Reader, n int64) (interface{}, error) { + id, err := rd.ReadStringReply() + if err != nil { + return nil, err + } + + v, err := rd.ReadArrayReply(xKeyValueParser) + if err != nil { + return nil, err + } + + return &XMessage{ + ID: id, + Values: v.(map[string]interface{}), + }, nil +} + +// Implements proto.MultiBulkParse +func xKeyValueParser(rd *proto.Reader, n int64) (interface{}, error) { + values := make(map[string]interface{}, n) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadStringReply() + if err != nil { + return nil, err + } + + value, err := rd.ReadStringReply() + if err != nil { + return nil, err + } + + values[key] = value + } + return values, nil +} + +//------------------------------------------------------------------------------ + type ZSliceCmd struct { baseCmd diff --git a/commands.go b/commands.go index c6a88154..1debee1e 100644 --- a/commands.go +++ b/commands.go @@ -171,6 +171,16 @@ type Cmdable interface { SRem(key string, members ...interface{}) *IntCmd SUnion(keys ...string) *StringSliceCmd SUnionStore(destination string, keys ...string) *IntCmd + XAdd(stream, id string, els map[string]interface{}) *StringCmd + XAddExt(opt *XAddExt) *StringCmd + XLen(key string) *IntCmd + XRange(stream, start, stop string) *XMessageSliceCmd + XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd + XRevRange(stream string, start, stop string) *XMessageSliceCmd + XRevRangeN(stream string, start, stop string, count int64) *XMessageSliceCmd + XRead(streams ...string) *XStreamSliceCmd + XReadN(count int64, streams ...string) *XStreamSliceCmd + XReadExt(opt *XReadExt) *XStreamSliceCmd ZAdd(key string, members ...Z) *IntCmd ZAddNX(key string, members ...Z) *IntCmd ZAddXX(key string, members ...Z) *IntCmd @@ -1282,6 +1292,127 @@ func (c *cmdable) SUnionStore(destination string, keys ...string) *IntCmd { //------------------------------------------------------------------------------ +type XAddExt struct { + Stream string + MaxLen int64 // MAXLEN N + MaxLenApprox int64 // MAXLEN ~ N + ID string + Values map[string]interface{} +} + +func (c *cmdable) XAddExt(opt *XAddExt) *StringCmd { + a := make([]interface{}, 0, 6+len(opt.Values)*2) + a = append(a, "xadd") + a = append(a, opt.Stream) + if opt.MaxLen > 0 { + a = append(a, "maxlen", opt.MaxLen) + } else if opt.MaxLenApprox > 0 { + a = append(a, "maxlen", "~", opt.MaxLenApprox) + } + if opt.ID != "" { + a = append(a, opt.ID) + } else { + a = append(a, "*") + } + for k, v := range opt.Values { + a = append(a, k) + a = append(a, v) + } + + cmd := NewStringCmd(a...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XAdd(stream, id string, values map[string]interface{}) *StringCmd { + return c.XAddExt(&XAddExt{ + Stream: stream, + ID: id, + Values: values, + }) +} + +func (c *cmdable) XLen(key string) *IntCmd { + cmd := NewIntCmd("xlen", key) + c.process(cmd) + return cmd +} + +func (c *cmdable) XRange(stream, start, stop string) *XMessageSliceCmd { + cmd := NewXMessageSliceCmd("xrange", stream, start, stop) + c.process(cmd) + return cmd +} + +func (c *cmdable) XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd { + cmd := NewXMessageSliceCmd("xrange", stream, start, stop, "count", count) + c.process(cmd) + return cmd +} + +func (c *cmdable) XRevRange(stream, start, stop string) *XMessageSliceCmd { + cmd := NewXMessageSliceCmd("xrevrange", stream, start, stop) + c.process(cmd) + return cmd +} + +func (c *cmdable) XRevRangeN(stream, start, stop string, count int64) *XMessageSliceCmd { + cmd := NewXMessageSliceCmd("xrevrange", stream, start, stop, "count", count) + c.process(cmd) + return cmd +} + +type XReadExt struct { + Streams []string + Count int64 + Block time.Duration +} + +func (c *cmdable) XReadExt(opt *XReadExt) *XStreamSliceCmd { + a := make([]interface{}, 0, 5+len(opt.Streams)) + a = append(a, "xread") + if opt != nil { + if opt.Count > 0 { + a = append(a, "count") + a = append(a, opt.Count) + } + if opt.Block > 0 { + a = append(a, "block") + a = append(a, int64(opt.Block/time.Millisecond)) + } + } + a = append(a, "streams") + for _, s := range opt.Streams { + a = append(a, s) + } + + cmd := NewXStreamSliceCmd(a...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XRead(streams ...string) *XStreamSliceCmd { + return c.XReadExt(&XReadExt{ + Streams: streams, + }) +} + +func (c *cmdable) XReadN(count int64, streams ...string) *XStreamSliceCmd { + return c.XReadExt(&XReadExt{ + Streams: streams, + Count: count, + }) +} + +func (c *cmdable) XReadBlock(block time.Duration, streams ...string) *XStreamSliceCmd { + return c.XReadExt(&XReadExt{ + Streams: streams, + Block: block, + }) +} + +//------------------------------------------------------------------------------ + // Z represents sorted set member. type Z struct { Score float64 diff --git a/commands_test.go b/commands_test.go index f4f794f5..896aee90 100644 --- a/commands_test.go +++ b/commands_test.go @@ -3018,6 +3018,199 @@ var _ = Describe("Commands", func() { }) + Describe("streams", func() { + createStream := func() { + id, err := client.XAdd("stream", "1-0", map[string]interface{}{ + "uno": "un", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(Equal("1-0")) + + id, err = client.XAdd("stream", "2-0", map[string]interface{}{ + "dos": "deux", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(Equal("2-0")) + + id, err = client.XAdd("stream", "3-0", map[string]interface{}{ + "tres": "troix", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(Equal("3-0")) + } + + It("should XAdd", func() { + createStream() + + id, err := client.XAdd("stream", "*", map[string]interface{}{ + "quatro": "quatre", + }).Result() + Expect(err).NotTo(HaveOccurred()) + + vals, err := client.XRange("stream", "-", "+").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]*redis.XMessage{ + {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + {ID: id, Values: map[string]interface{}{"quatro": "quatre"}}, + })) + }) + + It("should XAddExt", func() { + createStream() + + id, err := client.XAddExt(&redis.XAddExt{ + Stream: "stream", + MaxLen: 1, + Values: map[string]interface{}{"quatro": "quatre"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + vals, err := client.XRange("stream", "-", "+").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]*redis.XMessage{ + {ID: id, Values: map[string]interface{}{"quatro": "quatre"}}, + })) + }) + + It("should XLen", func() { + createStream() + + n, err := client.XLen("stream").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(3))) + }) + + It("should XRange", func() { + createStream() + + msgs, err := client.XRange("stream", "-", "+").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(msgs).To(Equal([]*redis.XMessage{ + {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + })) + + msgs, err = client.XRange("stream", "2", "+").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(msgs).To(Equal([]*redis.XMessage{ + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + })) + + msgs, err = client.XRange("stream", "-", "2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(msgs).To(Equal([]*redis.XMessage{ + {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + })) + }) + + It("should XRangeN", func() { + createStream() + + msgs, err := client.XRangeN("stream", "-", "+", 2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(msgs).To(Equal([]*redis.XMessage{ + {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + })) + + msgs, err = client.XRangeN("stream", "2", "+", 1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(msgs).To(Equal([]*redis.XMessage{ + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + })) + + msgs, err = client.XRangeN("stream", "-", "2", 1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(msgs).To(Equal([]*redis.XMessage{ + {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, + })) + }) + + It("should XRevRange", func() { + createStream() + + msgs, err := client.XRevRange("stream", "+", "-").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(msgs).To(Equal([]*redis.XMessage{ + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, + })) + + msgs, err = client.XRevRange("stream", "+", "2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(msgs).To(Equal([]*redis.XMessage{ + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + })) + }) + + It("should XRevRangeN", func() { + createStream() + + msgs, err := client.XRevRangeN("stream", "+", "-", 2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(msgs).To(Equal([]*redis.XMessage{ + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + })) + + msgs, err = client.XRevRangeN("stream", "+", "2", 1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(msgs).To(Equal([]*redis.XMessage{ + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + })) + }) + + It("should XRead", func() { + createStream() + + res, err := client.XRead("stream", "0").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]*redis.XStream{{ + Stream: "stream", + Messages: []*redis.XMessage{ + {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + {ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, + }}, + })) + + _, err = client.XRead("stream", "3").Result() + Expect(err).To(Equal(redis.Nil)) + }) + + It("should XReadExt", func() { + createStream() + + res, err := client.XReadExt(&redis.XReadExt{ + Streams: []string{"stream", "0"}, + Count: 2, + Block: 100 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]*redis.XStream{{ + Stream: "stream", + Messages: []*redis.XMessage{ + {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + }}, + })) + + _, err = client.XReadExt(&redis.XReadExt{ + Streams: []string{"stream", "3"}, + Count: 1, + Block: 100 * time.Millisecond, + }).Result() + Expect(err).To(Equal(redis.Nil)) + }) + }) + Describe("Geo add and radius search", func() { BeforeEach(func() { geoAdd := client.GeoAdd(