xgroup/xadd/xtrim supports new options (#1787)

* support cmd option

XGROUP CREATECONSUMER
XTRIM MINID LIMIT
XADD NOMKSTREAM MINID LIMIT

Signed-off-by: monkey <golang@88.com>

* add XAddArgs.Approx doc

Signed-off-by: monkey92t <golang@88.com>
This commit is contained in:
monkey92t 2021-06-15 16:48:35 +08:00 committed by GitHub
parent 14d82a2d93
commit 43ec1464d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 157 additions and 16 deletions

View File

@ -226,6 +226,7 @@ type Cmdable interface {
XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd
XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd
XGroupDestroy(ctx context.Context, stream, group string) *IntCmd XGroupDestroy(ctx context.Context, stream, group string) *IntCmd
XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd
XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd
XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd
XAck(ctx context.Context, stream, group string, ids ...string) *IntCmd XAck(ctx context.Context, stream, group string, ids ...string) *IntCmd
@ -233,8 +234,14 @@ type Cmdable interface {
XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd
XClaim(ctx context.Context, a *XClaimArgs) *XMessageSliceCmd XClaim(ctx context.Context, a *XClaimArgs) *XMessageSliceCmd
XClaimJustID(ctx context.Context, a *XClaimArgs) *StringSliceCmd XClaimJustID(ctx context.Context, a *XClaimArgs) *StringSliceCmd
// TODO: XTrim and XTrimApprox remove in v9.
XTrim(ctx context.Context, key string, maxLen int64) *IntCmd XTrim(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd
XTrimMinID(ctx context.Context, key string, minID string) *IntCmd
XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd
XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd
XInfoStream(ctx context.Context, key string) *XInfoStreamCmd XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd
@ -1621,22 +1628,50 @@ func (c cmdable) SUnionStore(ctx context.Context, destination string, keys ...st
// - XAddArgs.Values = map[string]interface{}{"key1": "value1", "key2": "value2"} // - XAddArgs.Values = map[string]interface{}{"key1": "value1", "key2": "value2"}
// //
// Note that map will not preserve the order of key-value pairs. // Note that map will not preserve the order of key-value pairs.
// MaxLen/MaxLenApprox and MinID are in conflict, only one of them can be used.
type XAddArgs struct { type XAddArgs struct {
Stream string Stream string
NoMkStream bool
MaxLen int64 // MAXLEN N MaxLen int64 // MAXLEN N
// Deprecated: use MaxLen+Approx, remove in v9.
MaxLenApprox int64 // MAXLEN ~ N MaxLenApprox int64 // MAXLEN ~ N
MinID string
// Approx causes MaxLen and MinID to use "~" matcher (instead of "=").
Approx bool
Limit int64
ID string ID string
Values interface{} Values interface{}
} }
// XAdd a.Limit has a bug, please confirm it and use it.
// issue: https://github.com/redis/redis/issues/9046
func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd { func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
args := make([]interface{}, 0, 8) args := make([]interface{}, 0, 11)
args = append(args, "xadd") args = append(args, "xadd", a.Stream)
args = append(args, a.Stream) if a.NoMkStream {
if a.MaxLen > 0 { args = append(args, "nomkstream")
}
switch {
case a.MaxLen > 0:
if a.Approx {
args = append(args, "maxlen", "~", a.MaxLen)
} else {
args = append(args, "maxlen", a.MaxLen) args = append(args, "maxlen", a.MaxLen)
} else if a.MaxLenApprox > 0 { }
case a.MaxLenApprox > 0:
// TODO remove in v9.
args = append(args, "maxlen", "~", a.MaxLenApprox) args = append(args, "maxlen", "~", a.MaxLenApprox)
case a.MinID != "":
if a.Approx {
args = append(args, "minid", "~", a.MinID)
} else {
args = append(args, "minid", a.MinID)
}
}
if a.Limit > 0 {
args = append(args, "limit", a.Limit)
} }
if a.ID != "" { if a.ID != "" {
args = append(args, a.ID) args = append(args, a.ID)
@ -1757,6 +1792,12 @@ func (c cmdable) XGroupDestroy(ctx context.Context, stream, group string) *IntCm
return cmd return cmd
} }
func (c cmdable) XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
cmd := NewIntCmd(ctx, "xgroup", "createconsumer", stream, group, consumer)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd { func (c cmdable) XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
cmd := NewIntCmd(ctx, "xgroup", "delconsumer", stream, group, consumer) cmd := NewIntCmd(ctx, "xgroup", "delconsumer", stream, group, consumer)
_ = c(ctx, cmd) _ = c(ctx, cmd)
@ -1914,16 +1955,63 @@ func xClaimArgs(a *XClaimArgs) []interface{} {
return args return args
} }
func (c cmdable) XTrim(ctx context.Context, key string, maxLen int64) *IntCmd { // xTrim If approx is true, add the "~" parameter, otherwise it is the default "=" (redis default).
cmd := NewIntCmd(ctx, "xtrim", key, "maxlen", maxLen) // example:
// XTRIM key MAXLEN/MINID threshold LIMIT limit.
// XTRIM key MAXLEN/MINID ~ threshold LIMIT limit.
// The redis-server version is lower than 6.2, please set limit to 0.
func (c cmdable) xTrim(
ctx context.Context, key, strategy string,
approx bool, threshold interface{}, limit int64,
) *IntCmd {
args := make([]interface{}, 0, 7)
args = append(args, "xtrim", key, strategy)
if approx {
args = append(args, "~")
}
args = append(args, threshold)
if limit > 0 {
args = append(args, "limit", limit)
}
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
// Deprecated: use XTrimMaxLen, remove in v9.
func (c cmdable) XTrim(ctx context.Context, key string, maxLen int64) *IntCmd {
return c.xTrim(ctx, key, "maxlen", false, maxLen, 0)
}
// Deprecated: use XTrimMaxLenApprox, remove in v9.
func (c cmdable) XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd { func (c cmdable) XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd {
cmd := NewIntCmd(ctx, "xtrim", key, "maxlen", "~", maxLen) return c.xTrim(ctx, key, "maxlen", true, maxLen, 0)
_ = c(ctx, cmd) }
return cmd
// XTrimMaxLen No `~` rules are used, `limit` cannot be used.
// cmd: XTRIM key MAXLEN maxLen
func (c cmdable) XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd {
return c.xTrim(ctx, key, "maxlen", false, maxLen, 0)
}
// XTrimMaxLenApprox LIMIT has a bug, please confirm it and use it.
// issue: https://github.com/redis/redis/issues/9046
// cmd: XTRIM key MAXLEN ~ maxLen LIMIT limit
func (c cmdable) XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd {
return c.xTrim(ctx, key, "maxlen", true, maxLen, limit)
}
// XTrimMinID No `~` rules are used, `limit` cannot be used.
// cmd: XTRIM key MINID minID
func (c cmdable) XTrimMinID(ctx context.Context, key string, minID string) *IntCmd {
return c.xTrim(ctx, key, "minid", false, minID, 0)
}
// XTrimMinIDApprox LIMIT has a bug, please confirm it and use it.
// issue: https://github.com/redis/redis/issues/9046
// cmd: XTRIM key MINID ~ minID LIMIT limit
func (c cmdable) XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd {
return c.xTrim(ctx, key, "minid", true, minID, limit)
} }
func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd { func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd {

View File

@ -4104,18 +4104,47 @@ var _ = Describe("Commands", func() {
Expect(id).To(Equal("3-0")) Expect(id).To(Equal("3-0"))
}) })
// TODO remove in v9.
It("should XTrim", func() { It("should XTrim", func() {
n, err := client.XTrim(ctx, "stream", 0).Result() n, err := client.XTrim(ctx, "stream", 0).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3))) Expect(n).To(Equal(int64(3)))
}) })
// TODO remove in v9.
It("should XTrimApprox", func() { It("should XTrimApprox", func() {
n, err := client.XTrimApprox(ctx, "stream", 0).Result() n, err := client.XTrimApprox(ctx, "stream", 0).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3))) Expect(n).To(Equal(int64(3)))
}) })
// TODO XTrimMaxLenApprox/XTrimMinIDApprox There is a bug in the limit parameter.
// TODO Don't test it for now.
// TODO link: https://github.com/redis/redis/issues/9046
It("should XTrimMaxLen", func() {
n, err := client.XTrimMaxLen(ctx, "stream", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3)))
})
It("should XTrimMaxLenApprox", func() {
n, err := client.XTrimMaxLenApprox(ctx, "stream", 0, 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3)))
})
It("should XTrimMinID", func() {
n, err := client.XTrimMinID(ctx, "stream", "4-0").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3)))
})
It("should XTrimMinIDApprox", func() {
n, err := client.XTrimMinIDApprox(ctx, "stream", "4-0", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3)))
})
It("should XAdd", func() { It("should XAdd", func() {
id, err := client.XAdd(ctx, &redis.XAddArgs{ id, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "stream", Stream: "stream",
@ -4133,6 +4162,9 @@ var _ = Describe("Commands", func() {
})) }))
}) })
// TODO XAdd There is a bug in the limit parameter.
// TODO Don't test it for now.
// TODO link: https://github.com/redis/redis/issues/9046
It("should XAdd with MaxLen", func() { It("should XAdd with MaxLen", func() {
id, err := client.XAdd(ctx, &redis.XAddArgs{ id, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "stream", Stream: "stream",
@ -4148,6 +4180,21 @@ var _ = Describe("Commands", func() {
})) }))
}) })
It("should XAdd with MinID", func() {
id, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "stream",
MinID: "5-0",
ID: "4-0",
Values: map[string]interface{}{"quatro": "quatre"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(id).To(Equal("4-0"))
vals, err := client.XRange(ctx, "stream", "-", "+").Result()
Expect(err).NotTo(HaveOccurred())
Expect(vals).To(HaveLen(0))
})
It("should XDel", func() { It("should XDel", func() {
n, err := client.XDel(ctx, "stream", "1-0", "2-0", "3-0").Result() n, err := client.XDel(ctx, "stream", "1-0", "2-0", "3-0").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -4380,8 +4427,14 @@ var _ = Describe("Commands", func() {
infoExt, err = client.XPendingExt(ctx, args).Result() infoExt, err = client.XPendingExt(ctx, args).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(infoExt).To(HaveLen(0)) Expect(infoExt).To(HaveLen(0))
})
n, err := client.XGroupDelConsumer(ctx, "stream", "group", "consumer").Result() It("should XGroup Create Delete Consumer", func() {
n, err := client.XGroupCreateConsumer(ctx, "stream", "group", "c1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(1)))
n, err = client.XGroupDelConsumer(ctx, "stream", "group", "consumer").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3))) Expect(n).To(Equal(int64(3)))
}) })