diff --git a/commands.go b/commands.go index b3b15c75..32f954ef 100644 --- a/commands.go +++ b/commands.go @@ -226,6 +226,7 @@ type Cmdable interface { XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd XGroupDestroy(ctx context.Context, stream, group string) *IntCmd + XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd XAck(ctx context.Context, stream, group string, ids ...string) *IntCmd @@ -233,8 +234,14 @@ type Cmdable interface { XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd XClaim(ctx context.Context, a *XClaimArgs) *XMessageSliceCmd XClaimJustID(ctx context.Context, a *XClaimArgs) *StringSliceCmd + + // TODO: XTrim and XTrimApprox remove in v9. XTrim(ctx context.Context, key string, maxLen int64) *IntCmd XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd + XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd + XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd + XTrimMinID(ctx context.Context, key string, minID string) *IntCmd + XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd XInfoStream(ctx context.Context, key string) *XInfoStreamCmd XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd @@ -1621,22 +1628,50 @@ func (c cmdable) SUnionStore(ctx context.Context, destination string, keys ...st // - XAddArgs.Values = map[string]interface{}{"key1": "value1", "key2": "value2"} // // Note that map will not preserve the order of key-value pairs. +// MaxLen/MaxLenApprox and MinID are in conflict, only one of them can be used. type XAddArgs struct { - Stream string - MaxLen int64 // MAXLEN N + Stream string + NoMkStream bool + MaxLen int64 // MAXLEN N + + // Deprecated: use MaxLen+Approx, remove in v9. MaxLenApprox int64 // MAXLEN ~ N - ID string - Values interface{} + + MinID string + // Approx causes MaxLen and MinID to use "~" matcher (instead of "="). + Approx bool + Limit int64 + ID string + Values interface{} } +// XAdd a.Limit has a bug, please confirm it and use it. +// issue: https://github.com/redis/redis/issues/9046 func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd { - args := make([]interface{}, 0, 8) - args = append(args, "xadd") - args = append(args, a.Stream) - if a.MaxLen > 0 { - args = append(args, "maxlen", a.MaxLen) - } else if a.MaxLenApprox > 0 { + args := make([]interface{}, 0, 11) + args = append(args, "xadd", a.Stream) + if a.NoMkStream { + args = append(args, "nomkstream") + } + switch { + case a.MaxLen > 0: + if a.Approx { + args = append(args, "maxlen", "~", a.MaxLen) + } else { + args = append(args, "maxlen", a.MaxLen) + } + case a.MaxLenApprox > 0: + // TODO remove in v9. args = append(args, "maxlen", "~", a.MaxLenApprox) + case a.MinID != "": + if a.Approx { + args = append(args, "minid", "~", a.MinID) + } else { + args = append(args, "minid", a.MinID) + } + } + if a.Limit > 0 { + args = append(args, "limit", a.Limit) } if a.ID != "" { args = append(args, a.ID) @@ -1757,6 +1792,12 @@ func (c cmdable) XGroupDestroy(ctx context.Context, stream, group string) *IntCm return cmd } +func (c cmdable) XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd { + cmd := NewIntCmd(ctx, "xgroup", "createconsumer", stream, group, consumer) + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd { cmd := NewIntCmd(ctx, "xgroup", "delconsumer", stream, group, consumer) _ = c(ctx, cmd) @@ -1914,16 +1955,63 @@ func xClaimArgs(a *XClaimArgs) []interface{} { return args } -func (c cmdable) XTrim(ctx context.Context, key string, maxLen int64) *IntCmd { - cmd := NewIntCmd(ctx, "xtrim", key, "maxlen", maxLen) +// xTrim If approx is true, add the "~" parameter, otherwise it is the default "=" (redis default). +// example: +// XTRIM key MAXLEN/MINID threshold LIMIT limit. +// XTRIM key MAXLEN/MINID ~ threshold LIMIT limit. +// The redis-server version is lower than 6.2, please set limit to 0. +func (c cmdable) xTrim( + ctx context.Context, key, strategy string, + approx bool, threshold interface{}, limit int64, +) *IntCmd { + args := make([]interface{}, 0, 7) + args = append(args, "xtrim", key, strategy) + if approx { + args = append(args, "~") + } + args = append(args, threshold) + if limit > 0 { + args = append(args, "limit", limit) + } + cmd := NewIntCmd(ctx, args...) _ = c(ctx, cmd) return cmd } +// Deprecated: use XTrimMaxLen, remove in v9. +func (c cmdable) XTrim(ctx context.Context, key string, maxLen int64) *IntCmd { + return c.xTrim(ctx, key, "maxlen", false, maxLen, 0) +} + +// Deprecated: use XTrimMaxLenApprox, remove in v9. func (c cmdable) XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd { - cmd := NewIntCmd(ctx, "xtrim", key, "maxlen", "~", maxLen) - _ = c(ctx, cmd) - return cmd + return c.xTrim(ctx, key, "maxlen", true, maxLen, 0) +} + +// XTrimMaxLen No `~` rules are used, `limit` cannot be used. +// cmd: XTRIM key MAXLEN maxLen +func (c cmdable) XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd { + return c.xTrim(ctx, key, "maxlen", false, maxLen, 0) +} + +// XTrimMaxLenApprox LIMIT has a bug, please confirm it and use it. +// issue: https://github.com/redis/redis/issues/9046 +// cmd: XTRIM key MAXLEN ~ maxLen LIMIT limit +func (c cmdable) XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd { + return c.xTrim(ctx, key, "maxlen", true, maxLen, limit) +} + +// XTrimMinID No `~` rules are used, `limit` cannot be used. +// cmd: XTRIM key MINID minID +func (c cmdable) XTrimMinID(ctx context.Context, key string, minID string) *IntCmd { + return c.xTrim(ctx, key, "minid", false, minID, 0) +} + +// XTrimMinIDApprox LIMIT has a bug, please confirm it and use it. +// issue: https://github.com/redis/redis/issues/9046 +// cmd: XTRIM key MINID ~ minID LIMIT limit +func (c cmdable) XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd { + return c.xTrim(ctx, key, "minid", true, minID, limit) } func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd { diff --git a/commands_test.go b/commands_test.go index 11b8592d..2bfa3dfa 100644 --- a/commands_test.go +++ b/commands_test.go @@ -4104,18 +4104,47 @@ var _ = Describe("Commands", func() { Expect(id).To(Equal("3-0")) }) + // TODO remove in v9. It("should XTrim", func() { n, err := client.XTrim(ctx, "stream", 0).Result() Expect(err).NotTo(HaveOccurred()) Expect(n).To(Equal(int64(3))) }) + // TODO remove in v9. It("should XTrimApprox", func() { n, err := client.XTrimApprox(ctx, "stream", 0).Result() Expect(err).NotTo(HaveOccurred()) Expect(n).To(Equal(int64(3))) }) + // TODO XTrimMaxLenApprox/XTrimMinIDApprox There is a bug in the limit parameter. + // TODO Don't test it for now. + // TODO link: https://github.com/redis/redis/issues/9046 + It("should XTrimMaxLen", func() { + n, err := client.XTrimMaxLen(ctx, "stream", 0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(3))) + }) + + It("should XTrimMaxLenApprox", func() { + n, err := client.XTrimMaxLenApprox(ctx, "stream", 0, 0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(3))) + }) + + It("should XTrimMinID", func() { + n, err := client.XTrimMinID(ctx, "stream", "4-0").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(3))) + }) + + It("should XTrimMinIDApprox", func() { + n, err := client.XTrimMinIDApprox(ctx, "stream", "4-0", 0).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(3))) + }) + It("should XAdd", func() { id, err := client.XAdd(ctx, &redis.XAddArgs{ Stream: "stream", @@ -4133,6 +4162,9 @@ var _ = Describe("Commands", func() { })) }) + // TODO XAdd There is a bug in the limit parameter. + // TODO Don't test it for now. + // TODO link: https://github.com/redis/redis/issues/9046 It("should XAdd with MaxLen", func() { id, err := client.XAdd(ctx, &redis.XAddArgs{ Stream: "stream", @@ -4148,6 +4180,21 @@ var _ = Describe("Commands", func() { })) }) + It("should XAdd with MinID", func() { + id, err := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "stream", + MinID: "5-0", + ID: "4-0", + Values: map[string]interface{}{"quatro": "quatre"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(Equal("4-0")) + + vals, err := client.XRange(ctx, "stream", "-", "+").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(HaveLen(0)) + }) + It("should XDel", func() { n, err := client.XDel(ctx, "stream", "1-0", "2-0", "3-0").Result() Expect(err).NotTo(HaveOccurred()) @@ -4380,8 +4427,14 @@ var _ = Describe("Commands", func() { infoExt, err = client.XPendingExt(ctx, args).Result() Expect(err).NotTo(HaveOccurred()) Expect(infoExt).To(HaveLen(0)) + }) - n, err := client.XGroupDelConsumer(ctx, "stream", "group", "consumer").Result() + It("should XGroup Create Delete Consumer", func() { + n, err := client.XGroupCreateConsumer(ctx, "stream", "group", "c1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) + + n, err = client.XGroupDelConsumer(ctx, "stream", "group", "consumer").Result() Expect(err).NotTo(HaveOccurred()) Expect(n).To(Equal(int64(3))) })