From bd0d1c2293b0985b77858396ee6fb574c6830f35 Mon Sep 17 00:00:00 2001 From: ZhiQiang Li <44493045+yikuaibro@users.noreply.github.com> Date: Tue, 14 Feb 2023 23:50:51 +0800 Subject: [PATCH] Add support for BLMPOP (#2442) * Add support for BLMPOP --- commands.go | 16 +++++++++++ commands_test.go | 75 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/commands.go b/commands.go index dd3cfd24..ca6b1074 100644 --- a/commands.go +++ b/commands.go @@ -218,6 +218,7 @@ type Cmdable interface { HRandFieldWithValues(ctx context.Context, key string, count int) *KeyValueSliceCmd BLPop(ctx context.Context, timeout time.Duration, keys ...string) *StringSliceCmd + BLMPop(ctx context.Context, timeout time.Duration, direction string, count int64, keys ...string) *KeyValuesCmd BRPop(ctx context.Context, timeout time.Duration, keys ...string) *StringSliceCmd BRPopLPush(ctx context.Context, source, destination string, timeout time.Duration) *StringCmd LIndex(ctx context.Context, key string, index int64) *StringCmd @@ -1432,6 +1433,21 @@ func (c cmdable) BLPop(ctx context.Context, timeout time.Duration, keys ...strin return cmd } +func (c cmdable) BLMPop(ctx context.Context, timeout time.Duration, direction string, count int64, keys ...string) *KeyValuesCmd { + args := make([]interface{}, 3+len(keys), 6+len(keys)) + args[0] = "blmpop" + args[1] = formatSec(ctx, timeout) + args[2] = len(keys) + for i, key := range keys { + args[3+i] = key + } + args = append(args, strings.ToLower(direction), "count", count) + cmd := NewKeyValuesCmd(ctx, args...) + cmd.setReadTimeout(timeout) + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) BRPop(ctx context.Context, timeout time.Duration, keys ...string) *StringSliceCmd { args := make([]interface{}, 1+len(keys)+1) args[0] = "brpop" diff --git a/commands_test.go b/commands_test.go index ba06a7c7..d34a30fb 100644 --- a/commands_test.go +++ b/commands_test.go @@ -2312,6 +2312,81 @@ var _ = Describe("Commands", func() { Expect(err).To(HaveOccurred()) }) + It("should BLMPop", func() { + err := client.LPush(ctx, "list1", "one", "two", "three", "four", "five").Err() + Expect(err).NotTo(HaveOccurred()) + + err = client.LPush(ctx, "list2", "a", "b", "c", "d", "e").Err() + Expect(err).NotTo(HaveOccurred()) + + key, val, err := client.BLMPop(ctx, 0, "left", 3, "list1", "list2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(key).To(Equal("list1")) + Expect(val).To(Equal([]string{"five", "four", "three"})) + + key, val, err = client.BLMPop(ctx, 0, "right", 3, "list1", "list2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(key).To(Equal("list1")) + Expect(val).To(Equal([]string{"one", "two"})) + + key, val, err = client.BLMPop(ctx, 0, "left", 1, "list1", "list2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(key).To(Equal("list2")) + Expect(val).To(Equal([]string{"e"})) + + key, val, err = client.BLMPop(ctx, 0, "right", 10, "list1", "list2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(key).To(Equal("list2")) + Expect(val).To(Equal([]string{"a", "b", "c", "d"})) + + }) + + It("should BLMPopBlocks", func() { + started := make(chan bool) + done := make(chan bool) + go func() { + defer GinkgoRecover() + + started <- true + key, val, err := client.BLMPop(ctx, 0, "left", 1, "list_list").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(key).To(Equal("list_list")) + Expect(val).To(Equal([]string{"a"})) + done <- true + }() + <-started + + select { + case <-done: + Fail("BLMPop is not blocked") + case <-time.After(time.Second): + //ok + } + + _, err := client.LPush(ctx, "list_list", "a").Result() + Expect(err).NotTo(HaveOccurred()) + + select { + case <-done: + //ok + case <-time.After(time.Second): + Fail("BLMPop is still blocked") + } + }) + + It("should BLMPop timeout", func() { + _, val, err := client.BLMPop(ctx, time.Second, "left", 1, "list1").Result() + Expect(err).To(Equal(redis.Nil)) + Expect(val).To(BeNil()) + + Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred()) + + stats := client.PoolStats() + Expect(stats.Hits).To(Equal(uint32(2))) + Expect(stats.Misses).To(Equal(uint32(1))) + Expect(stats.Timeouts).To(Equal(uint32(0))) + }) + It("should LLen", func() { lPush := client.LPush(ctx, "list", "World") Expect(lPush.Err()).NotTo(HaveOccurred())