forked from mirror/redis
Merge pull request #893 from rvolosatovs/feature/bzpop
Implement BZPop{Min,Max}
This commit is contained in:
commit
6809e07fd3
62
command.go
62
command.go
|
@ -1337,6 +1337,68 @@ func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type ZWithKeyCmd struct {
|
||||||
|
baseCmd
|
||||||
|
|
||||||
|
val ZWithKey
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Cmder = (*ZWithKeyCmd)(nil)
|
||||||
|
|
||||||
|
func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd {
|
||||||
|
return &ZWithKeyCmd{
|
||||||
|
baseCmd: baseCmd{_args: args},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *ZWithKeyCmd) Val() ZWithKey {
|
||||||
|
return cmd.val
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *ZWithKeyCmd) Result() (ZWithKey, error) {
|
||||||
|
return cmd.Val(), cmd.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *ZWithKeyCmd) String() string {
|
||||||
|
return cmdString(cmd, cmd.val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error {
|
||||||
|
var v interface{}
|
||||||
|
v, cmd.err = rd.ReadArrayReply(zWithKeyParser)
|
||||||
|
if cmd.err != nil {
|
||||||
|
return cmd.err
|
||||||
|
}
|
||||||
|
cmd.val = v.(ZWithKey)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implements proto.MultiBulkParse
|
||||||
|
func zWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
if n != 3 {
|
||||||
|
return nil, fmt.Errorf("got %d elements, expected 3", n)
|
||||||
|
}
|
||||||
|
|
||||||
|
var z ZWithKey
|
||||||
|
var err error
|
||||||
|
|
||||||
|
z.Key, err = rd.ReadString()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
z.Member, err = rd.ReadString()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
z.Score, err = rd.ReadFloatReply()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return z, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type ScanCmd struct {
|
type ScanCmd struct {
|
||||||
baseCmd
|
baseCmd
|
||||||
|
|
||||||
|
|
37
commands.go
37
commands.go
|
@ -185,6 +185,8 @@ type Cmdable interface {
|
||||||
XClaimJustID(a *XClaimArgs) *StringSliceCmd
|
XClaimJustID(a *XClaimArgs) *StringSliceCmd
|
||||||
XTrim(key string, maxLen int64) *IntCmd
|
XTrim(key string, maxLen int64) *IntCmd
|
||||||
XTrimApprox(key string, maxLen int64) *IntCmd
|
XTrimApprox(key string, maxLen int64) *IntCmd
|
||||||
|
BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd
|
||||||
|
BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd
|
||||||
ZAdd(key string, members ...Z) *IntCmd
|
ZAdd(key string, members ...Z) *IntCmd
|
||||||
ZAddNX(key string, members ...Z) *IntCmd
|
ZAddNX(key string, members ...Z) *IntCmd
|
||||||
ZAddXX(key string, members ...Z) *IntCmd
|
ZAddXX(key string, members ...Z) *IntCmd
|
||||||
|
@ -1550,6 +1552,13 @@ type Z struct {
|
||||||
Member interface{}
|
Member interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ZWithKey represents sorted set member including the name of the key where it was popped.
|
||||||
|
type ZWithKey struct {
|
||||||
|
Score float64
|
||||||
|
Member interface{}
|
||||||
|
Key string
|
||||||
|
}
|
||||||
|
|
||||||
// ZStore is used as an arg to ZInterStore and ZUnionStore.
|
// ZStore is used as an arg to ZInterStore and ZUnionStore.
|
||||||
type ZStore struct {
|
type ZStore struct {
|
||||||
Weights []float64
|
Weights []float64
|
||||||
|
@ -1557,6 +1566,34 @@ type ZStore struct {
|
||||||
Aggregate string
|
Aggregate string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Redis `BZPOPMAX key [key ...] timeout` command.
|
||||||
|
func (c *cmdable) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd {
|
||||||
|
args := make([]interface{}, 1+len(keys)+1)
|
||||||
|
args[0] = "bzpopmax"
|
||||||
|
for i, key := range keys {
|
||||||
|
args[1+i] = key
|
||||||
|
}
|
||||||
|
args[len(args)-1] = formatSec(timeout)
|
||||||
|
cmd := NewZWithKeyCmd(args...)
|
||||||
|
cmd.setReadTimeout(timeout)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
// Redis `BZPOPMIN key [key ...] timeout` command.
|
||||||
|
func (c *cmdable) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd {
|
||||||
|
args := make([]interface{}, 1+len(keys)+1)
|
||||||
|
args[0] = "bzpopmin"
|
||||||
|
for i, key := range keys {
|
||||||
|
args[1+i] = key
|
||||||
|
}
|
||||||
|
args[len(args)-1] = formatSec(timeout)
|
||||||
|
cmd := NewZWithKeyCmd(args...)
|
||||||
|
cmd.setReadTimeout(timeout)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd {
|
func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd {
|
||||||
for i, m := range members {
|
for i, m := range members {
|
||||||
a[n+2*i] = m.Score
|
a[n+2*i] = m.Score
|
||||||
|
|
156
commands_test.go
156
commands_test.go
|
@ -2118,6 +2118,162 @@ var _ = Describe("Commands", func() {
|
||||||
|
|
||||||
Describe("sorted sets", func() {
|
Describe("sorted sets", func() {
|
||||||
|
|
||||||
|
It("should BZPopMax", func() {
|
||||||
|
err := client.ZAdd("zset1", redis.Z{
|
||||||
|
Score: 1,
|
||||||
|
Member: "one",
|
||||||
|
}).Err()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
err = client.ZAdd("zset1", redis.Z{
|
||||||
|
Score: 2,
|
||||||
|
Member: "two",
|
||||||
|
}).Err()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
err = client.ZAdd("zset1", redis.Z{
|
||||||
|
Score: 3,
|
||||||
|
Member: "three",
|
||||||
|
}).Err()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
member, err := client.BZPopMax(0, "zset1", "zset2").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(member).To(Equal(redis.ZWithKey{
|
||||||
|
Score: 3,
|
||||||
|
Member: "three",
|
||||||
|
Key: "zset1",
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should BZPopMax blocks", func() {
|
||||||
|
started := make(chan bool)
|
||||||
|
done := make(chan bool)
|
||||||
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
|
||||||
|
started <- true
|
||||||
|
bZPopMax := client.BZPopMax(0, "zset")
|
||||||
|
Expect(bZPopMax.Err()).NotTo(HaveOccurred())
|
||||||
|
Expect(bZPopMax.Val()).To(Equal(redis.ZWithKey{
|
||||||
|
Member: "a",
|
||||||
|
Score: 1,
|
||||||
|
Key: "zset",
|
||||||
|
}))
|
||||||
|
done <- true
|
||||||
|
}()
|
||||||
|
<-started
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
Fail("BZPopMax is not blocked")
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
// ok
|
||||||
|
}
|
||||||
|
|
||||||
|
zAdd := client.ZAdd("zset", redis.Z{
|
||||||
|
Member: "a",
|
||||||
|
Score: 1,
|
||||||
|
})
|
||||||
|
Expect(zAdd.Err()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
// ok
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
Fail("BZPopMax is still blocked")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should BZPopMax timeout", func() {
|
||||||
|
val, err := client.BZPopMax(time.Second, "zset1").Result()
|
||||||
|
Expect(err).To(Equal(redis.Nil))
|
||||||
|
Expect(val).To(Equal(redis.ZWithKey{}))
|
||||||
|
|
||||||
|
Expect(client.Ping().Err()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
stats := client.PoolStats()
|
||||||
|
Expect(stats.Hits).To(Equal(uint32(1)))
|
||||||
|
Expect(stats.Misses).To(Equal(uint32(2)))
|
||||||
|
Expect(stats.Timeouts).To(Equal(uint32(0)))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should BZPopMin", func() {
|
||||||
|
err := client.ZAdd("zset1", redis.Z{
|
||||||
|
Score: 1,
|
||||||
|
Member: "one",
|
||||||
|
}).Err()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
err = client.ZAdd("zset1", redis.Z{
|
||||||
|
Score: 2,
|
||||||
|
Member: "two",
|
||||||
|
}).Err()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
err = client.ZAdd("zset1", redis.Z{
|
||||||
|
Score: 3,
|
||||||
|
Member: "three",
|
||||||
|
}).Err()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
member, err := client.BZPopMin(0, "zset1", "zset2").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(member).To(Equal(redis.ZWithKey{
|
||||||
|
Score: 1,
|
||||||
|
Member: "one",
|
||||||
|
Key: "zset1",
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should BZPopMin blocks", func() {
|
||||||
|
started := make(chan bool)
|
||||||
|
done := make(chan bool)
|
||||||
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
|
||||||
|
started <- true
|
||||||
|
bZPopMin := client.BZPopMin(0, "zset")
|
||||||
|
Expect(bZPopMin.Err()).NotTo(HaveOccurred())
|
||||||
|
Expect(bZPopMin.Val()).To(Equal(redis.ZWithKey{
|
||||||
|
Member: "a",
|
||||||
|
Score: 1,
|
||||||
|
Key: "zset",
|
||||||
|
}))
|
||||||
|
done <- true
|
||||||
|
}()
|
||||||
|
<-started
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
Fail("BZPopMin is not blocked")
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
// ok
|
||||||
|
}
|
||||||
|
|
||||||
|
zAdd := client.ZAdd("zset", redis.Z{
|
||||||
|
Member: "a",
|
||||||
|
Score: 1,
|
||||||
|
})
|
||||||
|
Expect(zAdd.Err()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
// ok
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
Fail("BZPopMin is still blocked")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should BZPopMin timeout", func() {
|
||||||
|
val, err := client.BZPopMin(time.Second, "zset1").Result()
|
||||||
|
Expect(err).To(Equal(redis.Nil))
|
||||||
|
Expect(val).To(Equal(redis.ZWithKey{}))
|
||||||
|
|
||||||
|
Expect(client.Ping().Err()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
stats := client.PoolStats()
|
||||||
|
Expect(stats.Hits).To(Equal(uint32(1)))
|
||||||
|
Expect(stats.Misses).To(Equal(uint32(2)))
|
||||||
|
Expect(stats.Timeouts).To(Equal(uint32(0)))
|
||||||
|
})
|
||||||
|
|
||||||
It("should ZAdd", func() {
|
It("should ZAdd", func() {
|
||||||
added, err := client.ZAdd("zset", redis.Z{
|
added, err := client.ZAdd("zset", redis.Z{
|
||||||
Score: 1,
|
Score: 1,
|
||||||
|
|
Loading…
Reference in New Issue