From 31495ac5706063f7bb3b06efc1de46c7e7a0c448 Mon Sep 17 00:00:00 2001 From: Parvez Date: Thu, 13 May 2021 20:45:37 +0530 Subject: [PATCH 01/13] Added missing idle args in XPendingExtArgs (#1750) Added missing idle args in XPendingExtArgs --- commands.go | 9 +++++++-- commands_test.go | 11 ++++++++--- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/commands.go b/commands.go index 9e572832..3fa8b758 100644 --- a/commands.go +++ b/commands.go @@ -1811,6 +1811,7 @@ func (c cmdable) XPending(ctx context.Context, stream, group string) *XPendingCm type XPendingExtArgs struct { Stream string Group string + Idle time.Duration Start string End string Count int64 @@ -1818,8 +1819,12 @@ type XPendingExtArgs struct { } func (c cmdable) XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd { - args := make([]interface{}, 0, 7) - args = append(args, "xpending", a.Stream, a.Group, a.Start, a.End, a.Count) + args := make([]interface{}, 0, 9) + args = append(args, "xpending", a.Stream, a.Group) + if a.Idle != 0 { + args = append(args, "idle", formatMs(ctx, a.Idle)) + } + args = append(args, a.Start, a.End, a.Count) if a.Consumer != "" { args = append(args, a.Consumer) } diff --git a/commands_test.go b/commands_test.go index f64dc97b..47233cbf 100644 --- a/commands_test.go +++ b/commands_test.go @@ -4225,15 +4225,15 @@ var _ = Describe("Commands", func() { Higher: "3-0", Consumers: map[string]int64{"consumer": 3}, })) - - infoExt, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{ + args := &redis.XPendingExtArgs{ Stream: "stream", Group: "group", Start: "-", End: "+", Count: 10, Consumer: "consumer", - }).Result() + } + infoExt, err := client.XPendingExt(ctx, args).Result() Expect(err).NotTo(HaveOccurred()) for i := range infoExt { infoExt[i].Idle = 0 @@ -4244,6 +4244,11 @@ var _ = Describe("Commands", func() { {ID: "3-0", Consumer: "consumer", Idle: 0, RetryCount: 1}, })) + args.Idle = 72 * time.Hour + infoExt, err = client.XPendingExt(ctx, args).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(infoExt).To(HaveLen(0)) + n, err := client.XGroupDelConsumer(ctx, "stream", "group", "consumer").Result() Expect(err).NotTo(HaveOccurred()) Expect(n).To(Equal(int64(3))) From 8f0fbd2fe81b4af1a394a0109820362df011c0ae Mon Sep 17 00:00:00 2001 From: monkey92t Date: Sun, 16 May 2021 00:37:22 +0800 Subject: [PATCH 02/13] fix #1754 (#1756) --- commands.go | 7 +++++++ commands_test.go | 19 +++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/commands.go b/commands.go index 3fa8b758..0619675a 100644 --- a/commands.go +++ b/commands.go @@ -179,6 +179,7 @@ type Cmdable interface { LInsertAfter(ctx context.Context, key string, pivot, value interface{}) *IntCmd LLen(ctx context.Context, key string) *IntCmd LPop(ctx context.Context, key string) *StringCmd + LPopCount(ctx context.Context, key string, count int) *StringSliceCmd LPos(ctx context.Context, key string, value string, args LPosArgs) *IntCmd LPosCount(ctx context.Context, key string, value string, count int64, args LPosArgs) *IntSliceCmd LPush(ctx context.Context, key string, values ...interface{}) *IntCmd @@ -1314,6 +1315,12 @@ func (c cmdable) LPop(ctx context.Context, key string) *StringCmd { return cmd } +func (c cmdable) LPopCount(ctx context.Context, key string, count int) *StringSliceCmd { + cmd := NewStringSliceCmd(ctx, "lpop", key, count) + _ = c(ctx, cmd) + return cmd +} + type LPosArgs struct { Rank, MaxLen int64 } diff --git a/commands_test.go b/commands_test.go index 47233cbf..e7648232 100644 --- a/commands_test.go +++ b/commands_test.go @@ -2050,6 +2050,25 @@ var _ = Describe("Commands", func() { Expect(lRange.Val()).To(Equal([]string{"two", "three"})) }) + It("should LPopCount", func() { + rPush := client.RPush(ctx, "list", "one") + Expect(rPush.Err()).NotTo(HaveOccurred()) + rPush = client.RPush(ctx, "list", "two") + Expect(rPush.Err()).NotTo(HaveOccurred()) + rPush = client.RPush(ctx, "list", "three") + Expect(rPush.Err()).NotTo(HaveOccurred()) + rPush = client.RPush(ctx, "list", "four") + Expect(rPush.Err()).NotTo(HaveOccurred()) + + lPopCount := client.LPopCount(ctx, "list", 2) + Expect(lPopCount.Err()).NotTo(HaveOccurred()) + Expect(lPopCount.Val()).To(Equal([]string{"one", "two"})) + + lRange := client.LRange(ctx, "list", 0, -1) + Expect(lRange.Err()).NotTo(HaveOccurred()) + Expect(lRange.Val()).To(Equal([]string{"three", "four"})) + }) + It("should LPos", func() { rPush := client.RPush(ctx, "list", "a") Expect(rPush.Err()).NotTo(HaveOccurred()) From f33c425a30e75334163b64704a06bf544c7e1d21 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Mon, 17 May 2021 11:56:12 +0300 Subject: [PATCH 03/13] Replace go-pg with bun --- README.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 1a0e460e..e58d867d 100644 --- a/README.md +++ b/README.md @@ -163,7 +163,5 @@ go test ## See also -- [Fast and flexible HTTP router](https://github.com/vmihailenco/treemux) -- [Golang PostgreSQL ORM](https://github.com/go-pg/pg) -- [Golang msgpack](https://github.com/vmihailenco/msgpack) -- [Golang message task queue](https://github.com/vmihailenco/taskq) +- [Fast and flexible ORM](https://github.com/uptrace/bun) +- [msgpack for Go](https://github.com/vmihailenco/msgpack) From 3871963e2d9a686b5ab1724410da0f54e80f90c5 Mon Sep 17 00:00:00 2001 From: monkey Date: Tue, 18 May 2021 15:41:20 +0800 Subject: [PATCH 04/13] fix #1755 Signed-off-by: monkey --- command.go | 295 ++++++++++++++++++++++++++++++++++++++++++++++- commands.go | 13 +++ commands_test.go | 127 ++++++++++++++++++++ 3 files changed, 434 insertions(+), 1 deletion(-) diff --git a/command.go b/command.go index f10c4781..9ff8a260 100644 --- a/command.go +++ b/command.go @@ -1512,7 +1512,7 @@ type XInfoConsumer struct { Idle int64 } -var _ Cmder = (*XInfoGroupsCmd)(nil) +var _ Cmder = (*XInfoConsumersCmd)(nil) func NewXInfoConsumersCmd(ctx context.Context, stream string, group string) *XInfoConsumersCmd { return &XInfoConsumersCmd{ @@ -1784,6 +1784,299 @@ func xStreamInfoParser(rd *proto.Reader, n int64) (interface{}, error) { //------------------------------------------------------------------------------ +type XInfoStreamFullCmd struct { + baseCmd + val *XInfoStreamFull +} + +type XInfoStreamFull struct { + Length int64 + RadixTreeKeys int64 + RadixTreeNodes int64 + LastGeneratedID string + Entries []XMessage + Groups []XInfoStreamGroup +} + +type XInfoStreamGroup struct { + Name string + LastDeliveredID string + PelCount int64 + Pending []XInfoStreamGroupPending + Consumers []XInfoStreamConsumer +} + +type XInfoStreamGroupPending struct { + ID string + Consumer string + DeliveryTime time.Time + DeliveryCount int64 +} + +type XInfoStreamConsumer struct { + Name string + SeenTime time.Time + PelCount int64 + Pending []XInfoStreamConsumerPending +} + +type XInfoStreamConsumerPending struct { + ID string + DeliveryTime time.Time + DeliveryCount int64 +} + +var _ Cmder = (*XInfoStreamFullCmd)(nil) + +func NewXInfoStreamFullCmd(ctx context.Context, args ...interface{}) *XInfoStreamFullCmd { + return &XInfoStreamFullCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + }, + } +} + +func (cmd *XInfoStreamFullCmd) Val() *XInfoStreamFull { + return cmd.val +} + +func (cmd *XInfoStreamFullCmd) Result() (*XInfoStreamFull, error) { + return cmd.val, cmd.err +} + +func (cmd *XInfoStreamFullCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XInfoStreamFullCmd) readReply(rd *proto.Reader) error { + n, err := rd.ReadArrayLen() + if err != nil { + return err + } + if n != 12 { + return fmt.Errorf("redis: got %d elements in XINFO STREAM FULL reply,"+ + "wanted 12", n) + } + + cmd.val = &XInfoStreamFull{} + + for i := 0; i < 6; i++ { + key, err := rd.ReadString() + if err != nil { + return err + } + + switch key { + case "length": + cmd.val.Length, err = rd.ReadIntReply() + case "radix-tree-keys": + cmd.val.RadixTreeKeys, err = rd.ReadIntReply() + case "radix-tree-nodes": + cmd.val.RadixTreeNodes, err = rd.ReadIntReply() + case "last-generated-id": + cmd.val.LastGeneratedID, err = rd.ReadString() + case "entries": + cmd.val.Entries, err = readXMessageSlice(rd) + case "groups": + groups, err := rd.ReadReply(readStreamGroups) + if err != nil { + return err + } + cmd.val.Groups = groups.([]XInfoStreamGroup) + default: + return fmt.Errorf("redis: unexpected content %s "+ + "in XINFO STREAM reply", key) + } + if err != nil { + return err + } + } + return nil +} + +func readStreamGroups(rd *proto.Reader, n int64) (interface{}, error) { + groups := make([]XInfoStreamGroup, 0, n) + for i := int64(0); i < n; i++ { + nn, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if nn != 10 { + return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM FULL reply,"+ + "wanted 10", nn) + } + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + group := XInfoStreamGroup{} + + switch key { + case "name": + group.Name, err = rd.ReadString() + case "last-delivered-id": + group.LastDeliveredID, err = rd.ReadString() + case "pel-count": + group.PelCount, err = rd.ReadIntReply() + case "pending": + group.Pending, err = readXInfoStreamGroupPending(rd) + case "consumers": + group.Consumers, err = readXInfoStreamConsumers(rd) + default: + return nil, fmt.Errorf("redis: unexpected content %s "+ + "in XINFO STREAM reply", key) + } + + if err != nil { + return nil, err + } + + groups = append(groups, group) + } + + return groups, nil +} + +func readXInfoStreamGroupPending(rd *proto.Reader) ([]XInfoStreamGroupPending, error) { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + + pending := make([]XInfoStreamGroupPending, 0, n) + + for i := 0; i < n; i++ { + nn, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if nn != 4 { + return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM FULL reply,"+ + "wanted 4", nn) + } + + p := XInfoStreamGroupPending{} + + p.ID, err = rd.ReadString() + if err != nil { + return nil, err + } + + p.Consumer, err = rd.ReadString() + if err != nil { + return nil, err + } + + delivery, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + p.DeliveryTime = time.Unix(delivery/1000, delivery%1000*int64(time.Millisecond)) + + p.DeliveryCount, err = rd.ReadIntReply() + if err != nil { + return nil, err + } + + pending = append(pending, p) + } + + return pending, nil +} + +func readXInfoStreamConsumers(rd *proto.Reader) ([]XInfoStreamConsumer, error) { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + + consumers := make([]XInfoStreamConsumer, 0, n) + + for i := 0; i < n; i++ { + nn, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if nn != 8 { + return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM FULL reply,"+ + "wanted 8", nn) + } + + cKey, err := rd.ReadString() + if err != nil { + return nil, err + } + + c := XInfoStreamConsumer{} + + switch cKey { + case "name": + c.Name, err = rd.ReadString() + case "seen-time": + seen, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + c.SeenTime = time.Unix(seen/1000, seen%1000*int64(time.Millisecond)) + case "pel-count": + c.PelCount, err = rd.ReadIntReply() + case "pending": + pendingNumber, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + + c.Pending = make([]XInfoStreamConsumerPending, 0, pendingNumber) + + for f := 0; f < pendingNumber; f++ { + nn, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if nn != 3 { + return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM reply,"+ + "wanted 3", nn) + } + + p := XInfoStreamConsumerPending{} + + p.ID, err = rd.ReadString() + if err != nil { + return nil, err + } + + delivery, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + p.DeliveryTime = time.Unix(delivery/1000, delivery%1000*int64(time.Millisecond)) + + p.DeliveryCount, err = rd.ReadIntReply() + if err != nil { + return nil, err + } + + c.Pending = append(c.Pending, p) + } + default: + return nil, fmt.Errorf("redis: unexpected content %s "+ + "in XINFO STREAM reply", cKey) + } + + if err != nil { + return nil, err + } + + consumers = append(consumers, c) + } + + return consumers, nil +} + +//------------------------------------------------------------------------------ + type ZSliceCmd struct { baseCmd diff --git a/commands.go b/commands.go index 0619675a..2302db64 100644 --- a/commands.go +++ b/commands.go @@ -1906,6 +1906,19 @@ func (c cmdable) XInfoStream(ctx context.Context, key string) *XInfoStreamCmd { return cmd } +// XInfoStreamFull XINFO STREAM FULL [COUNT count] +// redis-server >= 6.0. +func (c cmdable) XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd { + args := make([]interface{}, 0, 6) + args = append(args, "xinfo", "stream", key, "full") + if count > 0 { + args = append(args, "count", count) + } + cmd := NewXInfoStreamFullCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + //------------------------------------------------------------------------------ // Z represents sorted set member. diff --git a/commands_test.go b/commands_test.go index e7648232..2d89017f 100644 --- a/commands_test.go +++ b/commands_test.go @@ -4392,6 +4392,133 @@ var _ = Describe("Commands", func() { })) }) + It("should XINFO STREAM FULL", func() { + res, err := client.XInfoStreamFull(ctx, "stream", 2).Result() + Expect(err).NotTo(HaveOccurred()) + res.RadixTreeKeys = 0 + res.RadixTreeNodes = 0 + + // Verify DeliveryTime + now := time.Now() + maxElapsed := 10 * time.Minute + for k, g := range res.Groups { + for k2, p := range g.Pending { + Expect(now.Sub(p.DeliveryTime)).To(BeNumerically("<=", maxElapsed)) + res.Groups[k].Pending[k2].DeliveryTime = time.Time{} + } + for k3, c := range g.Consumers { + Expect(now.Sub(c.SeenTime)).To(BeNumerically("<=", maxElapsed)) + res.Groups[k].Consumers[k3].SeenTime = time.Time{} + + for k4, p := range c.Pending { + Expect(now.Sub(p.DeliveryTime)).To(BeNumerically("<=", maxElapsed)) + res.Groups[k].Consumers[k3].Pending[k4].DeliveryTime = time.Time{} + } + } + } + + Expect(res).To(Equal(&redis.XInfoStreamFull{ + Length: 3, + RadixTreeKeys: 0, + RadixTreeNodes: 0, + LastGeneratedID: "3-0", + Entries: []redis.XMessage{ + {ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, + {ID: "2-0", Values: map[string]interface{}{"dos": "deux"}}, + }, + Groups: []redis.XInfoStreamGroup{ + { + Name: "group1", + LastDeliveredID: "3-0", + PelCount: 3, + Pending: []redis.XInfoStreamGroupPending{ + { + ID: "1-0", + Consumer: "consumer1", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + { + ID: "2-0", + Consumer: "consumer1", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + }, + Consumers: []redis.XInfoStreamConsumer{ + { + Name: "consumer1", + SeenTime: time.Time{}, + PelCount: 2, + Pending: []redis.XInfoStreamConsumerPending{ + { + ID: "1-0", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + { + ID: "2-0", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + }, + }, + { + Name: "consumer2", + SeenTime: time.Time{}, + PelCount: 1, + Pending: []redis.XInfoStreamConsumerPending{ + { + ID: "3-0", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + }, + }, + }, + }, + { + Name: "group2", + LastDeliveredID: "3-0", + PelCount: 2, + Pending: []redis.XInfoStreamGroupPending{ + { + ID: "2-0", + Consumer: "consumer1", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + { + ID: "3-0", + Consumer: "consumer1", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + }, + Consumers: []redis.XInfoStreamConsumer{ + { + Name: "consumer1", + SeenTime: time.Time{}, + PelCount: 2, + Pending: []redis.XInfoStreamConsumerPending{ + { + ID: "2-0", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + { + ID: "3-0", + DeliveryTime: time.Time{}, + DeliveryCount: 1, + }, + }, + }, + }, + }, + }, + })) + }) + It("should XINFO GROUPS", func() { res, err := client.XInfoGroups(ctx, "stream").Result() Expect(err).NotTo(HaveOccurred()) From 76393b5b71e632ee425376afdc15406ed854403d Mon Sep 17 00:00:00 2001 From: monkey Date: Tue, 18 May 2021 16:05:17 +0800 Subject: [PATCH 05/13] fix read data Signed-off-by: monkey --- command.go | 155 +++++++++++++++++++++++++++-------------------------- 1 file changed, 79 insertions(+), 76 deletions(-) diff --git a/command.go b/command.go index 9ff8a260..2edba95d 100644 --- a/command.go +++ b/command.go @@ -1879,11 +1879,7 @@ func (cmd *XInfoStreamFullCmd) readReply(rd *proto.Reader) error { case "entries": cmd.val.Entries, err = readXMessageSlice(rd) case "groups": - groups, err := rd.ReadReply(readStreamGroups) - if err != nil { - return err - } - cmd.val.Groups = groups.([]XInfoStreamGroup) + cmd.val.Groups, err = readStreamGroups(rd) default: return fmt.Errorf("redis: unexpected content %s "+ "in XINFO STREAM reply", key) @@ -1895,9 +1891,13 @@ func (cmd *XInfoStreamFullCmd) readReply(rd *proto.Reader) error { return nil } -func readStreamGroups(rd *proto.Reader, n int64) (interface{}, error) { +func readStreamGroups(rd *proto.Reader) ([]XInfoStreamGroup, error) { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } groups := make([]XInfoStreamGroup, 0, n) - for i := int64(0); i < n; i++ { + for i := 0; i < n; i++ { nn, err := rd.ReadArrayLen() if err != nil { return nil, err @@ -1906,31 +1906,34 @@ func readStreamGroups(rd *proto.Reader, n int64) (interface{}, error) { return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM FULL reply,"+ "wanted 10", nn) } - key, err := rd.ReadString() - if err != nil { - return nil, err - } group := XInfoStreamGroup{} - switch key { - case "name": - group.Name, err = rd.ReadString() - case "last-delivered-id": - group.LastDeliveredID, err = rd.ReadString() - case "pel-count": - group.PelCount, err = rd.ReadIntReply() - case "pending": - group.Pending, err = readXInfoStreamGroupPending(rd) - case "consumers": - group.Consumers, err = readXInfoStreamConsumers(rd) - default: - return nil, fmt.Errorf("redis: unexpected content %s "+ - "in XINFO STREAM reply", key) - } + for f := 0; f < 5; f++ { + key, err := rd.ReadString() + if err != nil { + return nil, err + } - if err != nil { - return nil, err + switch key { + case "name": + group.Name, err = rd.ReadString() + case "last-delivered-id": + group.LastDeliveredID, err = rd.ReadString() + case "pel-count": + group.PelCount, err = rd.ReadIntReply() + case "pending": + group.Pending, err = readXInfoStreamGroupPending(rd) + case "consumers": + group.Consumers, err = readXInfoStreamConsumers(rd) + default: + return nil, fmt.Errorf("redis: unexpected content %s "+ + "in XINFO STREAM reply", key) + } + + if err != nil { + return nil, err + } } groups = append(groups, group) @@ -2004,71 +2007,71 @@ func readXInfoStreamConsumers(rd *proto.Reader) ([]XInfoStreamConsumer, error) { "wanted 8", nn) } - cKey, err := rd.ReadString() - if err != nil { - return nil, err - } - c := XInfoStreamConsumer{} - switch cKey { - case "name": - c.Name, err = rd.ReadString() - case "seen-time": - seen, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - c.SeenTime = time.Unix(seen/1000, seen%1000*int64(time.Millisecond)) - case "pel-count": - c.PelCount, err = rd.ReadIntReply() - case "pending": - pendingNumber, err := rd.ReadArrayLen() + for f := 0; f < 4; f++ { + cKey, err := rd.ReadString() if err != nil { return nil, err } - c.Pending = make([]XInfoStreamConsumerPending, 0, pendingNumber) - - for f := 0; f < pendingNumber; f++ { - nn, err := rd.ReadArrayLen() + switch cKey { + case "name": + c.Name, err = rd.ReadString() + case "seen-time": + seen, err := rd.ReadIntReply() if err != nil { return nil, err } - if nn != 3 { - return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM reply,"+ - "wanted 3", nn) - } - - p := XInfoStreamConsumerPending{} - - p.ID, err = rd.ReadString() + c.SeenTime = time.Unix(seen/1000, seen%1000*int64(time.Millisecond)) + case "pel-count": + c.PelCount, err = rd.ReadIntReply() + case "pending": + pendingNumber, err := rd.ReadArrayLen() if err != nil { return nil, err } - delivery, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - p.DeliveryTime = time.Unix(delivery/1000, delivery%1000*int64(time.Millisecond)) + c.Pending = make([]XInfoStreamConsumerPending, 0, pendingNumber) - p.DeliveryCount, err = rd.ReadIntReply() - if err != nil { - return nil, err - } + for f := 0; f < pendingNumber; f++ { + nn, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if nn != 3 { + return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM reply,"+ + "wanted 3", nn) + } - c.Pending = append(c.Pending, p) + p := XInfoStreamConsumerPending{} + + p.ID, err = rd.ReadString() + if err != nil { + return nil, err + } + + delivery, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + p.DeliveryTime = time.Unix(delivery/1000, delivery%1000*int64(time.Millisecond)) + + p.DeliveryCount, err = rd.ReadIntReply() + if err != nil { + return nil, err + } + + c.Pending = append(c.Pending, p) + } + default: + return nil, fmt.Errorf("redis: unexpected content %s "+ + "in XINFO STREAM reply", cKey) + } + if err != nil { + return nil, err } - default: - return nil, fmt.Errorf("redis: unexpected content %s "+ - "in XINFO STREAM reply", cKey) } - - if err != nil { - return nil, err - } - consumers = append(consumers, c) } From 1393126c293906d4c72c5ab95caf0448b403197d Mon Sep 17 00:00:00 2001 From: monkey92t Date: Wed, 19 May 2021 16:52:13 +0800 Subject: [PATCH 06/13] fix #1758 (#1759) fix #1758 --- command.go | 8 +++++++- commands_test.go | 20 ++++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/command.go b/command.go index 2edba95d..9a3018d3 100644 --- a/command.go +++ b/command.go @@ -1769,8 +1769,14 @@ func xStreamInfoParser(rd *proto.Reader, n int64) (interface{}, error) { info.LastGeneratedID, err = rd.ReadString() case "first-entry": info.FirstEntry, err = readXMessage(rd) + if err == Nil { + err = nil + } case "last-entry": info.LastEntry, err = readXMessage(rd) + if err == Nil { + err = nil + } default: return nil, fmt.Errorf("redis: unexpected content %s "+ "in XINFO STREAM reply", key) @@ -2034,7 +2040,7 @@ func readXInfoStreamConsumers(rd *proto.Reader) ([]XInfoStreamConsumer, error) { c.Pending = make([]XInfoStreamConsumerPending, 0, pendingNumber) - for f := 0; f < pendingNumber; f++ { + for pn := 0; pn < pendingNumber; pn++ { nn, err := rd.ReadArrayLen() if err != nil { return nil, err diff --git a/commands_test.go b/commands_test.go index 2d89017f..3ae3007e 100644 --- a/commands_test.go +++ b/commands_test.go @@ -4390,6 +4390,26 @@ var _ = Describe("Commands", func() { FirstEntry: redis.XMessage{ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, LastEntry: redis.XMessage{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, })) + + // stream is empty + n, err := client.XDel(ctx, "stream", "1-0", "2-0", "3-0").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(3))) + + res, err = client.XInfoStream(ctx, "stream").Result() + Expect(err).NotTo(HaveOccurred()) + res.RadixTreeKeys = 0 + res.RadixTreeNodes = 0 + + Expect(res).To(Equal(&redis.XInfoStream{ + Length: 0, + RadixTreeKeys: 0, + RadixTreeNodes: 0, + Groups: 2, + LastGeneratedID: "3-0", + FirstEntry: redis.XMessage{}, + LastEntry: redis.XMessage{}, + })) }) It("should XINFO STREAM FULL", func() { From 7a7b75e0816ba9fc5c9f945d82797c0575ec1a60 Mon Sep 17 00:00:00 2001 From: chenguijun <18811388234@163.com> Date: Sun, 23 May 2021 16:55:03 +0800 Subject: [PATCH 07/13] add command zDiff Signed-off-by: chenguijun <18811388234@163.com> --- commands.go | 31 +++++++++++++++++++++++++++++++ commands_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/commands.go b/commands.go index 2302db64..9c643904 100644 --- a/commands.go +++ b/commands.go @@ -277,6 +277,8 @@ type Cmdable interface { ZScore(ctx context.Context, key, member string) *FloatCmd ZUnionStore(ctx context.Context, dest string, store *ZStore) *IntCmd ZRandMember(ctx context.Context, key string, count int, withScores bool) *StringSliceCmd + ZDiff(ctx context.Context, keys ...string) *StringSliceCmd + ZDiffWithScores(ctx context.Context, keys ...string) *ZSliceCmd PFAdd(ctx context.Context, key string, els ...interface{}) *IntCmd PFCount(ctx context.Context, keys ...string) *IntCmd @@ -2368,6 +2370,35 @@ func (c cmdable) ZRandMember(ctx context.Context, key string, count int, withSco return cmd } +// redis-server version >= 6.2.0. +func (c cmdable) ZDiff(ctx context.Context, keys ...string) *StringSliceCmd { + args := make([]interface{}, 2+len(keys)) + args[0] = "zdiff" + args[1] = len(keys) + for i, key := range keys { + args[i+2] = key + } + + cmd := NewStringSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +// redis-server version >= 6.2.0. +func (c cmdable) ZDiffWithScores(ctx context.Context, keys ...string) *ZSliceCmd { + args := make([]interface{}, 3+len(keys)) + args[0] = "zdiff" + args[1] = len(keys) + for i, key := range keys { + args[i+2] = key + } + args[len(keys)+2] = "withscores" + + cmd := NewZSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + //------------------------------------------------------------------------------ func (c cmdable) PFAdd(ctx context.Context, key string, els ...interface{}) *IntCmd { diff --git a/commands_test.go b/commands_test.go index 3ae3007e..1ce88725 100644 --- a/commands_test.go +++ b/commands_test.go @@ -3960,6 +3960,45 @@ var _ = Describe("Commands", func() { Expect(err).NotTo(HaveOccurred()) Expect(slice).To(Or(Equal([]string{"one", "1"}), Equal([]string{"two", "2"}))) }) + + It("should ZDiff", func() { + err := client.ZAdd(ctx, "zset1", &redis.Z{Score: 1, Member: "one"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset1", &redis.Z{Score: 2, Member: "two"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset1", &redis.Z{Score: 3, Member: "three"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset2", &redis.Z{Score: 1, Member: "one"}).Err() + Expect(err).NotTo(HaveOccurred()) + + v, err := client.ZDiff(ctx, "zset1", "zset2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal([]string{"two", "three"})) + }) + + It("should ZDiffWithScores", func() { + err := client.ZAdd(ctx, "zset1", &redis.Z{Score: 1, Member: "one"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset1", &redis.Z{Score: 2, Member: "two"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset1", &redis.Z{Score: 3, Member: "three"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset2", &redis.Z{Score: 1, Member: "one"}).Err() + Expect(err).NotTo(HaveOccurred()) + + v, err := client.ZDiffWithScores(ctx, "zset1", "zset2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal([]redis.Z{ + { + Member: "two", + Score: 2, + }, + { + Member: "three", + Score: 3, + }, + })) + }) }) Describe("streams", func() { From 8e8510431d287721017e359fc3ea57c445a188b1 Mon Sep 17 00:00:00 2001 From: monkey92t Date: Wed, 26 May 2021 11:25:18 +0800 Subject: [PATCH 08/13] Improve pubsub (#1764) * Improve pubsub Signed-off-by: monkey92t * Extract code to channel struct and tweak API * Move chanSendTimeout to channel * Cleanup health check * Add WithChannelSendTimeout and tweak comments * clear notes Signed-off-by: monkey92t Co-authored-by: Vladimir Mihailenco --- pubsub.go | 261 ++++++++++++++++++++++++++++--------------------- pubsub_test.go | 19 ++++ 2 files changed, 166 insertions(+), 114 deletions(-) diff --git a/pubsub.go b/pubsub.go index c56270b4..68cea710 100644 --- a/pubsub.go +++ b/pubsub.go @@ -2,7 +2,6 @@ package redis import ( "context" - "errors" "fmt" "strings" "sync" @@ -13,13 +12,6 @@ import ( "github.com/go-redis/redis/v8/internal/proto" ) -const ( - pingTimeout = time.Second - chanSendTimeout = time.Minute -) - -var errPingTimeout = errors.New("redis: ping timeout") - // PubSub implements Pub/Sub commands as described in // http://redis.io/topics/pubsub. Message receiving is NOT safe // for concurrent use by multiple goroutines. @@ -43,9 +35,12 @@ type PubSub struct { cmd *Cmd chOnce sync.Once - msgCh chan *Message - allCh chan interface{} - ping chan struct{} + msgCh *channel + allCh *channel +} + +func (c *PubSub) init() { + c.exit = make(chan struct{}) } func (c *PubSub) String() string { @@ -54,10 +49,6 @@ func (c *PubSub) String() string { return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", ")) } -func (c *PubSub) init() { - c.exit = make(chan struct{}) -} - func (c *PubSub) connWithLock(ctx context.Context) (*pool.Conn, error) { c.mu.Lock() cn, err := c.conn(ctx, nil) @@ -418,56 +409,6 @@ func (c *PubSub) ReceiveMessage(ctx context.Context) (*Message, error) { } } -// Channel returns a Go channel for concurrently receiving messages. -// The channel is closed together with the PubSub. If the Go channel -// is blocked full for 30 seconds the message is dropped. -// Receive* APIs can not be used after channel is created. -// -// go-redis periodically sends ping messages to test connection health -// and re-subscribes if ping can not not received for 30 seconds. -func (c *PubSub) Channel() <-chan *Message { - return c.ChannelSize(100) -} - -// ChannelSize is like Channel, but creates a Go channel -// with specified buffer size. -func (c *PubSub) ChannelSize(size int) <-chan *Message { - c.chOnce.Do(func() { - c.initPing() - c.initMsgChan(size) - }) - if c.msgCh == nil { - err := fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions") - panic(err) - } - if cap(c.msgCh) != size { - err := fmt.Errorf("redis: PubSub.Channel size can not be changed once created") - panic(err) - } - return c.msgCh -} - -// ChannelWithSubscriptions is like Channel, but message type can be either -// *Subscription or *Message. Subscription messages can be used to detect -// reconnections. -// -// ChannelWithSubscriptions can not be used together with Channel or ChannelSize. -func (c *PubSub) ChannelWithSubscriptions(ctx context.Context, size int) <-chan interface{} { - c.chOnce.Do(func() { - c.initPing() - c.initAllChan(size) - }) - if c.allCh == nil { - err := fmt.Errorf("redis: ChannelWithSubscriptions can't be called after Channel") - panic(err) - } - if cap(c.allCh) != size { - err := fmt.Errorf("redis: PubSub.Channel size can not be changed once created") - panic(err) - } - return c.allCh -} - func (c *PubSub) getContext() context.Context { if c.cmd != nil { return c.cmd.ctx @@ -475,36 +416,135 @@ func (c *PubSub) getContext() context.Context { return context.Background() } -func (c *PubSub) initPing() { +//------------------------------------------------------------------------------ + +// Channel returns a Go channel for concurrently receiving messages. +// The channel is closed together with the PubSub. If the Go channel +// is blocked full for 30 seconds the message is dropped. +// Receive* APIs can not be used after channel is created. +// +// go-redis periodically sends ping messages to test connection health +// and re-subscribes if ping can not not received for 30 seconds. +func (c *PubSub) Channel(opts ...ChannelOption) <-chan *Message { + c.chOnce.Do(func() { + c.msgCh = newChannel(c, opts...) + c.msgCh.initMsgChan() + }) + if c.msgCh == nil { + err := fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions") + panic(err) + } + return c.msgCh.msgCh +} + +// ChannelSize is like Channel, but creates a Go channel +// with specified buffer size. +// +// Deprecated: use Channel(WithChannelSize(size)), remove in v9. +func (c *PubSub) ChannelSize(size int) <-chan *Message { + return c.Channel(WithChannelSize(size)) +} + +// ChannelWithSubscriptions is like Channel, but message type can be either +// *Subscription or *Message. Subscription messages can be used to detect +// reconnections. +// +// ChannelWithSubscriptions can not be used together with Channel or ChannelSize. +func (c *PubSub) ChannelWithSubscriptions(_ context.Context, size int) <-chan interface{} { + c.chOnce.Do(func() { + c.allCh = newChannel(c, WithChannelSize(size)) + c.allCh.initAllChan() + }) + if c.allCh == nil { + err := fmt.Errorf("redis: ChannelWithSubscriptions can't be called after Channel") + panic(err) + } + return c.allCh.allCh +} + +type ChannelOption func(c *channel) + +// WithChannelSize specifies the Go chan size that is used to buffer incoming messages. +// +// The default is 100 messages. +func WithChannelSize(size int) ChannelOption { + return func(c *channel) { + c.chanSize = size + } +} + +// WithChannelHealthCheckInterval specifies the health check interval. +// PubSub will ping Redis Server if it does not receive any messages within the interval. +// To disable health check, use zero interval. +// +// The default is 3 seconds. +func WithChannelHealthCheckInterval(d time.Duration) ChannelOption { + return func(c *channel) { + c.checkInterval = d + } +} + +// WithChannelSendTimeout specifies that channel send timeout after which +// the message is dropped. +// +// The default is 60 seconds. +func WithChannelSendTimeout(d time.Duration) ChannelOption { + return func(c *channel) { + c.chanSendTimeout = d + } +} + +type channel struct { + pubSub *PubSub + + msgCh chan *Message + allCh chan interface{} + ping chan struct{} + + chanSize int + chanSendTimeout time.Duration + checkInterval time.Duration +} + +func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel { + c := &channel{ + pubSub: pubSub, + + chanSize: 100, + chanSendTimeout: time.Minute, + checkInterval: 3 * time.Second, + } + for _, opt := range opts { + opt(c) + } + if c.checkInterval > 0 { + c.initHealthCheck() + } + return c +} + +func (c *channel) initHealthCheck() { ctx := context.TODO() c.ping = make(chan struct{}, 1) + go func() { timer := time.NewTimer(time.Minute) timer.Stop() - healthy := true for { - timer.Reset(pingTimeout) + timer.Reset(c.checkInterval) select { case <-c.ping: - healthy = true if !timer.Stop() { <-timer.C } case <-timer.C: - pingErr := c.Ping(ctx) - if healthy { - healthy = false - } else { - if pingErr == nil { - pingErr = errPingTimeout - } - c.mu.Lock() - c.reconnect(ctx, pingErr) - healthy = true - c.mu.Unlock() + if pingErr := c.pubSub.Ping(ctx); pingErr != nil { + c.pubSub.mu.Lock() + c.pubSub.reconnect(ctx, pingErr) + c.pubSub.mu.Unlock() } - case <-c.exit: + case <-c.pubSub.exit: return } } @@ -512,16 +552,17 @@ func (c *PubSub) initPing() { } // initMsgChan must be in sync with initAllChan. -func (c *PubSub) initMsgChan(size int) { +func (c *channel) initMsgChan() { ctx := context.TODO() - c.msgCh = make(chan *Message, size) + c.msgCh = make(chan *Message, c.chanSize) + go func() { timer := time.NewTimer(time.Minute) timer.Stop() var errCount int for { - msg, err := c.Receive(ctx) + msg, err := c.pubSub.Receive(ctx) if err != nil { if err == pool.ErrClosed { close(c.msgCh) @@ -548,7 +589,7 @@ func (c *PubSub) initMsgChan(size int) { case *Pong: // Ignore. case *Message: - timer.Reset(chanSendTimeout) + timer.Reset(c.chanSendTimeout) select { case c.msgCh <- msg: if !timer.Stop() { @@ -556,30 +597,28 @@ func (c *PubSub) initMsgChan(size int) { } case <-timer.C: internal.Logger.Printf( - c.getContext(), - "redis: %s channel is full for %s (message is dropped)", - c, - chanSendTimeout, - ) + ctx, "redis: %s channel is full for %s (message is dropped)", + c, c.chanSendTimeout) } default: - internal.Logger.Printf(c.getContext(), "redis: unknown message type: %T", msg) + internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg) } } }() } // initAllChan must be in sync with initMsgChan. -func (c *PubSub) initAllChan(size int) { +func (c *channel) initAllChan() { ctx := context.TODO() - c.allCh = make(chan interface{}, size) + c.allCh = make(chan interface{}, c.chanSize) + go func() { - timer := time.NewTimer(pingTimeout) + timer := time.NewTimer(time.Minute) timer.Stop() var errCount int for { - msg, err := c.Receive(ctx) + msg, err := c.pubSub.Receive(ctx) if err != nil { if err == pool.ErrClosed { close(c.allCh) @@ -601,29 +640,23 @@ func (c *PubSub) initAllChan(size int) { } switch msg := msg.(type) { - case *Subscription: - c.sendMessage(msg, timer) case *Pong: // Ignore. - case *Message: - c.sendMessage(msg, timer) + case *Subscription, *Message: + timer.Reset(c.chanSendTimeout) + select { + case c.allCh <- msg: + if !timer.Stop() { + <-timer.C + } + case <-timer.C: + internal.Logger.Printf( + ctx, "redis: %s channel is full for %s (message is dropped)", + c, c.chanSendTimeout) + } default: - internal.Logger.Printf(c.getContext(), "redis: unknown message type: %T", msg) + internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg) } } }() } - -func (c *PubSub) sendMessage(msg interface{}, timer *time.Timer) { - timer.Reset(pingTimeout) - select { - case c.allCh <- msg: - if !timer.Stop() { - <-timer.C - } - case <-timer.C: - internal.Logger.Printf( - c.getContext(), - "redis: %s channel is full for %s (message is dropped)", c, pingTimeout) - } -} diff --git a/pubsub_test.go b/pubsub_test.go index d32d5e0b..3e0bf9b0 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -473,4 +473,23 @@ var _ = Describe("PubSub", func() { Fail("timeout") } }) + + It("should ChannelMessage", func() { + pubsub := client.Subscribe(ctx, "mychannel") + defer pubsub.Close() + + ch := pubsub.Channel( + redis.WithChannelSize(10), + redis.WithChannelHealthCheckInterval(time.Second), + ) + + text := "test channel message" + err := client.Publish(ctx, "mychannel", text).Err() + Expect(err).NotTo(HaveOccurred()) + + var msg *redis.Message + Eventually(ch).Should(Receive(&msg)) + Expect(msg.Channel).To(Equal("mychannel")) + Expect(msg.Payload).To(Equal(text)) + }) }) From 37f5a2b1cdf3ee2b2752bdd46c6e1e07e3c8a57a Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Wed, 26 May 2021 15:12:25 +0300 Subject: [PATCH 09/13] Update changelog --- CHANGELOG.md | 5 +++++ pubsub.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 484d40f5..6b540a4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,11 @@ > :heart: > [**Uptrace.dev** - All-in-one tool to optimize performance and monitor errors & logs](https://uptrace.dev) +## v8.9 + +- Changed `PubSub.Channel` to only rely on `Ping` result. You can now use `WithChannelSize`, + `WithChannelHealthCheckInterval`, and `WithChannelSendTimeout` to override default settings. + ## v8.8 - To make updating easier, extra modules now have the same version as go-redis does. That means that diff --git a/pubsub.go b/pubsub.go index 68cea710..c6ffb256 100644 --- a/pubsub.go +++ b/pubsub.go @@ -484,7 +484,7 @@ func WithChannelHealthCheckInterval(d time.Duration) ChannelOption { } } -// WithChannelSendTimeout specifies that channel send timeout after which +// WithChannelSendTimeout specifies the channel send timeout after which // the message is dropped. // // The default is 60 seconds. From ad4d0a506a469743b5d619f3290310325e965a11 Mon Sep 17 00:00:00 2001 From: monkey92t Date: Thu, 27 May 2021 11:24:24 +0800 Subject: [PATCH 10/13] fix #1766 --- commands.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commands.go b/commands.go index 9c643904..3dc06bb6 100644 --- a/commands.go +++ b/commands.go @@ -1773,7 +1773,7 @@ func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSlic args := make([]interface{}, 0, 8+len(a.Streams)) args = append(args, "xreadgroup", "group", a.Group, a.Consumer) - keyPos := int8(1) + keyPos := int8(4) if a.Count > 0 { args = append(args, "count", a.Count) keyPos += 2 From f521e51383edae8e5dfc98fce17d599b33e304dc Mon Sep 17 00:00:00 2001 From: monkey92t Date: Fri, 28 May 2021 23:15:26 +0800 Subject: [PATCH 11/13] the "keys have no pre-determined position" command specifies the position of the key (#1769) --- commands.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/commands.go b/commands.go index 3dc06bb6..516c1d9c 100644 --- a/commands.go +++ b/commands.go @@ -2380,6 +2380,7 @@ func (c cmdable) ZDiff(ctx context.Context, keys ...string) *StringSliceCmd { } cmd := NewStringSliceCmd(ctx, args...) + cmd.setFirstKeyPos(2) _ = c(ctx, cmd) return cmd } @@ -2395,6 +2396,7 @@ func (c cmdable) ZDiffWithScores(ctx context.Context, keys ...string) *ZSliceCmd args[len(keys)+2] = "withscores" cmd := NewZSliceCmd(ctx, args...) + cmd.setFirstKeyPos(2) _ = c(ctx, cmd) return cmd } @@ -2658,6 +2660,7 @@ func (c cmdable) MemoryUsage(ctx context.Context, key string, samples ...int) *I args = append(args, "SAMPLES", samples[0]) } cmd := NewIntCmd(ctx, args...) + cmd.setFirstKeyPos(2) _ = c(ctx, cmd) return cmd } @@ -2674,6 +2677,7 @@ func (c cmdable) Eval(ctx context.Context, script string, keys []string, args .. } cmdArgs = appendArgs(cmdArgs, args) cmd := NewCmd(ctx, cmdArgs...) + cmd.setFirstKeyPos(3) _ = c(ctx, cmd) return cmd } @@ -2688,6 +2692,7 @@ func (c cmdable) EvalSha(ctx context.Context, sha1 string, keys []string, args . } cmdArgs = appendArgs(cmdArgs, args) cmd := NewCmd(ctx, cmdArgs...) + cmd.setFirstKeyPos(3) _ = c(ctx, cmd) return cmd } From 74246e0ccf0519821f23196016e2d4872585c814 Mon Sep 17 00:00:00 2001 From: MrChenCode Date: Sat, 29 May 2021 21:11:47 +0800 Subject: [PATCH 12/13] add ZInter command (#1768) --- commands.go | 61 ++++++++++++++++++++++++++++++++++++++++++++++-- commands_test.go | 49 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 2 deletions(-) diff --git a/commands.go b/commands.go index 516c1d9c..37a0429c 100644 --- a/commands.go +++ b/commands.go @@ -254,6 +254,8 @@ type Cmdable interface { ZCount(ctx context.Context, key, min, max string) *IntCmd ZLexCount(ctx context.Context, key, min, max string) *IntCmd ZIncrBy(ctx context.Context, key string, increment float64, member string) *FloatCmd + ZInter(ctx context.Context, store *ZStore) *StringSliceCmd + ZInterWithScores(ctx context.Context, store *ZStore) *ZSliceCmd ZInterStore(ctx context.Context, destination string, store *ZStore) *IntCmd ZMScore(ctx context.Context, key string, members ...string) *FloatSliceCmd ZPopMax(ctx context.Context, key string, count ...int64) *ZSliceCmd @@ -1943,6 +1945,17 @@ type ZStore struct { Aggregate string } +func (z *ZStore) len() (n int) { + n = len(z.Keys) + if len(z.Weights) > 0 { + n += 1 + len(z.Weights) + } + if z.Aggregate != "" { + n += 2 + } + return n +} + // Redis `BZPOPMAX key [key ...] timeout` command. func (c cmdable) BZPopMax(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd { args := make([]interface{}, 1+len(keys)+1) @@ -2088,7 +2101,7 @@ func (c cmdable) ZIncrBy(ctx context.Context, key string, increment float64, mem } func (c cmdable) ZInterStore(ctx context.Context, destination string, store *ZStore) *IntCmd { - args := make([]interface{}, 0, 3+len(store.Keys)) + args := make([]interface{}, 0, 3+store.len()) args = append(args, "zinterstore", destination, len(store.Keys)) for _, key := range store.Keys { args = append(args, key) @@ -2108,6 +2121,50 @@ func (c cmdable) ZInterStore(ctx context.Context, destination string, store *ZSt return cmd } +func (c cmdable) ZInter(ctx context.Context, store *ZStore) *StringSliceCmd { + args := make([]interface{}, 0, 2+store.len()) + args = append(args, "zinter", len(store.Keys)) + for _, key := range store.Keys { + args = append(args, key) + } + if len(store.Weights) > 0 { + args = append(args, "weights") + for _, weights := range store.Weights { + args = append(args, weights) + } + } + + if store.Aggregate != "" { + args = append(args, "aggregate", store.Aggregate) + } + cmd := NewStringSliceCmd(ctx, args...) + cmd.setFirstKeyPos(2) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) ZInterWithScores(ctx context.Context, store *ZStore) *ZSliceCmd { + args := make([]interface{}, 0, 3+store.len()) + args = append(args, "zinter", len(store.Keys)) + for _, key := range store.Keys { + args = append(args, key) + } + if len(store.Weights) > 0 { + args = append(args, "weights") + for _, weights := range store.Weights { + args = append(args, weights) + } + } + if store.Aggregate != "" { + args = append(args, "aggregate", store.Aggregate) + } + args = append(args, "withscores") + cmd := NewZSliceCmd(ctx, args...) + cmd.setFirstKeyPos(2) + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) ZMScore(ctx context.Context, key string, members ...string) *FloatSliceCmd { args := make([]interface{}, 2+len(members)) args[0] = "zmscore" @@ -2334,7 +2391,7 @@ func (c cmdable) ZScore(ctx context.Context, key, member string) *FloatCmd { } func (c cmdable) ZUnionStore(ctx context.Context, dest string, store *ZStore) *IntCmd { - args := make([]interface{}, 0, 3+len(store.Keys)) + args := make([]interface{}, 0, 3+store.len()) args = append(args, "zunionstore", dest, len(store.Keys)) for _, key := range store.Keys { args = append(args, key) diff --git a/commands_test.go b/commands_test.go index 1ce88725..d2b5b5b5 100644 --- a/commands_test.go +++ b/commands_test.go @@ -3999,6 +3999,55 @@ var _ = Describe("Commands", func() { }, })) }) + + It("should ZInter", func() { + err := client.ZAdd(ctx, "zset1", &redis.Z{Score: 1, Member: "one"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset1", &redis.Z{Score: 2, Member: "two"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset2", &redis.Z{Score: 1, Member: "one"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset2", &redis.Z{Score: 2, Member: "two"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset2", &redis.Z{Score: 3, Member: "three"}).Err() + Expect(err).NotTo(HaveOccurred()) + + v, err := client.ZInter(ctx, &redis.ZStore{ + Keys: []string{"zset1", "zset2"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal([]string{"one", "two"})) + }) + + It("should ZInterWithScores", func() { + err := client.ZAdd(ctx, "zset1", &redis.Z{Score: 1, Member: "one"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset1", &redis.Z{Score: 2, Member: "two"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset2", &redis.Z{Score: 1, Member: "one"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset2", &redis.Z{Score: 2, Member: "two"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset2", &redis.Z{Score: 3, Member: "three"}).Err() + Expect(err).NotTo(HaveOccurred()) + + v, err := client.ZInterWithScores(ctx, &redis.ZStore{ + Keys: []string{"zset1", "zset2"}, + Weights: []float64{2, 3}, + Aggregate: "Max", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal([]redis.Z{ + { + Member: "one", + Score: 3, + }, + { + Member: "two", + Score: 6, + }, + })) + }) }) Describe("streams", func() { From 036605d7c6def5b0a3500e588c1660e908dffc46 Mon Sep 17 00:00:00 2001 From: MrChenCode <18811388234@163.com> Date: Tue, 1 Jun 2021 10:02:09 +0800 Subject: [PATCH 13/13] add ZDiffStore command (#1775) --- commands.go | 13 +++++++++++++ commands_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/commands.go b/commands.go index 37a0429c..4ac42ecd 100644 --- a/commands.go +++ b/commands.go @@ -281,6 +281,7 @@ type Cmdable interface { ZRandMember(ctx context.Context, key string, count int, withScores bool) *StringSliceCmd ZDiff(ctx context.Context, keys ...string) *StringSliceCmd ZDiffWithScores(ctx context.Context, keys ...string) *ZSliceCmd + ZDiffStore(ctx context.Context, destination string, keys ...string) *IntCmd PFAdd(ctx context.Context, key string, els ...interface{}) *IntCmd PFCount(ctx context.Context, keys ...string) *IntCmd @@ -2458,6 +2459,18 @@ func (c cmdable) ZDiffWithScores(ctx context.Context, keys ...string) *ZSliceCmd return cmd } +// redis-server version >=6.2.0. +func (c cmdable) ZDiffStore(ctx context.Context, destination string, keys ...string) *IntCmd { + args := make([]interface{}, 0, 3+len(keys)) + args = append(args, "zdiffstore", destination, len(keys)) + for _, key := range keys { + args = append(args, key) + } + cmd := NewIntCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + //------------------------------------------------------------------------------ func (c cmdable) PFAdd(ctx context.Context, key string, els ...interface{}) *IntCmd { diff --git a/commands_test.go b/commands_test.go index d2b5b5b5..427fec79 100644 --- a/commands_test.go +++ b/commands_test.go @@ -4048,6 +4048,31 @@ var _ = Describe("Commands", func() { }, })) }) + + It("should ZDiffStore", func() { + err := client.ZAdd(ctx, "zset1", &redis.Z{Score: 1, Member: "one"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset1", &redis.Z{Score: 2, Member: "two"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset2", &redis.Z{Score: 1, Member: "one"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset2", &redis.Z{Score: 2, Member: "two"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "zset2", &redis.Z{Score: 3, Member: "three"}).Err() + Expect(err).NotTo(HaveOccurred()) + v, err := client.ZDiffStore(ctx, "out1", "zset1", "zset2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal(int64(0))) + v, err = client.ZDiffStore(ctx, "out1", "zset2", "zset1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal(int64(1))) + vals, err := client.ZRangeWithScores(ctx, "out1", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{{ + Score: 3, + Member: "three", + }})) + }) }) Describe("streams", func() {