Implement BZPop{Min,Max}

This commit is contained in:
Roman Volosatovs 2018-10-31 14:35:23 +01:00
parent b3d9bf10f6
commit 8527f5907e
No known key found for this signature in database
GPG Key ID: 3AC661943D80C89E
3 changed files with 255 additions and 0 deletions

View File

@ -1337,6 +1337,68 @@ func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type ZWithKeyCmd struct {
baseCmd
val ZWithKey
}
var _ Cmder = (*ZWithKeyCmd)(nil)
func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd {
return &ZWithKeyCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *ZWithKeyCmd) Val() ZWithKey {
return cmd.val
}
func (cmd *ZWithKeyCmd) Result() (ZWithKey, error) {
return cmd.Val(), cmd.Err()
}
func (cmd *ZWithKeyCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error {
var v interface{}
v, cmd.err = rd.ReadArrayReply(zWithKeyParser)
if cmd.err != nil {
return cmd.err
}
cmd.val = v.(ZWithKey)
return nil
}
// Implements proto.MultiBulkParse
func zWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) {
if n != 3 {
return nil, fmt.Errorf("got %d elements, expected 3", n)
}
var z ZWithKey
var err error
z.Key, err = rd.ReadString()
if err != nil {
return nil, err
}
z.Member, err = rd.ReadString()
if err != nil {
return nil, err
}
z.Score, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
return z, nil
}
//------------------------------------------------------------------------------
type ScanCmd struct { type ScanCmd struct {
baseCmd baseCmd

View File

@ -185,6 +185,8 @@ type Cmdable interface {
XClaimJustID(a *XClaimArgs) *StringSliceCmd XClaimJustID(a *XClaimArgs) *StringSliceCmd
XTrim(key string, maxLen int64) *IntCmd XTrim(key string, maxLen int64) *IntCmd
XTrimApprox(key string, maxLen int64) *IntCmd XTrimApprox(key string, maxLen int64) *IntCmd
BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd
BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd
ZAdd(key string, members ...Z) *IntCmd ZAdd(key string, members ...Z) *IntCmd
ZAddNX(key string, members ...Z) *IntCmd ZAddNX(key string, members ...Z) *IntCmd
ZAddXX(key string, members ...Z) *IntCmd ZAddXX(key string, members ...Z) *IntCmd
@ -1550,6 +1552,13 @@ type Z struct {
Member interface{} Member interface{}
} }
// ZWithKey represents sorted set member including the name of the key where it was popped.
type ZWithKey struct {
Score float64
Member interface{}
Key string
}
// ZStore is used as an arg to ZInterStore and ZUnionStore. // ZStore is used as an arg to ZInterStore and ZUnionStore.
type ZStore struct { type ZStore struct {
Weights []float64 Weights []float64
@ -1557,6 +1566,34 @@ type ZStore struct {
Aggregate string Aggregate string
} }
// Redis `BZPOPMAX key [key ...] timeout` command.
func (c *cmdable) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd {
args := make([]interface{}, 1+len(keys)+1)
args[0] = "bzpopmax"
for i, key := range keys {
args[1+i] = key
}
args[len(args)-1] = formatSec(timeout)
cmd := NewZWithKeyCmd(args...)
cmd.setReadTimeout(timeout)
c.process(cmd)
return cmd
}
// Redis `BZPOPMIN key [key ...] timeout` command.
func (c *cmdable) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd {
args := make([]interface{}, 1+len(keys)+1)
args[0] = "bzpopmin"
for i, key := range keys {
args[1+i] = key
}
args[len(args)-1] = formatSec(timeout)
cmd := NewZWithKeyCmd(args...)
cmd.setReadTimeout(timeout)
c.process(cmd)
return cmd
}
func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd { func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd {
for i, m := range members { for i, m := range members {
a[n+2*i] = m.Score a[n+2*i] = m.Score

View File

@ -2118,6 +2118,162 @@ var _ = Describe("Commands", func() {
Describe("sorted sets", func() { Describe("sorted sets", func() {
It("should BZPopMax", func() {
err := client.ZAdd("zset1", redis.Z{
Score: 1,
Member: "one",
}).Err()
Expect(err).NotTo(HaveOccurred())
err = client.ZAdd("zset1", redis.Z{
Score: 2,
Member: "two",
}).Err()
Expect(err).NotTo(HaveOccurred())
err = client.ZAdd("zset1", redis.Z{
Score: 3,
Member: "three",
}).Err()
Expect(err).NotTo(HaveOccurred())
member, err := client.BZPopMax(0, "zset1", "zset2").Result()
Expect(err).NotTo(HaveOccurred())
Expect(member).To(Equal(redis.ZWithKey{
Score: 3,
Member: "three",
Key: "zset1",
}))
})
It("should BZPopMax blocks", func() {
started := make(chan bool)
done := make(chan bool)
go func() {
defer GinkgoRecover()
started <- true
bZPopMax := client.BZPopMax(0, "zset")
Expect(bZPopMax.Err()).NotTo(HaveOccurred())
Expect(bZPopMax.Val()).To(Equal(redis.ZWithKey{
Member: "a",
Score: 1,
Key: "zset",
}))
done <- true
}()
<-started
select {
case <-done:
Fail("BZPopMax is not blocked")
case <-time.After(time.Second):
// ok
}
zAdd := client.ZAdd("zset", redis.Z{
Member: "a",
Score: 1,
})
Expect(zAdd.Err()).NotTo(HaveOccurred())
select {
case <-done:
// ok
case <-time.After(time.Second):
Fail("BZPopMax is still blocked")
}
})
It("should BZPopMax timeout", func() {
val, err := client.BZPopMax(time.Second, "zset1").Result()
Expect(err).To(Equal(redis.Nil))
Expect(val).To(Equal(redis.ZWithKey{}))
Expect(client.Ping().Err()).NotTo(HaveOccurred())
stats := client.PoolStats()
Expect(stats.Hits).To(Equal(uint32(1)))
Expect(stats.Misses).To(Equal(uint32(2)))
Expect(stats.Timeouts).To(Equal(uint32(0)))
})
It("should BZPopMin", func() {
err := client.ZAdd("zset1", redis.Z{
Score: 1,
Member: "one",
}).Err()
Expect(err).NotTo(HaveOccurred())
err = client.ZAdd("zset1", redis.Z{
Score: 2,
Member: "two",
}).Err()
Expect(err).NotTo(HaveOccurred())
err = client.ZAdd("zset1", redis.Z{
Score: 3,
Member: "three",
}).Err()
Expect(err).NotTo(HaveOccurred())
member, err := client.BZPopMin(0, "zset1", "zset2").Result()
Expect(err).NotTo(HaveOccurred())
Expect(member).To(Equal(redis.ZWithKey{
Score: 1,
Member: "one",
Key: "zset1",
}))
})
It("should BZPopMin blocks", func() {
started := make(chan bool)
done := make(chan bool)
go func() {
defer GinkgoRecover()
started <- true
bZPopMin := client.BZPopMin(0, "zset")
Expect(bZPopMin.Err()).NotTo(HaveOccurred())
Expect(bZPopMin.Val()).To(Equal(redis.ZWithKey{
Member: "a",
Score: 1,
Key: "zset",
}))
done <- true
}()
<-started
select {
case <-done:
Fail("BZPopMin is not blocked")
case <-time.After(time.Second):
// ok
}
zAdd := client.ZAdd("zset", redis.Z{
Member: "a",
Score: 1,
})
Expect(zAdd.Err()).NotTo(HaveOccurred())
select {
case <-done:
// ok
case <-time.After(time.Second):
Fail("BZPopMin is still blocked")
}
})
It("should BZPopMin timeout", func() {
val, err := client.BZPopMin(time.Second, "zset1").Result()
Expect(err).To(Equal(redis.Nil))
Expect(val).To(Equal(redis.ZWithKey{}))
Expect(client.Ping().Err()).NotTo(HaveOccurred())
stats := client.PoolStats()
Expect(stats.Hits).To(Equal(uint32(1)))
Expect(stats.Misses).To(Equal(uint32(2)))
Expect(stats.Timeouts).To(Equal(uint32(0)))
})
It("should ZAdd", func() { It("should ZAdd", func() {
added, err := client.ZAdd("zset", redis.Z{ added, err := client.ZAdd("zset", redis.Z{
Score: 1, Score: 1,