From 8527f5907e1979547090aa3e1d2e428c813dc56c Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Wed, 31 Oct 2018 14:35:23 +0100 Subject: [PATCH] Implement BZPop{Min,Max} --- command.go | 62 +++++++++++++++++++ commands.go | 37 +++++++++++ commands_test.go | 156 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 255 insertions(+) diff --git a/command.go b/command.go index 05dd675..cb4f94b 100644 --- a/command.go +++ b/command.go @@ -1337,6 +1337,68 @@ func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) { //------------------------------------------------------------------------------ +type ZWithKeyCmd struct { + baseCmd + + val ZWithKey +} + +var _ Cmder = (*ZWithKeyCmd)(nil) + +func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd { + return &ZWithKeyCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *ZWithKeyCmd) Val() ZWithKey { + return cmd.val +} + +func (cmd *ZWithKeyCmd) Result() (ZWithKey, error) { + return cmd.Val(), cmd.Err() +} + +func (cmd *ZWithKeyCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(zWithKeyParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.(ZWithKey) + return nil +} + +// Implements proto.MultiBulkParse +func zWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 3 { + return nil, fmt.Errorf("got %d elements, expected 3", n) + } + + var z ZWithKey + var err error + + z.Key, err = rd.ReadString() + if err != nil { + return nil, err + } + z.Member, err = rd.ReadString() + if err != nil { + return nil, err + } + z.Score, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + return z, nil +} + +//------------------------------------------------------------------------------ + type ScanCmd struct { baseCmd diff --git a/commands.go b/commands.go index e9a8992..e4cd677 100644 --- a/commands.go +++ b/commands.go @@ -185,6 +185,8 @@ type Cmdable interface { XClaimJustID(a *XClaimArgs) *StringSliceCmd XTrim(key string, maxLen int64) *IntCmd XTrimApprox(key string, maxLen int64) *IntCmd + BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd + BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd ZAdd(key string, members ...Z) *IntCmd ZAddNX(key string, members ...Z) *IntCmd ZAddXX(key string, members ...Z) *IntCmd @@ -1550,6 +1552,13 @@ type Z struct { Member interface{} } +// ZWithKey represents sorted set member including the name of the key where it was popped. +type ZWithKey struct { + Score float64 + Member interface{} + Key string +} + // ZStore is used as an arg to ZInterStore and ZUnionStore. type ZStore struct { Weights []float64 @@ -1557,6 +1566,34 @@ type ZStore struct { Aggregate string } +// Redis `BZPOPMAX key [key ...] timeout` command. +func (c *cmdable) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd { + args := make([]interface{}, 1+len(keys)+1) + args[0] = "bzpopmax" + for i, key := range keys { + args[1+i] = key + } + args[len(args)-1] = formatSec(timeout) + cmd := NewZWithKeyCmd(args...) + cmd.setReadTimeout(timeout) + c.process(cmd) + return cmd +} + +// Redis `BZPOPMIN key [key ...] timeout` command. +func (c *cmdable) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd { + args := make([]interface{}, 1+len(keys)+1) + args[0] = "bzpopmin" + for i, key := range keys { + args[1+i] = key + } + args[len(args)-1] = formatSec(timeout) + cmd := NewZWithKeyCmd(args...) + cmd.setReadTimeout(timeout) + c.process(cmd) + return cmd +} + func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd { for i, m := range members { a[n+2*i] = m.Score diff --git a/commands_test.go b/commands_test.go index 656cced..ba59727 100644 --- a/commands_test.go +++ b/commands_test.go @@ -2118,6 +2118,162 @@ var _ = Describe("Commands", func() { Describe("sorted sets", func() { + It("should BZPopMax", func() { + err := client.ZAdd("zset1", redis.Z{ + Score: 1, + Member: "one", + }).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd("zset1", redis.Z{ + Score: 2, + Member: "two", + }).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd("zset1", redis.Z{ + Score: 3, + Member: "three", + }).Err() + Expect(err).NotTo(HaveOccurred()) + + member, err := client.BZPopMax(0, "zset1", "zset2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(member).To(Equal(redis.ZWithKey{ + Score: 3, + Member: "three", + Key: "zset1", + })) + }) + + It("should BZPopMax blocks", func() { + started := make(chan bool) + done := make(chan bool) + go func() { + defer GinkgoRecover() + + started <- true + bZPopMax := client.BZPopMax(0, "zset") + Expect(bZPopMax.Err()).NotTo(HaveOccurred()) + Expect(bZPopMax.Val()).To(Equal(redis.ZWithKey{ + Member: "a", + Score: 1, + Key: "zset", + })) + done <- true + }() + <-started + + select { + case <-done: + Fail("BZPopMax is not blocked") + case <-time.After(time.Second): + // ok + } + + zAdd := client.ZAdd("zset", redis.Z{ + Member: "a", + Score: 1, + }) + Expect(zAdd.Err()).NotTo(HaveOccurred()) + + select { + case <-done: + // ok + case <-time.After(time.Second): + Fail("BZPopMax is still blocked") + } + }) + + It("should BZPopMax timeout", func() { + val, err := client.BZPopMax(time.Second, "zset1").Result() + Expect(err).To(Equal(redis.Nil)) + Expect(val).To(Equal(redis.ZWithKey{})) + + Expect(client.Ping().Err()).NotTo(HaveOccurred()) + + stats := client.PoolStats() + Expect(stats.Hits).To(Equal(uint32(1))) + Expect(stats.Misses).To(Equal(uint32(2))) + Expect(stats.Timeouts).To(Equal(uint32(0))) + }) + + It("should BZPopMin", func() { + err := client.ZAdd("zset1", redis.Z{ + Score: 1, + Member: "one", + }).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd("zset1", redis.Z{ + Score: 2, + Member: "two", + }).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd("zset1", redis.Z{ + Score: 3, + Member: "three", + }).Err() + Expect(err).NotTo(HaveOccurred()) + + member, err := client.BZPopMin(0, "zset1", "zset2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(member).To(Equal(redis.ZWithKey{ + Score: 1, + Member: "one", + Key: "zset1", + })) + }) + + It("should BZPopMin blocks", func() { + started := make(chan bool) + done := make(chan bool) + go func() { + defer GinkgoRecover() + + started <- true + bZPopMin := client.BZPopMin(0, "zset") + Expect(bZPopMin.Err()).NotTo(HaveOccurred()) + Expect(bZPopMin.Val()).To(Equal(redis.ZWithKey{ + Member: "a", + Score: 1, + Key: "zset", + })) + done <- true + }() + <-started + + select { + case <-done: + Fail("BZPopMin is not blocked") + case <-time.After(time.Second): + // ok + } + + zAdd := client.ZAdd("zset", redis.Z{ + Member: "a", + Score: 1, + }) + Expect(zAdd.Err()).NotTo(HaveOccurred()) + + select { + case <-done: + // ok + case <-time.After(time.Second): + Fail("BZPopMin is still blocked") + } + }) + + It("should BZPopMin timeout", func() { + val, err := client.BZPopMin(time.Second, "zset1").Result() + Expect(err).To(Equal(redis.Nil)) + Expect(val).To(Equal(redis.ZWithKey{})) + + Expect(client.Ping().Err()).NotTo(HaveOccurred()) + + stats := client.PoolStats() + Expect(stats.Hits).To(Equal(uint32(1))) + Expect(stats.Misses).To(Equal(uint32(2))) + Expect(stats.Timeouts).To(Equal(uint32(0))) + }) + It("should ZAdd", func() { added, err := client.ZAdd("zset", redis.Z{ Score: 1,