sync master (#1800)

* Remove OpenTelemetry from the code (but leave redisotel as is) (#1782)

* Add XAutoClaim command (#1780)

* fix typo (#1788)

* xgroup/xadd/xtrim supports new options (#1787)

* support cmd option

XGROUP CREATECONSUMER
XTRIM MINID LIMIT
XADD NOMKSTREAM MINID LIMIT

Signed-off-by: monkey <golang@88.com>

* add XAddArgs.Approx doc

Signed-off-by: monkey92t <golang@88.com>

* Add Bun to readme

* Upgrade the <sorted set> series of commands (#1792)

* Upgrade the <sorted set> series of commands

Signed-off-by: monkey92t <golang@88.com>

* Cancel the Deprecated mark of ZAddNX and ZAddXX

Signed-off-by: monkey92t <golang@88.com>

* Explain the use restrictions of KeepTTL. (#1799)

Signed-off-by: monkey92t <golang@88.com>

* Adjust KeepTTL annotation.

Signed-off-by: monkey92t <golang@88.com>

* the hello command throws possible errors, It may affect the "read timeout" test result.

Signed-off-by: monkey92t <golang@88.com>

Co-authored-by: Vladimir Mihailenco <vladimir.webdev@gmail.com>
Co-authored-by: ericmillin <31105612+ericmillin@users.noreply.github.com>
Co-authored-by: heyanfu <1145291570@qq.com>
This commit is contained in:
monkey92t 2021-06-28 17:40:38 +08:00 committed by GitHub
parent bd6402ab3e
commit 63df0e5e75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 951 additions and 452 deletions

View File

@ -3,6 +3,19 @@
> :heart:
> [**Uptrace.dev** - All-in-one tool to optimize performance and monitor errors & logs](https://uptrace.dev)
## v8.10
- Removed extra OpenTelemetry spans from go-redis core. Now go-redis instrumentation only adds a
single span with a Redis command (instead of 4 spans). There are multiple reasons behind this
decision:
- Traces become smaller and less noisy.
- It may be costly to process those 3 extra spans for each query.
- go-redis no longer depends on OpenTelemetry.
Eventually we hope to replace the information that we no longer collect with OpenTelemetry
Metrics.
## v8.9
- Changed `PubSub.Channel` to only rely on `Ping` result. You can now use `WithChannelSize`,

View File

@ -17,6 +17,9 @@
- [Examples](https://pkg.go.dev/github.com/go-redis/redis/v8?tab=doc#pkg-examples)
- [RealWorld example app](https://github.com/uptrace/go-treemux-realworld-example-app)
> :heart: Please check [Bun](https://bun.uptrace.dev) - fast and simple SQL client for PostgreSQL,
> MySQL, and SQLite.
## Ecosystem
- [Redis Mock](https://github.com/go-redis/redismock).
@ -160,8 +163,3 @@ Lastly, run:
```
go test
```
## See also
- [Fast and flexible ORM](https://github.com/uptrace/bun)
- [msgpack for Go](https://github.com/vmihailenco/msgpack)

View File

@ -222,7 +222,7 @@ func BenchmarkZAdd(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
err := client.ZAdd(ctx, "key", &redis.Z{
err := client.ZAdd(ctx, "key", redis.Z{
Score: float64(1),
Member: "hello",
}).Err()

View File

@ -1509,6 +1509,112 @@ func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error {
//------------------------------------------------------------------------------
type XAutoClaimCmd struct {
baseCmd
start string
val []XMessage
}
var _ Cmder = (*XAutoClaimCmd)(nil)
func NewXAutoClaimCmd(ctx context.Context, args ...interface{}) *XAutoClaimCmd {
return &XAutoClaimCmd{
baseCmd: baseCmd{
ctx: ctx,
args: args,
},
}
}
func (cmd *XAutoClaimCmd) Val() (messages []XMessage, start string) {
return cmd.val, cmd.start
}
func (cmd *XAutoClaimCmd) Result() (messages []XMessage, start string, err error) {
return cmd.val, cmd.start, cmd.err
}
func (cmd *XAutoClaimCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *XAutoClaimCmd) readReply(rd *proto.Reader) error {
var err error
if err = rd.ReadFixedArrayLen(2); err != nil {
return err
}
cmd.start, err = rd.ReadString()
if err != nil {
return err
}
cmd.val, err = readXMessageSlice(rd)
if err != nil {
return err
}
return nil
}
//------------------------------------------------------------------------------
type XAutoClaimJustIDCmd struct {
baseCmd
start string
val []string
}
var _ Cmder = (*XAutoClaimJustIDCmd)(nil)
func NewXAutoClaimJustIDCmd(ctx context.Context, args ...interface{}) *XAutoClaimJustIDCmd {
return &XAutoClaimJustIDCmd{
baseCmd: baseCmd{
ctx: ctx,
args: args,
},
}
}
func (cmd *XAutoClaimJustIDCmd) Val() (ids []string, start string) {
return cmd.val, cmd.start
}
func (cmd *XAutoClaimJustIDCmd) Result() (ids []string, start string, err error) {
return cmd.val, cmd.start, cmd.err
}
func (cmd *XAutoClaimJustIDCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *XAutoClaimJustIDCmd) readReply(rd *proto.Reader) error {
var err error
if err = rd.ReadFixedArrayLen(2); err != nil {
return err
}
cmd.start, err = rd.ReadString()
if err != nil {
return err
}
n, err := rd.ReadArrayLen()
if err != nil {
return err
}
cmd.val = make([]string, n)
for i := 0; i < n; i++ {
cmd.val[i], err = rd.ReadString()
if err != nil {
return err
}
}
return nil
}
//------------------------------------------------------------------------------
type XInfoConsumersCmd struct {
baseCmd
val []XInfoConsumer

View File

@ -9,7 +9,8 @@ import (
"github.com/go-redis/redis/v8/internal"
)
// KeepTTL is an option for Set command to keep key's existing TTL.
// KeepTTL is a Redis KEEPTTL option to keep existing TTL, it requires your redis-server version >= 6.0,
// otherwise you will receive an error: (error) ERR syntax error.
// For example:
//
// rdb.Set(ctx, key, value, redis.KeepTTL)
@ -132,8 +133,7 @@ type Cmdable interface {
MSetNX(ctx context.Context, values ...interface{}) *BoolCmd
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd
SetArgs(ctx context.Context, key string, value interface{}, a SetArgs) *StatusCmd
// TODO: rename to SetEx
SetEX(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd
SetEx(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd
SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *BoolCmd
SetXX(ctx context.Context, key string, value interface{}, expiration time.Duration) *BoolCmd
SetRange(ctx context.Context, key string, offset int64, value string) *IntCmd
@ -227,6 +227,7 @@ type Cmdable interface {
XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd
XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd
XGroupDestroy(ctx context.Context, stream, group string) *IntCmd
XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd
XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd
XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd
XAck(ctx context.Context, stream, group string, ids ...string) *IntCmd
@ -234,23 +235,23 @@ type Cmdable interface {
XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd
XClaim(ctx context.Context, a *XClaimArgs) *XMessageSliceCmd
XClaimJustID(ctx context.Context, a *XClaimArgs) *StringSliceCmd
XTrim(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd
XTrimMinID(ctx context.Context, key string, minID string) *IntCmd
XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd
XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd
XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd
BZPopMax(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd
BZPopMin(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd
ZAdd(ctx context.Context, key string, members ...*Z) *IntCmd
ZAddNX(ctx context.Context, key string, members ...*Z) *IntCmd
ZAddXX(ctx context.Context, key string, members ...*Z) *IntCmd
ZAddCh(ctx context.Context, key string, members ...*Z) *IntCmd
ZAddNXCh(ctx context.Context, key string, members ...*Z) *IntCmd
ZAddXXCh(ctx context.Context, key string, members ...*Z) *IntCmd
ZIncr(ctx context.Context, key string, member *Z) *FloatCmd
ZIncrNX(ctx context.Context, key string, member *Z) *FloatCmd
ZIncrXX(ctx context.Context, key string, member *Z) *FloatCmd
ZAdd(ctx context.Context, key string, members ...Z) *IntCmd
ZAddNX(ctx context.Context, key string, members ...Z) *IntCmd
ZAddXX(ctx context.Context, key string, members ...Z) *IntCmd
ZAddArgs(ctx context.Context, key string, args ZAddArgs) *IntCmd
ZAddArgsIncr(ctx context.Context, key string, args ZAddArgs) *FloatCmd
ZCard(ctx context.Context, key string) *IntCmd
ZCount(ctx context.Context, key, min, max string) *IntCmd
ZLexCount(ctx context.Context, key, min, max string) *IntCmd
@ -266,6 +267,9 @@ type Cmdable interface {
ZRangeByScore(ctx context.Context, key string, opt *ZRangeBy) *StringSliceCmd
ZRangeByLex(ctx context.Context, key string, opt *ZRangeBy) *StringSliceCmd
ZRangeByScoreWithScores(ctx context.Context, key string, opt *ZRangeBy) *ZSliceCmd
ZRangeArgs(ctx context.Context, z ZRangeArgs) *StringSliceCmd
ZRangeArgsWithScores(ctx context.Context, z ZRangeArgs) *ZSliceCmd
ZRangeStore(ctx context.Context, dst string, z ZRangeArgs) *IntCmd
ZRank(ctx context.Context, key, member string) *IntCmd
ZRem(ctx context.Context, key string, members ...interface{}) *IntCmd
ZRemRangeByRank(ctx context.Context, key string, start, stop int64) *IntCmd
@ -281,6 +285,8 @@ type Cmdable interface {
ZUnionStore(ctx context.Context, dest string, store *ZStore) *IntCmd
ZRandMember(ctx context.Context, key string, count int) *StringSliceCmd
ZRandMemberWithScores(ctx context.Context, key string, count int) *ZSliceCmd
ZUnion(ctx context.Context, store ZStore) *StringSliceCmd
ZUnionWithScores(ctx context.Context, store ZStore) *ZSliceCmd
ZDiff(ctx context.Context, keys ...string) *StringSliceCmd
ZDiffWithScores(ctx context.Context, keys ...string) *ZSliceCmd
ZDiffStore(ctx context.Context, destination string, keys ...string) *IntCmd
@ -733,7 +739,7 @@ func (c cmdable) DecrBy(ctx context.Context, key string, decrement int64) *IntCm
return cmd
}
// Get redis `GET key` command. It returns redis.Nil error when key does not exist.
// Get Redis `GET key` command. It returns redis.Nil error when key does not exist.
func (c cmdable) Get(ctx context.Context, key string) *StringCmd {
cmd := NewStringCmd(ctx, "get", key)
_ = c(ctx, cmd)
@ -835,10 +841,11 @@ func (c cmdable) MSetNX(ctx context.Context, values ...interface{}) *BoolCmd {
}
// Set Redis `SET key value [expiration]` command.
// Use expiration for `SETEX`-like behavior.
// Use expiration for `SETEx`-like behavior.
//
// Zero expiration means the key has no expiration time.
// KeepTTL(-1) expiration is a Redis KEEPTTL option to keep existing TTL.
// KeepTTL is a Redis KEEPTTL option to keep existing TTL, it requires your redis-server version >= 6.0,
// otherwise you will receive an error: (error) ERR syntax error.
func (c cmdable) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd {
args := make([]interface{}, 3, 5)
args[0] = "set"
@ -871,7 +878,8 @@ type SetArgs struct {
// When Get is true, the command returns the old value stored at key, or nil when key did not exist.
Get bool
// KeepTTL is a Redis KEEPTTL option to keep existing TTL.
// KeepTTL is a Redis KEEPTTL option to keep existing TTL, it requires your redis-server version >= 6.0,
// otherwise you will receive an error: (error) ERR syntax error.
KeepTTL bool
}
@ -909,8 +917,8 @@ func (c cmdable) SetArgs(ctx context.Context, key string, value interface{}, a S
return cmd
}
// SetEX Redis `SETEX key expiration value` command.
func (c cmdable) SetEX(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd {
// SetEx Redis `SETEx key expiration value` command.
func (c cmdable) SetEx(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd {
cmd := NewStatusCmd(ctx, "setex", key, formatSec(ctx, expiration), value)
_ = c(ctx, cmd)
return cmd
@ -919,7 +927,8 @@ func (c cmdable) SetEX(ctx context.Context, key string, value interface{}, expir
// SetNX Redis `SET key value [expiration] NX` command.
//
// Zero expiration means the key has no expiration time.
// KeepTTL(-1) expiration is a Redis KEEPTTL option to keep existing TTL.
// KeepTTL is a Redis KEEPTTL option to keep existing TTL, it requires your redis-server version >= 6.0,
// otherwise you will receive an error: (error) ERR syntax error.
func (c cmdable) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *BoolCmd {
var cmd *BoolCmd
switch expiration {
@ -943,7 +952,8 @@ func (c cmdable) SetNX(ctx context.Context, key string, value interface{}, expir
// SetXX Redis `SET key value [expiration] XX` command.
//
// Zero expiration means the key has no expiration time.
// KeepTTL(-1) expiration is a Redis KEEPTTL option to keep existing TTL.
// KeepTTL is a Redis KEEPTTL option to keep existing TTL, it requires your redis-server version >= 6.0,
// otherwise you will receive an error: (error) ERR syntax error.
func (c cmdable) SetXX(ctx context.Context, key string, value interface{}, expiration time.Duration) *BoolCmd {
var cmd *BoolCmd
switch expiration {
@ -1643,22 +1653,44 @@ func (c cmdable) SUnionStore(ctx context.Context, destination string, keys ...st
// - XAddArgs.Values = map[string]interface{}{"key1": "value1", "key2": "value2"}
//
// Note that map will not preserve the order of key-value pairs.
// MaxLen/MaxLenApprox and MinID are in conflict, only one of them can be used.
type XAddArgs struct {
Stream string
MaxLen int64 // MAXLEN N
MaxLenApprox int64 // MAXLEN ~ N
ID string
Values interface{}
Stream string
NoMkStream bool
MaxLen int64 // MAXLEN N
MinID string
// Approx causes MaxLen and MinID to use "~" matcher (instead of "=").
Approx bool
Limit int64
ID string
Values interface{}
}
// XAdd a.Limit has a bug, please confirm it and use it.
// issue: https://github.com/redis/redis/issues/9046
func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
args := make([]interface{}, 0, 8)
args = append(args, "xadd")
args = append(args, a.Stream)
if a.MaxLen > 0 {
args = append(args, "maxlen", a.MaxLen)
} else if a.MaxLenApprox > 0 {
args = append(args, "maxlen", "~", a.MaxLenApprox)
args := make([]interface{}, 0, 11)
args = append(args, "xadd", a.Stream)
if a.NoMkStream {
args = append(args, "nomkstream")
}
switch {
case a.MaxLen > 0:
if a.Approx {
args = append(args, "maxlen", "~", a.MaxLen)
} else {
args = append(args, "maxlen", a.MaxLen)
}
case a.MinID != "":
if a.Approx {
args = append(args, "minid", "~", a.MinID)
} else {
args = append(args, "minid", a.MinID)
}
}
if a.Limit > 0 {
args = append(args, "limit", a.Limit)
}
if a.ID != "" {
args = append(args, a.ID)
@ -1779,6 +1811,12 @@ func (c cmdable) XGroupDestroy(ctx context.Context, stream, group string) *IntCm
return cmd
}
func (c cmdable) XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
cmd := NewIntCmd(ctx, "xgroup", "createconsumer", stream, group, consumer)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
cmd := NewIntCmd(ctx, "xgroup", "delconsumer", stream, group, consumer)
_ = c(ctx, cmd)
@ -1867,6 +1905,39 @@ func (c cmdable) XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingE
return cmd
}
type XAutoClaimArgs struct {
Stream string
Group string
MinIdle time.Duration
Start string
Count int64
Consumer string
}
func (c cmdable) XAutoClaim(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimCmd {
args := xAutoClaimArgs(ctx, a)
cmd := NewXAutoClaimCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd {
args := xAutoClaimArgs(ctx, a)
args = append(args, "justid")
cmd := NewXAutoClaimJustIDCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func xAutoClaimArgs(ctx context.Context, a *XAutoClaimArgs) []interface{} {
args := make([]interface{}, 0, 9)
args = append(args, "xautoclaim", a.Stream, a.Group, a.Consumer, formatMs(ctx, a.MinIdle), a.Start)
if a.Count > 0 {
args = append(args, "count", a.Count)
}
return args
}
type XClaimArgs struct {
Stream string
Group string
@ -1903,16 +1974,53 @@ func xClaimArgs(a *XClaimArgs) []interface{} {
return args
}
func (c cmdable) XTrim(ctx context.Context, key string, maxLen int64) *IntCmd {
cmd := NewIntCmd(ctx, "xtrim", key, "maxlen", maxLen)
// xTrim If approx is true, add the "~" parameter, otherwise it is the default "=" (redis default).
// example:
// XTRIM key MAXLEN/MINID threshold LIMIT limit.
// XTRIM key MAXLEN/MINID ~ threshold LIMIT limit.
// The redis-server version is lower than 6.2, please set limit to 0.
func (c cmdable) xTrim(
ctx context.Context, key, strategy string,
approx bool, threshold interface{}, limit int64,
) *IntCmd {
args := make([]interface{}, 0, 7)
args = append(args, "xtrim", key, strategy)
if approx {
args = append(args, "~")
}
args = append(args, threshold)
if limit > 0 {
args = append(args, "limit", limit)
}
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd {
cmd := NewIntCmd(ctx, "xtrim", key, "maxlen", "~", maxLen)
_ = c(ctx, cmd)
return cmd
// XTrimMaxLen No `~` rules are used, `limit` cannot be used.
// cmd: XTRIM key MAXLEN maxLen
func (c cmdable) XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd {
return c.xTrim(ctx, key, "maxlen", false, maxLen, 0)
}
// XTrimMaxLenApprox LIMIT has a bug, please confirm it and use it.
// issue: https://github.com/redis/redis/issues/9046
// cmd: XTRIM key MAXLEN ~ maxLen LIMIT limit
func (c cmdable) XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd {
return c.xTrim(ctx, key, "maxlen", true, maxLen, limit)
}
// XTrimMinID No `~` rules are used, `limit` cannot be used.
// cmd: XTRIM key MINID minID
func (c cmdable) XTrimMinID(ctx context.Context, key string, minID string) *IntCmd {
return c.xTrim(ctx, key, "minid", false, minID, 0)
}
// XTrimMinIDApprox LIMIT has a bug, please confirm it and use it.
// issue: https://github.com/redis/redis/issues/9046
// cmd: XTRIM key MINID ~ minID LIMIT limit
func (c cmdable) XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd {
return c.xTrim(ctx, key, "minid", true, minID, limit)
}
func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd {
@ -1960,7 +2068,7 @@ type ZWithKey struct {
Key string
}
// ZStore is used as an arg to ZInterStore and ZUnionStore.
// ZStore is used as an arg to ZInter/ZInterStore and ZUnion/ZUnionStore.
type ZStore struct {
Keys []string
Weights []float64
@ -1968,7 +2076,7 @@ type ZStore struct {
Aggregate string
}
func (z *ZStore) len() (n int) {
func (z ZStore) len() (n int) {
n = len(z.Keys)
if len(z.Weights) > 0 {
n += 1 + len(z.Weights)
@ -1979,6 +2087,22 @@ func (z *ZStore) len() (n int) {
return n
}
func (z ZStore) appendArgs(args []interface{}) []interface{} {
for _, key := range z.Keys {
args = append(args, key)
}
if len(z.Weights) > 0 {
args = append(args, "weights")
for _, weights := range z.Weights {
args = append(args, weights)
}
}
if z.Aggregate != "" {
args = append(args, "aggregate", z.Aggregate)
}
return args
}
// BZPopMax Redis `BZPOPMAX key [key ...] timeout` command.
func (c cmdable) BZPopMax(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd {
args := make([]interface{}, 1+len(keys)+1)
@ -2007,96 +2131,79 @@ func (c cmdable) BZPopMin(ctx context.Context, timeout time.Duration, keys ...st
return cmd
}
func (c cmdable) zAdd(ctx context.Context, a []interface{}, n int, members ...*Z) *IntCmd {
for i, m := range members {
a[n+2*i] = m.Score
a[n+2*i+1] = m.Member
// ZAddArgs WARN: The GT, LT and NX options are mutually exclusive.
type ZAddArgs struct {
NX bool
XX bool
LT bool
GT bool
Ch bool
Members []Z
}
func (c cmdable) zAddArgs(key string, args ZAddArgs, incr bool) []interface{} {
a := make([]interface{}, 0, 6+2*len(args.Members))
a = append(a, "zadd", key)
// The GT, LT and NX options are mutually exclusive.
if args.NX {
a = append(a, "nx")
} else {
if args.XX {
a = append(a, "xx")
}
if args.GT {
a = append(a, "gt")
} else if args.LT {
a = append(a, "lt")
}
}
cmd := NewIntCmd(ctx, a...)
if args.Ch {
a = append(a, "ch")
}
if incr {
a = append(a, "incr")
}
for _, m := range args.Members {
a = append(a, m.Score)
a = append(a, m.Member)
}
return a
}
func (c cmdable) ZAddArgs(ctx context.Context, key string, args ZAddArgs) *IntCmd {
cmd := NewIntCmd(ctx, c.zAddArgs(key, args, false)...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) ZAddArgsIncr(ctx context.Context, key string, args ZAddArgs) *FloatCmd {
cmd := NewFloatCmd(ctx, c.zAddArgs(key, args, true)...)
_ = c(ctx, cmd)
return cmd
}
// ZAdd Redis `ZADD key score member [score member ...]` command.
func (c cmdable) ZAdd(ctx context.Context, key string, members ...*Z) *IntCmd {
const n = 2
a := make([]interface{}, n+2*len(members))
a[0], a[1] = "zadd", key
return c.zAdd(ctx, a, n, members...)
func (c cmdable) ZAdd(ctx context.Context, key string, members ...Z) *IntCmd {
return c.ZAddArgs(ctx, key, ZAddArgs{
Members: members,
})
}
// ZAddNX Redis `ZADD key NX score member [score member ...]` command.
func (c cmdable) ZAddNX(ctx context.Context, key string, members ...*Z) *IntCmd {
const n = 3
a := make([]interface{}, n+2*len(members))
a[0], a[1], a[2] = "zadd", key, "nx"
return c.zAdd(ctx, a, n, members...)
func (c cmdable) ZAddNX(ctx context.Context, key string, members ...Z) *IntCmd {
return c.ZAddArgs(ctx, key, ZAddArgs{
NX: true,
Members: members,
})
}
// ZAddXX Redis `ZADD key XX score member [score member ...]` command.
func (c cmdable) ZAddXX(ctx context.Context, key string, members ...*Z) *IntCmd {
const n = 3
a := make([]interface{}, n+2*len(members))
a[0], a[1], a[2] = "zadd", key, "xx"
return c.zAdd(ctx, a, n, members...)
}
// ZAddCh Redis `ZADD key CH score member [score member ...]` command.
func (c cmdable) ZAddCh(ctx context.Context, key string, members ...*Z) *IntCmd {
const n = 3
a := make([]interface{}, n+2*len(members))
a[0], a[1], a[2] = "zadd", key, "ch"
return c.zAdd(ctx, a, n, members...)
}
// ZAddNXCh Redis `ZADD key NX CH score member [score member ...]` command.
func (c cmdable) ZAddNXCh(ctx context.Context, key string, members ...*Z) *IntCmd {
const n = 4
a := make([]interface{}, n+2*len(members))
a[0], a[1], a[2], a[3] = "zadd", key, "nx", "ch"
return c.zAdd(ctx, a, n, members...)
}
// ZAddXXCh Redis `ZADD key XX CH score member [score member ...]` command.
func (c cmdable) ZAddXXCh(ctx context.Context, key string, members ...*Z) *IntCmd {
const n = 4
a := make([]interface{}, n+2*len(members))
a[0], a[1], a[2], a[3] = "zadd", key, "xx", "ch"
return c.zAdd(ctx, a, n, members...)
}
func (c cmdable) zIncr(ctx context.Context, a []interface{}, n int, members ...*Z) *FloatCmd {
for i, m := range members {
a[n+2*i] = m.Score
a[n+2*i+1] = m.Member
}
cmd := NewFloatCmd(ctx, a...)
_ = c(ctx, cmd)
return cmd
}
// ZIncr Redis `ZADD key INCR score member` command.
func (c cmdable) ZIncr(ctx context.Context, key string, member *Z) *FloatCmd {
const n = 3
a := make([]interface{}, n+2)
a[0], a[1], a[2] = "zadd", key, "incr"
return c.zIncr(ctx, a, n, member)
}
// ZIncrNX Redis `ZADD key NX INCR score member` command.
func (c cmdable) ZIncrNX(ctx context.Context, key string, member *Z) *FloatCmd {
const n = 4
a := make([]interface{}, n+2)
a[0], a[1], a[2], a[3] = "zadd", key, "incr", "nx"
return c.zIncr(ctx, a, n, member)
}
// ZIncrXX Redis `ZADD key XX INCR score member` command.
func (c cmdable) ZIncrXX(ctx context.Context, key string, member *Z) *FloatCmd {
const n = 4
a := make([]interface{}, n+2)
a[0], a[1], a[2], a[3] = "zadd", key, "incr", "xx"
return c.zIncr(ctx, a, n, member)
func (c cmdable) ZAddXX(ctx context.Context, key string, members ...Z) *IntCmd {
return c.ZAddArgs(ctx, key, ZAddArgs{
XX: true,
Members: members,
})
}
func (c cmdable) ZCard(ctx context.Context, key string) *IntCmd {
@ -2126,18 +2233,7 @@ func (c cmdable) ZIncrBy(ctx context.Context, key string, increment float64, mem
func (c cmdable) ZInterStore(ctx context.Context, destination string, store *ZStore) *IntCmd {
args := make([]interface{}, 0, 3+store.len())
args = append(args, "zinterstore", destination, len(store.Keys))
for _, key := range store.Keys {
args = append(args, key)
}
if len(store.Weights) > 0 {
args = append(args, "weights")
for _, weight := range store.Weights {
args = append(args, weight)
}
}
if store.Aggregate != "" {
args = append(args, "aggregate", store.Aggregate)
}
args = store.appendArgs(args)
cmd := NewIntCmd(ctx, args...)
cmd.setFirstKeyPos(3)
_ = c(ctx, cmd)
@ -2147,19 +2243,7 @@ func (c cmdable) ZInterStore(ctx context.Context, destination string, store *ZSt
func (c cmdable) ZInter(ctx context.Context, store *ZStore) *StringSliceCmd {
args := make([]interface{}, 0, 2+store.len())
args = append(args, "zinter", len(store.Keys))
for _, key := range store.Keys {
args = append(args, key)
}
if len(store.Weights) > 0 {
args = append(args, "weights")
for _, weights := range store.Weights {
args = append(args, weights)
}
}
if store.Aggregate != "" {
args = append(args, "aggregate", store.Aggregate)
}
args = store.appendArgs(args)
cmd := NewStringSliceCmd(ctx, args...)
cmd.setFirstKeyPos(2)
_ = c(ctx, cmd)
@ -2169,18 +2253,7 @@ func (c cmdable) ZInter(ctx context.Context, store *ZStore) *StringSliceCmd {
func (c cmdable) ZInterWithScores(ctx context.Context, store *ZStore) *ZSliceCmd {
args := make([]interface{}, 0, 3+store.len())
args = append(args, "zinter", len(store.Keys))
for _, key := range store.Keys {
args = append(args, key)
}
if len(store.Weights) > 0 {
args = append(args, "weights")
for _, weights := range store.Weights {
args = append(args, weights)
}
}
if store.Aggregate != "" {
args = append(args, "aggregate", store.Aggregate)
}
args = store.appendArgs(args)
args = append(args, "withscores")
cmd := NewZSliceCmd(ctx, args...)
cmd.setFirstKeyPos(2)
@ -2240,29 +2313,112 @@ func (c cmdable) ZPopMin(ctx context.Context, key string, count ...int64) *ZSlic
return cmd
}
func (c cmdable) zRange(ctx context.Context, key string, start, stop int64, withScores bool) *StringSliceCmd {
args := []interface{}{
"zrange",
key,
start,
stop,
// ZRangeArgs is all the options of the ZRange command.
// In version> 6.2.0, you can replace the(cmd):
// ZREVRANGE,
// ZRANGEBYSCORE,
// ZREVRANGEBYSCORE,
// ZRANGEBYLEX,
// ZREVRANGEBYLEX.
// Please pay attention to your redis-server version.
//
// Rev, ByScore, ByLex and Offset+Count options require redis-server 6.2.0 and higher.
type ZRangeArgs struct {
Key string
// When the ByScore option is provided, the open interval(exclusive) can be set.
// By default, the score intervals specified by <Start> and <Stop> are closed (inclusive).
// It is similar to the deprecated(6.2.0+) ZRangeByScore command.
// For example:
// ZRangeArgs{
// Key: "example-key",
// Start: "(3",
// Stop: 8,
// ByScore: true,
// }
// cmd: "ZRange example-key (3 8 ByScore" (3 < score <= 8).
//
// For the ByLex option, it is similar to the deprecated(6.2.0+) ZRangeByLex command.
// You can set the <Start> and <Stop> options as follows:
// ZRangeArgs{
// Key: "example-key",
// Start: "[abc",
// Stop: "(def",
// ByLex: true,
// }
// cmd: "ZRange example-key [abc (def ByLex"
//
// For normal cases (ByScore==false && ByLex==false), <Start> and <Stop> should be set to the index range (int).
// You can read the documentation for more information: https://redis.io/commands/zrange
Start interface{}
Stop interface{}
// The ByScore and ByLex options are mutually exclusive.
ByScore bool
ByLex bool
Rev bool
// limit offset count.
Offset int64
Count int64
}
func (z ZRangeArgs) appendArgs(args []interface{}) []interface{} {
// For Rev+ByScore/ByLex, we need to adjust the position of <Start> and <Stop>.
if z.Rev && (z.ByScore || z.ByLex) {
args = append(args, z.Key, z.Stop, z.Start)
} else {
args = append(args, z.Key, z.Start, z.Stop)
}
if withScores {
args = append(args, "withscores")
if z.ByScore {
args = append(args, "byscore")
} else if z.ByLex {
args = append(args, "bylex")
}
if z.Rev {
args = append(args, "rev")
}
if z.Offset != 0 || z.Count != 0 {
args = append(args, "limit", z.Offset, z.Count)
}
return args
}
func (c cmdable) ZRangeArgs(ctx context.Context, z ZRangeArgs) *StringSliceCmd {
args := make([]interface{}, 0, 9)
args = append(args, "zrange")
args = z.appendArgs(args)
cmd := NewStringSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) ZRangeArgsWithScores(ctx context.Context, z ZRangeArgs) *ZSliceCmd {
args := make([]interface{}, 0, 10)
args = append(args, "zrange")
args = z.appendArgs(args)
args = append(args, "withscores")
cmd := NewZSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) ZRange(ctx context.Context, key string, start, stop int64) *StringSliceCmd {
return c.zRange(ctx, key, start, stop, false)
return c.ZRangeArgs(ctx, ZRangeArgs{
Key: key,
Start: start,
Stop: stop,
})
}
func (c cmdable) ZRangeWithScores(ctx context.Context, key string, start, stop int64) *ZSliceCmd {
cmd := NewZSliceCmd(ctx, "zrange", key, start, stop, "withscores")
_ = c(ctx, cmd)
return cmd
return c.ZRangeArgsWithScores(ctx, ZRangeArgs{
Key: key,
Start: start,
Stop: stop,
})
}
type ZRangeBy struct {
@ -2311,6 +2467,15 @@ func (c cmdable) ZRangeByScoreWithScores(ctx context.Context, key string, opt *Z
return cmd
}
func (c cmdable) ZRangeStore(ctx context.Context, dst string, z ZRangeArgs) *IntCmd {
args := make([]interface{}, 0, 10)
args = append(args, "zrangestore", dst)
args = z.appendArgs(args)
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) ZRank(ctx context.Context, key, member string) *IntCmd {
cmd := NewIntCmd(ctx, "zrank", key, member)
_ = c(ctx, cmd)
@ -2413,22 +2578,31 @@ func (c cmdable) ZScore(ctx context.Context, key, member string) *FloatCmd {
return cmd
}
func (c cmdable) ZUnion(ctx context.Context, store ZStore) *StringSliceCmd {
args := make([]interface{}, 0, 2+store.len())
args = append(args, "zunion", len(store.Keys))
args = store.appendArgs(args)
cmd := NewStringSliceCmd(ctx, args...)
cmd.setFirstKeyPos(2)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) ZUnionWithScores(ctx context.Context, store ZStore) *ZSliceCmd {
args := make([]interface{}, 0, 3+store.len())
args = append(args, "zunion", len(store.Keys))
args = store.appendArgs(args)
args = append(args, "withscores")
cmd := NewZSliceCmd(ctx, args...)
cmd.setFirstKeyPos(2)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) ZUnionStore(ctx context.Context, dest string, store *ZStore) *IntCmd {
args := make([]interface{}, 0, 3+store.len())
args = append(args, "zunionstore", dest, len(store.Keys))
for _, key := range store.Keys {
args = append(args, key)
}
if len(store.Weights) > 0 {
args = append(args, "weights")
for _, weight := range store.Weights {
args = append(args, weight)
}
}
if store.Aggregate != "" {
args = append(args, "aggregate", store.Aggregate)
}
args = store.appendArgs(args)
cmd := NewIntCmd(ctx, args...)
cmd.setFirstKeyPos(3)
_ = c(ctx, cmd)

File diff suppressed because it is too large Load Diff

View File

@ -188,8 +188,8 @@ func ExampleClient_Set() {
}
}
func ExampleClient_SetEX() {
err := rdb.SetEX(ctx, "key", "value", time.Hour).Err()
func ExampleClient_SetEx() {
err := rdb.SetEx(ctx, "key", "value", time.Hour).Err()
if err != nil {
panic(err)
}

2
go.mod
View File

@ -7,7 +7,5 @@ require (
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
github.com/onsi/ginkgo v1.15.0
github.com/onsi/gomega v1.10.5
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/metric v0.20.0
go.opentelemetry.io/otel/trace v0.20.0
)

View File

@ -49,7 +49,7 @@ var (
func Struct(dst interface{}) (StructValue, error) {
v := reflect.ValueOf(dst)
// The dstination to scan into should be a struct pointer.
// The destination to scan into should be a struct pointer.
if v.Kind() != reflect.Ptr || v.IsNil() {
return StructValue{}, fmt.Errorf("redis.Scan(non-pointer %T)", dst)
}

View File

@ -65,26 +65,17 @@ func (cn *Conn) RemoteAddr() net.Addr {
}
func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
ctx, span := internal.StartSpan(ctx, "redis.with_reader")
defer span.End()
if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
return internal.RecordError(ctx, span, err)
return err
}
if err := fn(cn.rd); err != nil {
return internal.RecordError(ctx, span, err)
}
return nil
return fn(cn.rd)
}
func (cn *Conn) WithWriter(
ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error,
) error {
ctx, span := internal.StartSpan(ctx, "redis.with_writer")
defer span.End()
if err := cn.netConn.SetWriteDeadline(cn.deadline(ctx, timeout)); err != nil {
return internal.RecordError(ctx, span, err)
return err
}
if cn.bw.Buffered() > 0 {
@ -92,11 +83,11 @@ func (cn *Conn) WithWriter(
}
if err := fn(cn.wr); err != nil {
return internal.RecordError(ctx, span, err)
return err
}
if err := cn.bw.Flush(); err != nil {
return internal.RecordError(ctx, span, err)
return err
}
internal.WritesCounter.Add(ctx, 1)

View File

@ -4,16 +4,10 @@ import (
"context"
"time"
"github.com/go-redis/redis/v8/internal/proto"
"github.com/go-redis/redis/v8/internal/util"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
func Sleep(ctx context.Context, dur time.Duration) error {
_, span := StartSpan(ctx, "time.Sleep")
defer span.End()
t := time.NewTimer(dur)
defer t.Stop()
@ -50,21 +44,3 @@ func isLower(s string) bool {
}
return true
}
//------------------------------------------------------------------------------
var tracer = otel.Tracer("github.com/go-redis/redis")
func StartSpan(ctx context.Context, name string) (context.Context, trace.Span) {
if span := trace.SpanFromContext(ctx); !span.IsRecording() {
return ctx, span
}
return tracer.Start(ctx, name)
}
func RecordError(ctx context.Context, span trace.Span, err error) error {
if err != proto.Nil {
span.RecordError(err)
}
return err
}

View File

@ -12,9 +12,7 @@ import (
"strings"
"time"
"github.com/go-redis/redis/v8/internal"
"github.com/go-redis/redis/v8/internal/pool"
"go.opentelemetry.io/otel/attribute"
)
// Limiter is the interface of a rate limiter or a circuit breaker.
@ -291,21 +289,7 @@ func getUserPassword(u *url.URL) (string, string) {
func newConnPool(opt *Options) *pool.ConnPool {
return pool.NewConnPool(&pool.Options{
Dialer: func(ctx context.Context) (net.Conn, error) {
ctx, span := internal.StartSpan(ctx, "redis.dial")
defer span.End()
if span.IsRecording() {
span.SetAttributes(
attribute.String("db.connection_string", opt.Addr),
)
}
cn, err := opt.Dialer(ctx, opt.Network, opt.Addr)
if err != nil {
return nil, internal.RecordError(ctx, span, err)
}
return cn, nil
return opt.Dialer(ctx, opt.Network, opt.Addr)
},
PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,

View File

@ -10,7 +10,6 @@ import (
"github.com/go-redis/redis/v8/internal"
"github.com/go-redis/redis/v8/internal/pool"
"github.com/go-redis/redis/v8/internal/proto"
"go.opentelemetry.io/otel/attribute"
)
// Nil reply returned by Redis when key does not exist.
@ -230,17 +229,18 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
}
cn.Inited = true
ctx, span := internal.StartSpan(ctx, "redis.init_conn")
defer span.End()
connPool := pool.NewSingleConnPool(c.connPool, cn)
conn := newConn(ctx, c.opt, connPool)
var auth bool
// The low version of redis-server does not support the hello command.
if conn.Hello(ctx, 3, c.opt.Username, c.opt.Password, "").Err() == nil {
// For redis-server (<6.0) that does not support the Hello command,
// we continue to provide services with RESP2.
if err := conn.Hello(ctx, 3, c.opt.Username, c.opt.Password, "").Err(); err == nil {
auth = true
} else if err.Error() != "ERR unknown command 'hello'" {
return err
}
_, err := conn.Pipelined(ctx, func(pipe Pipeliner) error {
@ -287,20 +287,11 @@ func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error)
func (c *baseClient) withConn(
ctx context.Context, fn func(context.Context, *pool.Conn) error,
) error {
ctx, span := internal.StartSpan(ctx, "redis.with_conn")
defer span.End()
cn, err := c.getConn(ctx)
if err != nil {
return err
}
if span.IsRecording() {
if remoteAddr := cn.RemoteAddr(); remoteAddr != nil {
span.SetAttributes(attribute.String("net.peer.ip", remoteAddr.String()))
}
}
defer func() {
c.releaseConn(ctx, cn, err)
}()