Merge branch 'v9'

This commit is contained in:
Vladimir Mihailenco 2022-06-04 14:28:10 +03:00
commit 0aa94538ee
33 changed files with 1852 additions and 1871 deletions

View File

@ -2,9 +2,9 @@ name: Go
on:
push:
branches: [master]
branches: [master, v9]
pull_request:
branches: [master]
branches: [master, v9]
jobs:
build:

View File

@ -7,6 +7,7 @@ on:
branches:
- master
- main
- v9
pull_request:
jobs:

View File

@ -1,177 +1,6 @@
## [8.11.5](https://github.com/go-redis/redis/compare/v8.11.4...v8.11.5) (2022-03-17)
## v9 UNRELEASED
### Bug Fixes
* add missing Expire methods to Cmdable ([17e3b43](https://github.com/go-redis/redis/commit/17e3b43879d516437ada71cf9c0deac6a382ed9a))
* add whitespace for avoid unlikely colisions ([7f7c181](https://github.com/go-redis/redis/commit/7f7c1817617cfec909efb13d14ad22ef05a6ad4c))
* example/otel compile error ([#2028](https://github.com/go-redis/redis/issues/2028)) ([187c07c](https://github.com/go-redis/redis/commit/187c07c41bf68dc3ab280bc3a925e960bbef6475))
* **extra/redisotel:** set span.kind attribute to client ([065b200](https://github.com/go-redis/redis/commit/065b200070b41e6e949710b4f9e01b50ccc60ab2))
* format ([96f53a0](https://github.com/go-redis/redis/commit/96f53a0159a28affa94beec1543a62234e7f8b32))
* invalid type assert in stringArg ([de6c131](https://github.com/go-redis/redis/commit/de6c131865b8263400c8491777b295035f2408e4))
* rename Golang to Go ([#2030](https://github.com/go-redis/redis/issues/2030)) ([b82a2d9](https://github.com/go-redis/redis/commit/b82a2d9d4d2de7b7cbe8fcd4895be62dbcacacbc))
* set timeout for WAIT command. Fixes [#1963](https://github.com/go-redis/redis/issues/1963) ([333fee1](https://github.com/go-redis/redis/commit/333fee1a8fd98a2fbff1ab187c1b03246a7eb01f))
* update some argument counts in pre-allocs ([f6974eb](https://github.com/go-redis/redis/commit/f6974ebb5c40a8adf90d2cacab6dc297f4eba4c2))
### Features
* Add redis v7's NX, XX, GT, LT expire variants ([e19bbb2](https://github.com/go-redis/redis/commit/e19bbb26e2e395c6e077b48d80d79e99f729a8b8))
* add support for acl sentinel auth in universal client ([ab0ccc4](https://github.com/go-redis/redis/commit/ab0ccc47413f9b2a6eabc852fed5005a3ee1af6e))
* add support for COPY command ([#2016](https://github.com/go-redis/redis/issues/2016)) ([730afbc](https://github.com/go-redis/redis/commit/730afbcffb93760e8a36cc06cfe55ab102b693a7))
* add support for passing extra attributes added to spans ([39faaa1](https://github.com/go-redis/redis/commit/39faaa171523834ba527c9789710c4fde87f5a2e))
* add support for time.Duration write and scan ([2f1b74e](https://github.com/go-redis/redis/commit/2f1b74e20cdd7719b2aecf0768d3e3ae7c3e781b))
* **redisotel:** ability to override TracerProvider ([#1998](https://github.com/go-redis/redis/issues/1998)) ([bf8d4aa](https://github.com/go-redis/redis/commit/bf8d4aa60c00366cda2e98c3ddddc8cf68507417))
* set net.peer.name and net.peer.port in otel example ([69bf454](https://github.com/go-redis/redis/commit/69bf454f706204211cd34835f76b2e8192d3766d))
## [8.11.4](https://github.com/go-redis/redis/compare/v8.11.3...v8.11.4) (2021-10-04)
### Features
* add acl auth support for sentinels ([f66582f](https://github.com/go-redis/redis/commit/f66582f44f3dc3a4705a5260f982043fde4aa634))
* add Cmd.{String,Int,Float,Bool}Slice helpers and an example ([5d3d293](https://github.com/go-redis/redis/commit/5d3d293cc9c60b90871e2420602001463708ce24))
* add SetVal method for each command ([168981d](https://github.com/go-redis/redis/commit/168981da2d84ee9e07d15d3e74d738c162e264c4))
## v8.11
- Remove OpenTelemetry metrics.
- Supports more redis commands and options.
## v8.10
- Removed extra OpenTelemetry spans from go-redis core. Now go-redis instrumentation only adds a
single span with a Redis command (instead of 4 spans). There are multiple reasons behind this
decision:
- Traces become smaller and less noisy.
- It may be costly to process those 3 extra spans for each query.
- go-redis no longer depends on OpenTelemetry.
Eventually we hope to replace the information that we no longer collect with OpenTelemetry
Metrics.
## v8.9
- Changed `PubSub.Channel` to only rely on `Ping` result. You can now use `WithChannelSize`,
`WithChannelHealthCheckInterval`, and `WithChannelSendTimeout` to override default settings.
## v8.8
- To make updating easier, extra modules now have the same version as go-redis does. That means that
you need to update your imports:
```
github.com/go-redis/redis/extra/redisotel -> github.com/go-redis/redis/extra/redisotel/v8
github.com/go-redis/redis/extra/rediscensus -> github.com/go-redis/redis/extra/rediscensus/v8
```
## v8.5
- [knadh](https://github.com/knadh) contributed long-awaited ability to scan Redis Hash into a
struct:
```go
err := rdb.HGetAll(ctx, "hash").Scan(&data)
err := rdb.MGet(ctx, "key1", "key2").Scan(&data)
```
- Please check [redismock](https://github.com/go-redis/redismock) by
[monkey92t](https://github.com/monkey92t) if you are looking for mocking Redis Client.
## v8
- All commands require `context.Context` as a first argument, e.g. `rdb.Ping(ctx)`. If you are not
using `context.Context` yet, the simplest option is to define global package variable
`var ctx = context.TODO()` and use it when `ctx` is required.
- Full support for `context.Context` canceling.
- Added `redis.NewFailoverClusterClient` that supports routing read-only commands to a slave node.
- Added `redisext.OpenTemetryHook` that adds
[Redis OpenTelemetry instrumentation](https://redis.uptrace.dev/tracing/).
- Redis slow log support.
- Ring uses Rendezvous Hashing by default which provides better distribution. You need to move
existing keys to a new location or keys will be inaccessible / lost. To use old hashing scheme:
```go
import "github.com/golang/groupcache/consistenthash"
ring := redis.NewRing(&redis.RingOptions{
NewConsistentHash: func() {
return consistenthash.New(100, crc32.ChecksumIEEE)
},
})
```
- `ClusterOptions.MaxRedirects` default value is changed from 8 to 3.
- `Options.MaxRetries` default value is changed from 0 to 3.
- `Cluster.ForEachNode` is renamed to `ForEachShard` for consistency with `Ring`.
## v7.3
- New option `Options.Username` which causes client to use `AuthACL`. Be aware if your connection
URL contains username.
## v7.2
- Existing `HMSet` is renamed to `HSet` and old deprecated `HMSet` is restored for Redis 3 users.
## v7.1
- Existing `Cmd.String` is renamed to `Cmd.Text`. New `Cmd.String` implements `fmt.Stringer`
interface.
## v7
- _Important_. Tx.Pipeline now returns a non-transactional pipeline. Use Tx.TxPipeline for a
transactional pipeline.
- WrapProcess is replaced with more convenient AddHook that has access to context.Context.
- WithContext now can not be used to create a shallow copy of the client.
- New methods ProcessContext, DoContext, and ExecContext.
- Client respects Context.Deadline when setting net.Conn deadline.
- Client listens on Context.Done while waiting for a connection from the pool and returns an error
when context context is cancelled.
- Add PubSub.ChannelWithSubscriptions that sends `*Subscription` in addition to `*Message` to allow
detecting reconnections.
- `time.Time` is now marshalled in RFC3339 format. `rdb.Get("foo").Time()` helper is added to parse
the time.
- `SetLimiter` is removed and added `Options.Limiter` instead.
- `HMSet` is deprecated as of Redis v4.
## v6.15
- Cluster and Ring pipelines process commands for each node in its own goroutine.
## 6.14
- Added Options.MinIdleConns.
- Added Options.MaxConnAge.
- PoolStats.FreeConns is renamed to PoolStats.IdleConns.
- Add Client.Do to simplify creating custom commands.
- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool helpers.
- Lower memory usage.
## v6.13
- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set
`HashReplicas = 1000` for better keys distribution between shards.
- Cluster client was optimized to use much less memory when reloading cluster state.
- PubSub.ReceiveMessage is re-worked to not use ReceiveTimeout so it does not lose data when timeout
occurres. In most cases it is recommended to use PubSub.Channel instead.
- Dialer.KeepAlive is set to 5 minutes by default.
## v6.12
- ClusterClient got new option called `ClusterSlots` which allows to build cluster of normal Redis
Servers that don't have cluster mode enabled. See
https://godoc.org/github.com/go-redis/redis#example-NewClusterClient--ManualSetup
- Added support for [RESP3](https://github.com/antirez/RESP3/blob/master/spec.md) protocol.
- Removed `Pipeline.Close` since there is no real need to explicitly manage pipeline resources.
`Pipeline.Discard` is still available if you want to reset commands for some reason.
- Replaced `*redis.Z` with `redis.Z` since it is small enough to be passed as value.

View File

@ -18,14 +18,17 @@ type ClientStub struct {
resp []byte
}
var initHello = []byte("%1\r\n+proto\r\n:3\r\n")
func NewClientStub(resp []byte) *ClientStub {
stub := &ClientStub{
resp: resp,
}
stub.Cmdable = NewClient(&Options{
PoolSize: 128,
Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
return stub.stubConn(), nil
return stub.stubConn(initHello), nil
},
})
return stub
@ -40,7 +43,7 @@ func NewClusterClientStub(resp []byte) *ClientStub {
PoolSize: 128,
Addrs: []string{"127.0.0.1:6379"},
Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
return stub.stubConn(), nil
return stub.stubConn(initHello), nil
},
ClusterSlots: func(_ context.Context) ([]ClusterSlot, error) {
return []ClusterSlot{
@ -65,18 +68,27 @@ func NewClusterClientStub(resp []byte) *ClientStub {
return stub
}
func (c *ClientStub) stubConn() *ConnStub {
func (c *ClientStub) stubConn(init []byte) *ConnStub {
return &ConnStub{
init: init,
resp: c.resp,
}
}
type ConnStub struct {
init []byte
resp []byte
pos int
}
func (c *ConnStub) Read(b []byte) (n int, err error) {
// Return conn.init()
if len(c.init) > 0 {
n = copy(b, c.init)
c.init = c.init[n:]
return n, nil
}
if len(c.resp) == 0 {
return 0, io.EOF
}

View File

@ -223,7 +223,7 @@ func BenchmarkZAdd(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
err := client.ZAdd(ctx, "key", &redis.Z{
err := client.ZAdd(ctx, "key", redis.Z{
Score: float64(1),
Member: "hello",
}).Err()

View File

@ -795,7 +795,6 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
_ = pipe.Process(ctx, cmd)
_, lastErr = pipe.Exec(ctx)
_ = pipe.Close()
ask = false
} else {
lastErr = node.Client.Process(ctx, cmd)
@ -1406,12 +1405,7 @@ func (c *ClusterClient) txPipelineReadQueued(
return err
}
switch line[0] {
case proto.ErrorReply:
return proto.ParseErrorReply(line)
case proto.ArrayReply:
// ok
default:
if line[0] != proto.RespArray {
return fmt.Errorf("redis: expected '*', but got line %q", line)
}

View File

@ -515,9 +515,7 @@ var _ = Describe("ClusterClient", func() {
pipe = client.Pipeline().(*redis.Pipeline)
})
AfterEach(func() {
Expect(pipe.Close()).NotTo(HaveOccurred())
})
AfterEach(func() {})
assertPipeline()
})
@ -527,9 +525,7 @@ var _ = Describe("ClusterClient", func() {
pipe = client.TxPipeline().(*redis.Pipeline)
})
AfterEach(func() {
Expect(pipe.Close()).NotTo(HaveOccurred())
})
AfterEach(func() {})
assertPipeline()
})
@ -1182,16 +1178,17 @@ var _ = Describe("ClusterClient with unavailable Cluster", func() {
var client *redis.ClusterClient
BeforeEach(func() {
for _, node := range cluster.clients {
err := node.ClientPause(ctx, 5*time.Second).Err()
Expect(err).NotTo(HaveOccurred())
}
opt := redisClusterOptions()
opt.ReadTimeout = 250 * time.Millisecond
opt.WriteTimeout = 250 * time.Millisecond
opt.MaxRedirects = 1
client = cluster.newClusterClientUnstable(opt)
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
for _, node := range cluster.clients {
err := node.ClientPause(ctx, 5*time.Second).Err()
Expect(err).NotTo(HaveOccurred())
}
})
AfterEach(func() {

1527
command.go

File diff suppressed because it is too large Load Diff

View File

@ -4,10 +4,10 @@ import (
"errors"
"time"
"github.com/go-redis/redis/v8"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
redis "github.com/go-redis/redis/v8"
)
var _ = Describe("Cmd", func() {

View File

@ -137,8 +137,7 @@ type Cmdable interface {
MSetNX(ctx context.Context, values ...interface{}) *BoolCmd
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd
SetArgs(ctx context.Context, key string, value interface{}, a SetArgs) *StatusCmd
// TODO: rename to SetEx
SetEX(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd
SetEx(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd
SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *BoolCmd
SetXX(ctx context.Context, key string, value interface{}, expiration time.Duration) *BoolCmd
SetRange(ctx context.Context, key string, offset int64, value string) *IntCmd
@ -164,7 +163,7 @@ type Cmdable interface {
HDel(ctx context.Context, key string, fields ...string) *IntCmd
HExists(ctx context.Context, key, field string) *BoolCmd
HGet(ctx context.Context, key, field string) *StringCmd
HGetAll(ctx context.Context, key string) *StringStringMapCmd
HGetAll(ctx context.Context, key string) *MapStringStringCmd
HIncrBy(ctx context.Context, key, field string, incr int64) *IntCmd
HIncrByFloat(ctx context.Context, key, field string, incr float64) *FloatCmd
HKeys(ctx context.Context, key string) *StringSliceCmd
@ -174,7 +173,8 @@ type Cmdable interface {
HMSet(ctx context.Context, key string, values ...interface{}) *BoolCmd
HSetNX(ctx context.Context, key, field string, value interface{}) *BoolCmd
HVals(ctx context.Context, key string) *StringSliceCmd
HRandField(ctx context.Context, key string, count int, withValues bool) *StringSliceCmd
HRandField(ctx context.Context, key string, count int) *StringSliceCmd
HRandFieldWithValues(ctx context.Context, key string, count int) *KeyValueSliceCmd
BLPop(ctx context.Context, timeout time.Duration, keys ...string) *StringSliceCmd
BRPop(ctx context.Context, timeout time.Duration, keys ...string) *StringSliceCmd
@ -244,10 +244,6 @@ type Cmdable interface {
XClaimJustID(ctx context.Context, a *XClaimArgs) *StringSliceCmd
XAutoClaim(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimCmd
XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd
// TODO: XTrim and XTrimApprox remove in v9.
XTrim(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd
XTrimMinID(ctx context.Context, key string, minID string) *IntCmd
@ -260,27 +256,11 @@ type Cmdable interface {
BZPopMax(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd
BZPopMin(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd
// TODO: remove
// ZAddCh
// ZIncr
// ZAddNXCh
// ZAddXXCh
// ZIncrNX
// ZIncrXX
// in v9.
// use ZAddArgs and ZAddArgsIncr.
ZAdd(ctx context.Context, key string, members ...*Z) *IntCmd
ZAddNX(ctx context.Context, key string, members ...*Z) *IntCmd
ZAddXX(ctx context.Context, key string, members ...*Z) *IntCmd
ZAddCh(ctx context.Context, key string, members ...*Z) *IntCmd
ZAddNXCh(ctx context.Context, key string, members ...*Z) *IntCmd
ZAddXXCh(ctx context.Context, key string, members ...*Z) *IntCmd
ZAdd(ctx context.Context, key string, members ...Z) *IntCmd
ZAddNX(ctx context.Context, key string, members ...Z) *IntCmd
ZAddXX(ctx context.Context, key string, members ...Z) *IntCmd
ZAddArgs(ctx context.Context, key string, args ZAddArgs) *IntCmd
ZAddArgsIncr(ctx context.Context, key string, args ZAddArgs) *FloatCmd
ZIncr(ctx context.Context, key string, member *Z) *FloatCmd
ZIncrNX(ctx context.Context, key string, member *Z) *FloatCmd
ZIncrXX(ctx context.Context, key string, member *Z) *FloatCmd
ZCard(ctx context.Context, key string) *IntCmd
ZCount(ctx context.Context, key, min, max string) *IntCmd
ZLexCount(ctx context.Context, key, min, max string) *IntCmd
@ -312,9 +292,10 @@ type Cmdable interface {
ZRevRank(ctx context.Context, key, member string) *IntCmd
ZScore(ctx context.Context, key, member string) *FloatCmd
ZUnionStore(ctx context.Context, dest string, store *ZStore) *IntCmd
ZRandMember(ctx context.Context, key string, count int) *StringSliceCmd
ZRandMemberWithScores(ctx context.Context, key string, count int) *ZSliceCmd
ZUnion(ctx context.Context, store ZStore) *StringSliceCmd
ZUnionWithScores(ctx context.Context, store ZStore) *ZSliceCmd
ZRandMember(ctx context.Context, key string, count int, withScores bool) *StringSliceCmd
ZDiff(ctx context.Context, keys ...string) *StringSliceCmd
ZDiffWithScores(ctx context.Context, keys ...string) *ZSliceCmd
ZDiffStore(ctx context.Context, destination string, keys ...string) *IntCmd
@ -330,7 +311,9 @@ type Cmdable interface {
ClientList(ctx context.Context) *StringCmd
ClientPause(ctx context.Context, dur time.Duration) *BoolCmd
ClientID(ctx context.Context) *IntCmd
ConfigGet(ctx context.Context, parameter string) *SliceCmd
ClientUnblock(ctx context.Context, id int64) *IntCmd
ClientUnblockWithError(ctx context.Context, id int64) *IntCmd
ConfigGet(ctx context.Context, parameter string) *MapStringStringCmd
ConfigResetStat(ctx context.Context) *StatusCmd
ConfigSet(ctx context.Context, parameter, value string) *StatusCmd
ConfigRewrite(ctx context.Context) *StatusCmd
@ -346,6 +329,7 @@ type Cmdable interface {
ShutdownSave(ctx context.Context) *StatusCmd
ShutdownNoSave(ctx context.Context) *StatusCmd
SlaveOf(ctx context.Context, host, port string) *StatusCmd
SlowLogGet(ctx context.Context, num int64) *SlowLogCmd
Time(ctx context.Context) *TimeCmd
DebugObject(ctx context.Context, key string) *StringCmd
ReadOnly(ctx context.Context) *StatusCmd
@ -404,6 +388,7 @@ type StatefulCmdable interface {
Select(ctx context.Context, index int) *StatusCmd
SwapDB(ctx context.Context, index1, index2 int) *StatusCmd
ClientSetName(ctx context.Context, name string) *BoolCmd
Hello(ctx context.Context, ver int, username, password, clientName string) *MapStringInterfaceCmd
}
var (
@ -460,6 +445,26 @@ func (c statefulCmdable) ClientSetName(ctx context.Context, name string) *BoolCm
return cmd
}
// Hello Set the resp protocol used.
func (c statefulCmdable) Hello(ctx context.Context,
ver int, username, password, clientName string) *MapStringInterfaceCmd {
args := make([]interface{}, 0, 7)
args = append(args, "hello", ver)
if password != "" {
if username != "" {
args = append(args, "auth", username, password)
} else {
args = append(args, "auth", "default", password)
}
}
if clientName != "" {
args = append(args, "setname", clientName)
}
cmd := NewMapStringInterfaceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
//------------------------------------------------------------------------------
func (c cmdable) Command(ctx context.Context) *CommandsInfoCmd {
@ -882,7 +887,7 @@ func (c cmdable) MSetNX(ctx context.Context, values ...interface{}) *BoolCmd {
}
// Set Redis `SET key value [expiration]` command.
// Use expiration for `SETEX`-like behavior.
// Use expiration for `SETEx`-like behavior.
//
// Zero expiration means the key has no expiration time.
// KeepTTL is a Redis KEEPTTL option to keep existing TTL, it requires your redis-server version >= 6.0,
@ -958,8 +963,8 @@ func (c cmdable) SetArgs(ctx context.Context, key string, value interface{}, a S
return cmd
}
// SetEX Redis `SETEX key expiration value` command.
func (c cmdable) SetEX(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd {
// SetEx Redis `SETEx key expiration value` command.
func (c cmdable) SetEx(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd {
cmd := NewStatusCmd(ctx, "setex", key, formatSec(ctx, expiration), value)
_ = c(ctx, cmd)
return cmd
@ -1229,8 +1234,8 @@ func (c cmdable) HGet(ctx context.Context, key, field string) *StringCmd {
return cmd
}
func (c cmdable) HGetAll(ctx context.Context, key string) *StringStringMapCmd {
cmd := NewStringStringMapCmd(ctx, "hgetall", key)
func (c cmdable) HGetAll(ctx context.Context, key string) *MapStringStringCmd {
cmd := NewMapStringStringCmd(ctx, "hgetall", key)
_ = c(ctx, cmd)
return cmd
}
@ -1313,16 +1318,15 @@ func (c cmdable) HVals(ctx context.Context, key string) *StringSliceCmd {
}
// HRandField redis-server version >= 6.2.0.
func (c cmdable) HRandField(ctx context.Context, key string, count int, withValues bool) *StringSliceCmd {
args := make([]interface{}, 0, 4)
// Although count=0 is meaningless, redis accepts count=0.
args = append(args, "hrandfield", key, count)
if withValues {
args = append(args, "withvalues")
func (c cmdable) HRandField(ctx context.Context, key string, count int) *StringSliceCmd {
cmd := NewStringSliceCmd(ctx, "hrandfield", key, count)
_ = c(ctx, cmd)
return cmd
}
cmd := NewStringSliceCmd(ctx, args...)
// HRandFieldWithValues redis-server version >= 6.2.0.
func (c cmdable) HRandFieldWithValues(ctx context.Context, key string, count int) *KeyValueSliceCmd {
cmd := NewKeyValueSliceCmd(ctx, "hrandfield", key, count, "withvalues")
_ = c(ctx, cmd)
return cmd
}
@ -1725,10 +1729,6 @@ type XAddArgs struct {
Stream string
NoMkStream bool
MaxLen int64 // MAXLEN N
// Deprecated: use MaxLen+Approx, remove in v9.
MaxLenApprox int64 // MAXLEN ~ N
MinID string
// Approx causes MaxLen and MinID to use "~" matcher (instead of "=").
Approx bool
@ -1752,9 +1752,6 @@ func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
} else {
args = append(args, "maxlen", a.MaxLen)
}
case a.MaxLenApprox > 0:
// TODO remove in v9.
args = append(args, "maxlen", "~", a.MaxLenApprox)
case a.MinID != "":
if a.Approx {
args = append(args, "minid", "~", a.MinID)
@ -2070,16 +2067,6 @@ func (c cmdable) xTrim(
return cmd
}
// Deprecated: use XTrimMaxLen, remove in v9.
func (c cmdable) XTrim(ctx context.Context, key string, maxLen int64) *IntCmd {
return c.xTrim(ctx, key, "maxlen", false, maxLen, 0)
}
// Deprecated: use XTrimMaxLenApprox, remove in v9.
func (c cmdable) XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd {
return c.xTrim(ctx, key, "maxlen", true, maxLen, 0)
}
// XTrimMaxLen No `~` rules are used, `limit` cannot be used.
// cmd: XTRIM key MAXLEN maxLen
func (c cmdable) XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd {
@ -2266,116 +2253,26 @@ func (c cmdable) ZAddArgsIncr(ctx context.Context, key string, args ZAddArgs) *F
return cmd
}
// TODO: Compatible with v8 api, will be removed in v9.
func (c cmdable) zAdd(ctx context.Context, key string, args ZAddArgs, members ...*Z) *IntCmd {
args.Members = make([]Z, len(members))
for i, m := range members {
args.Members[i] = *m
}
cmd := NewIntCmd(ctx, c.zAddArgs(key, args, false)...)
_ = c(ctx, cmd)
return cmd
}
// ZAdd Redis `ZADD key score member [score member ...]` command.
func (c cmdable) ZAdd(ctx context.Context, key string, members ...*Z) *IntCmd {
return c.zAdd(ctx, key, ZAddArgs{}, members...)
func (c cmdable) ZAdd(ctx context.Context, key string, members ...Z) *IntCmd {
return c.ZAddArgs(ctx, key, ZAddArgs{
Members: members,
})
}
// ZAddNX Redis `ZADD key NX score member [score member ...]` command.
func (c cmdable) ZAddNX(ctx context.Context, key string, members ...*Z) *IntCmd {
return c.zAdd(ctx, key, ZAddArgs{
func (c cmdable) ZAddNX(ctx context.Context, key string, members ...Z) *IntCmd {
return c.ZAddArgs(ctx, key, ZAddArgs{
NX: true,
}, members...)
Members: members,
})
}
// ZAddXX Redis `ZADD key XX score member [score member ...]` command.
func (c cmdable) ZAddXX(ctx context.Context, key string, members ...*Z) *IntCmd {
return c.zAdd(ctx, key, ZAddArgs{
func (c cmdable) ZAddXX(ctx context.Context, key string, members ...Z) *IntCmd {
return c.ZAddArgs(ctx, key, ZAddArgs{
XX: true,
}, members...)
}
// ZAddCh Redis `ZADD key CH score member [score member ...]` command.
// Deprecated: Use
// client.ZAddArgs(ctx, ZAddArgs{
// Ch: true,
// Members: []Z,
// })
// remove in v9.
func (c cmdable) ZAddCh(ctx context.Context, key string, members ...*Z) *IntCmd {
return c.zAdd(ctx, key, ZAddArgs{
Ch: true,
}, members...)
}
// ZAddNXCh Redis `ZADD key NX CH score member [score member ...]` command.
// Deprecated: Use
// client.ZAddArgs(ctx, ZAddArgs{
// NX: true,
// Ch: true,
// Members: []Z,
// })
// remove in v9.
func (c cmdable) ZAddNXCh(ctx context.Context, key string, members ...*Z) *IntCmd {
return c.zAdd(ctx, key, ZAddArgs{
NX: true,
Ch: true,
}, members...)
}
// ZAddXXCh Redis `ZADD key XX CH score member [score member ...]` command.
// Deprecated: Use
// client.ZAddArgs(ctx, ZAddArgs{
// XX: true,
// Ch: true,
// Members: []Z,
// })
// remove in v9.
func (c cmdable) ZAddXXCh(ctx context.Context, key string, members ...*Z) *IntCmd {
return c.zAdd(ctx, key, ZAddArgs{
XX: true,
Ch: true,
}, members...)
}
// ZIncr Redis `ZADD key INCR score member` command.
// Deprecated: Use
// client.ZAddArgsIncr(ctx, ZAddArgs{
// Members: []Z,
// })
// remove in v9.
func (c cmdable) ZIncr(ctx context.Context, key string, member *Z) *FloatCmd {
return c.ZAddArgsIncr(ctx, key, ZAddArgs{
Members: []Z{*member},
})
}
// ZIncrNX Redis `ZADD key NX INCR score member` command.
// Deprecated: Use
// client.ZAddArgsIncr(ctx, ZAddArgs{
// NX: true,
// Members: []Z,
// })
// remove in v9.
func (c cmdable) ZIncrNX(ctx context.Context, key string, member *Z) *FloatCmd {
return c.ZAddArgsIncr(ctx, key, ZAddArgs{
NX: true,
Members: []Z{*member},
})
}
// ZIncrXX Redis `ZADD key XX INCR score member` command.
// Deprecated: Use
// client.ZAddArgsIncr(ctx, ZAddArgs{
// XX: true,
// Members: []Z,
// })
// remove in v9.
func (c cmdable) ZIncrXX(ctx context.Context, key string, member *Z) *FloatCmd {
return c.ZAddArgsIncr(ctx, key, ZAddArgs{
XX: true,
Members: []Z{*member},
Members: members,
})
}
@ -2783,16 +2680,15 @@ func (c cmdable) ZUnionStore(ctx context.Context, dest string, store *ZStore) *I
}
// ZRandMember redis-server version >= 6.2.0.
func (c cmdable) ZRandMember(ctx context.Context, key string, count int, withScores bool) *StringSliceCmd {
args := make([]interface{}, 0, 4)
// Although count=0 is meaningless, redis accepts count=0.
args = append(args, "zrandmember", key, count)
if withScores {
args = append(args, "withscores")
func (c cmdable) ZRandMember(ctx context.Context, key string, count int) *StringSliceCmd {
cmd := NewStringSliceCmd(ctx, "zrandmember", key, count)
_ = c(ctx, cmd)
return cmd
}
cmd := NewStringSliceCmd(ctx, args...)
// ZRandMemberWithScores redis-server version >= 6.2.0.
func (c cmdable) ZRandMemberWithScores(ctx context.Context, key string, count int) *ZSliceCmd {
cmd := NewZSliceCmd(ctx, "zrandmember", key, count, "withscores")
_ = c(ctx, cmd)
return cmd
}
@ -2940,8 +2836,8 @@ func (c cmdable) ClientUnblockWithError(ctx context.Context, id int64) *IntCmd {
return cmd
}
func (c cmdable) ConfigGet(ctx context.Context, parameter string) *SliceCmd {
cmd := NewSliceCmd(ctx, "config", "get", parameter)
func (c cmdable) ConfigGet(ctx context.Context, parameter string) *MapStringStringCmd {
cmd := NewMapStringStringCmd(ctx, "config", "get", parameter)
_ = c(ctx, cmd)
return cmd
}

File diff suppressed because it is too large Load Diff

View File

@ -190,8 +190,8 @@ func ExampleClient_Set() {
}
}
func ExampleClient_SetEX() {
err := rdb.SetEX(ctx, "key", "value", time.Hour).Err()
func ExampleClient_SetEx() {
err := rdb.SetEx(ctx, "key", "value", time.Hour).Err()
if err != nil {
panic(err)
}
@ -278,9 +278,9 @@ func ExampleClient_ScanType() {
// Output: found 33 keys
}
// ExampleStringStringMapCmd_Scan shows how to scan the results of a map fetch
// ExampleMapStringStringCmd_Scan shows how to scan the results of a map fetch
// into a struct.
func ExampleStringStringMapCmd_Scan() {
func ExampleMapStringStringCmd_Scan() {
rdb.FlushDB(ctx)
err := rdb.HMSet(ctx, "map",
"name", "hello",
@ -617,7 +617,7 @@ func ExampleClient_SlowLogGet() {
old := rdb.ConfigGet(ctx, key).Val()
rdb.ConfigSet(ctx, key, "0")
defer rdb.ConfigSet(ctx, key, old[1].(string))
defer rdb.ConfigSet(ctx, key, old[key])
if err := rdb.Do(ctx, "slowlog", "reset").Err(); err != nil {
panic(err)

View File

@ -60,21 +60,21 @@ func (c *ClusterClient) SwapNodes(ctx context.Context, key string) error {
return nil
}
func (state *clusterState) IsConsistent(ctx context.Context) bool {
if len(state.Masters) < 3 {
func (c *clusterState) IsConsistent(ctx context.Context) bool {
if len(c.Masters) < 3 {
return false
}
for _, master := range state.Masters {
for _, master := range c.Masters {
s := master.Client.Info(ctx, "replication").Val()
if !strings.Contains(s, "role:master") {
return false
}
}
if len(state.Slaves) < 3 {
if len(c.Slaves) < 3 {
return false
}
for _, slave := range state.Slaves {
for _, slave := range c.Slaves {
s := slave.Client.Info(ctx, "replication").Val()
if !strings.Contains(s, "role:slave") {
return false

View File

@ -0,0 +1,49 @@
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos
package pool
import (
"errors"
"io"
"net"
"syscall"
"time"
)
var errUnexpectedRead = errors.New("unexpected read from socket")
func connCheck(conn net.Conn) error {
// Reset previous timeout.
_ = conn.SetDeadline(time.Time{})
sysConn, ok := conn.(syscall.Conn)
if !ok {
return nil
}
rawConn, err := sysConn.SyscallConn()
if err != nil {
return err
}
var sysErr error
err = rawConn.Read(func(fd uintptr) bool {
var buf [1]byte
n, err := syscall.Read(int(fd), buf[:])
switch {
case n == 0 && err == nil:
sysErr = io.EOF
case n > 0:
sysErr = errUnexpectedRead
case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
sysErr = nil
default:
sysErr = err
}
return true
})
if err != nil {
return err
}
return sysErr
}

View File

@ -0,0 +1,9 @@
// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos
package pool
import "net"
func connCheck(conn net.Conn) error {
return nil
}

View File

@ -0,0 +1,48 @@
//go:build linux || darwin || dragonfly || freebsd || netbsd || openbsd || solaris || illumos
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos
package pool
import (
"net"
"net/http/httptest"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("tests conn_check with real conns", func() {
var ts *httptest.Server
var conn net.Conn
var err error
BeforeEach(func() {
ts = httptest.NewServer(nil)
conn, err = net.DialTimeout(ts.Listener.Addr().Network(), ts.Listener.Addr().String(), time.Second)
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
ts.Close()
})
It("good conn check", func() {
Expect(connCheck(conn)).NotTo(HaveOccurred())
Expect(conn.Close()).NotTo(HaveOccurred())
Expect(connCheck(conn)).To(HaveOccurred())
})
It("bad conn check", func() {
Expect(conn.Close()).NotTo(HaveOccurred())
Expect(connCheck(conn)).To(HaveOccurred())
})
It("check conn deadline", func() {
Expect(conn.SetDeadline(time.Now())).NotTo(HaveOccurred())
time.Sleep(time.Millisecond * 10)
Expect(connCheck(conn)).NotTo(HaveOccurred())
Expect(conn.Close()).NotTo(HaveOccurred())
})
})

View File

@ -1,9 +1,14 @@
package pool
import (
"net"
"time"
)
func (cn *Conn) SetCreatedAt(tm time.Time) {
cn.createdAt = tm
}
func (cn *Conn) NetConn() net.Conn {
return cn.netConn
}

View File

@ -323,6 +323,8 @@ var _ = Describe("conns reaper", func() {
cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
case "aged":
cn.SetCreatedAt(time.Now().Add(-2 * maxAge))
case "connCheck":
_ = cn.Close()
}
conns = append(conns, cn)
staleConns = append(staleConns, cn)
@ -409,6 +411,7 @@ var _ = Describe("conns reaper", func() {
assert("idle")
assert("aged")
assert("connCheck")
})
var _ = Describe("race", func() {

View File

@ -2,21 +2,40 @@ package proto
import (
"bufio"
"errors"
"fmt"
"io"
"math"
"math/big"
"strconv"
"github.com/go-redis/redis/v8/internal/util"
)
// redis resp protocol data type.
const (
ErrorReply = '-'
StatusReply = '+'
IntReply = ':'
StringReply = '$'
ArrayReply = '*'
RespStatus = '+' // +<string>\r\n
RespError = '-' // -<string>\r\n
RespString = '$' // $<length>\r\n<bytes>\r\n
RespInt = ':' // :<number>\r\n
RespNil = '_' // _\r\n
RespFloat = ',' // ,<floating-point-number>\r\n (golang float)
RespBool = '#' // true: #t\r\n false: #f\r\n
RespBlobError = '!' // !<length>\r\n<bytes>\r\n
RespVerbatim = '=' // =<length>\r\nFORMAT:<bytes>\r\n
RespBigInt = '(' // (<big number>\r\n
RespArray = '*' // *<len>\r\n... (same as resp2)
RespMap = '%' // %<len>\r\n(key)\r\n(value)\r\n... (golang map)
RespSet = '~' // ~<len>\r\n... (same as Array)
RespAttr = '|' // |<len>\r\n(key)\r\n(value)\r\n... + command reply
RespPush = '>' // ><len>\r\n... (same as Array)
)
// Not used temporarily.
// Redis has not used these two data types for the time being, and will implement them later.
// Streamed = "EOF:"
// StreamedAggregated = '?'
//------------------------------------------------------------------------------
const Nil = RedisError("redis: nil") // nolint:errname
@ -27,19 +46,19 @@ func (e RedisError) Error() string { return string(e) }
func (RedisError) RedisError() {}
//------------------------------------------------------------------------------
func ParseErrorReply(line []byte) error {
return RedisError(line[1:])
}
type MultiBulkParse func(*Reader, int64) (interface{}, error)
//------------------------------------------------------------------------------
type Reader struct {
rd *bufio.Reader
_buf []byte
}
func NewReader(rd io.Reader) *Reader {
return &Reader{
rd: bufio.NewReader(rd),
_buf: make([]byte, 64),
}
}
@ -55,14 +74,53 @@ func (r *Reader) Reset(rd io.Reader) {
r.rd.Reset(rd)
}
// PeekReplyType returns the data type of the next response without advancing the Reader,
// and discard the attribute type.
func (r *Reader) PeekReplyType() (byte, error) {
b, err := r.rd.Peek(1)
if err != nil {
return 0, err
}
if b[0] == RespAttr {
if err = r.DiscardNext(); err != nil {
return 0, err
}
return r.PeekReplyType()
}
return b[0], nil
}
// ReadLine Return a valid reply, it will check the protocol or redis error,
// and discard the attribute type.
func (r *Reader) ReadLine() ([]byte, error) {
line, err := r.readLine()
if err != nil {
return nil, err
}
if isNilReply(line) {
switch line[0] {
case RespError:
return nil, ParseErrorReply(line)
case RespNil:
return nil, Nil
case RespBlobError:
var blobErr string
blobErr, err = r.readStringReply(line)
if err == nil {
err = RedisError(blobErr)
}
return nil, err
case RespAttr:
if err = r.Discard(line); err != nil {
return nil, err
}
return r.ReadLine()
}
// Compatible with RESP2
if IsNilReply(line) {
return nil, Nil
}
return line, nil
}
@ -93,48 +151,192 @@ func (r *Reader) readLine() ([]byte, error) {
return b[:len(b)-2], nil
}
func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) {
func (r *Reader) ReadReply() (interface{}, error) {
line, err := r.ReadLine()
if err != nil {
return nil, err
}
switch line[0] {
case ErrorReply:
return nil, ParseErrorReply(line)
case StatusReply:
case RespStatus:
return string(line[1:]), nil
case IntReply:
case RespInt:
return util.ParseInt(line[1:], 10, 64)
case StringReply:
case RespFloat:
return r.readFloat(line)
case RespBool:
return r.readBool(line)
case RespBigInt:
return r.readBigInt(line)
case RespString:
return r.readStringReply(line)
case ArrayReply:
n, err := parseArrayLen(line)
if err != nil {
return nil, err
}
if m == nil {
err := fmt.Errorf("redis: got %.100q, but multi bulk parser is nil", line)
return nil, err
}
return m(r, n)
case RespVerbatim:
return r.readVerb(line)
case RespArray, RespSet, RespPush:
return r.readSlice(line)
case RespMap:
return r.readMap(line)
}
return nil, fmt.Errorf("redis: can't parse %.100q", line)
}
func (r *Reader) ReadIntReply() (int64, error) {
func (r *Reader) readFloat(line []byte) (float64, error) {
v := string(line[1:])
switch string(line[1:]) {
case "inf":
return math.Inf(1), nil
case "-inf":
return math.Inf(-1), nil
}
return strconv.ParseFloat(v, 64)
}
func (r *Reader) readBool(line []byte) (bool, error) {
switch string(line[1:]) {
case "t":
return true, nil
case "f":
return false, nil
}
return false, fmt.Errorf("redis: can't parse bool reply: %q", line)
}
func (r *Reader) readBigInt(line []byte) (*big.Int, error) {
i := new(big.Int)
if i, ok := i.SetString(string(line[1:]), 10); ok {
return i, nil
}
return nil, fmt.Errorf("redis: can't parse bigInt reply: %q", line)
}
func (r *Reader) readStringReply(line []byte) (string, error) {
n, err := replyLen(line)
if err != nil {
return "", err
}
b := make([]byte, n+2)
_, err = io.ReadFull(r.rd, b)
if err != nil {
return "", err
}
return util.BytesToString(b[:n]), nil
}
func (r *Reader) readVerb(line []byte) (string, error) {
s, err := r.readStringReply(line)
if err != nil {
return "", err
}
if len(s) < 4 || s[3] != ':' {
return "", fmt.Errorf("redis: can't parse verbatim string reply: %q", line)
}
return s[4:], nil
}
func (r *Reader) readSlice(line []byte) ([]interface{}, error) {
n, err := replyLen(line)
if err != nil {
return nil, err
}
val := make([]interface{}, n)
for i := 0; i < len(val); i++ {
v, err := r.ReadReply()
if err != nil {
if err == Nil {
val[i] = nil
continue
}
if err, ok := err.(RedisError); ok {
val[i] = err
continue
}
return nil, err
}
val[i] = v
}
return val, nil
}
func (r *Reader) readMap(line []byte) (map[interface{}]interface{}, error) {
n, err := replyLen(line)
if err != nil {
return nil, err
}
m := make(map[interface{}]interface{}, n)
for i := 0; i < n; i++ {
k, err := r.ReadReply()
if err != nil {
return nil, err
}
v, err := r.ReadReply()
if err != nil {
if err == Nil {
m[k] = nil
continue
}
if err, ok := err.(RedisError); ok {
m[k] = err
continue
}
return nil, err
}
m[k] = v
}
return m, nil
}
// -------------------------------
func (r *Reader) ReadInt() (int64, error) {
line, err := r.ReadLine()
if err != nil {
return 0, err
}
switch line[0] {
case ErrorReply:
return 0, ParseErrorReply(line)
case IntReply:
case RespInt, RespStatus:
return util.ParseInt(line[1:], 10, 64)
default:
case RespString:
s, err := r.readStringReply(line)
if err != nil {
return 0, err
}
return util.ParseInt([]byte(s), 10, 64)
case RespBigInt:
b, err := r.readBigInt(line)
if err != nil {
return 0, err
}
if !b.IsInt64() {
return 0, fmt.Errorf("bigInt(%s) value out of range", b.String())
}
return b.Int64(), nil
}
return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line)
}
func (r *Reader) ReadFloat() (float64, error) {
line, err := r.ReadLine()
if err != nil {
return 0, err
}
switch line[0] {
case RespFloat:
return r.readFloat(line)
case RespStatus:
return strconv.ParseFloat(string(line[1:]), 64)
case RespString:
s, err := r.readStringReply(line)
if err != nil {
return 0, err
}
return strconv.ParseFloat(s, 64)
}
return 0, fmt.Errorf("redis: can't parse float reply: %.100q", line)
}
func (r *Reader) ReadString() (string, error) {
@ -142,191 +344,180 @@ func (r *Reader) ReadString() (string, error) {
if err != nil {
return "", err
}
switch line[0] {
case ErrorReply:
return "", ParseErrorReply(line)
case StringReply:
case RespStatus, RespInt, RespFloat:
return string(line[1:]), nil
case RespString:
return r.readStringReply(line)
case StatusReply:
return string(line[1:]), nil
case IntReply:
return string(line[1:]), nil
default:
case RespBool:
b, err := r.readBool(line)
return strconv.FormatBool(b), err
case RespVerbatim:
return r.readVerb(line)
case RespBigInt:
b, err := r.readBigInt(line)
if err != nil {
return "", err
}
return b.String(), nil
}
return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line)
}
}
func (r *Reader) readStringReply(line []byte) (string, error) {
if isNilReply(line) {
return "", Nil
}
replyLen, err := util.Atoi(line[1:])
func (r *Reader) ReadBool() (bool, error) {
s, err := r.ReadString()
if err != nil {
return "", err
return false, err
}
return s == "OK" || s == "1" || s == "true", nil
}
b := make([]byte, replyLen+2)
_, err = io.ReadFull(r.rd, b)
if err != nil {
return "", err
}
return util.BytesToString(b[:replyLen]), nil
}
func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
func (r *Reader) ReadSlice() ([]interface{}, error) {
line, err := r.ReadLine()
if err != nil {
return nil, err
}
switch line[0] {
case ErrorReply:
return nil, ParseErrorReply(line)
case ArrayReply:
n, err := parseArrayLen(line)
if err != nil {
return nil, err
}
return m(r, n)
default:
return nil, fmt.Errorf("redis: can't parse array reply: %.100q", line)
}
return r.readSlice(line)
}
// ReadFixedArrayLen read fixed array length.
func (r *Reader) ReadFixedArrayLen(fixedLen int) error {
n, err := r.ReadArrayLen()
if err != nil {
return err
}
if n != fixedLen {
return fmt.Errorf("redis: got %d elements of array length, wanted %d", n, fixedLen)
}
return nil
}
// ReadArrayLen Read and return the length of the array.
func (r *Reader) ReadArrayLen() (int, error) {
line, err := r.ReadLine()
if err != nil {
return 0, err
}
switch line[0] {
case ErrorReply:
return 0, ParseErrorReply(line)
case ArrayReply:
n, err := parseArrayLen(line)
if err != nil {
return 0, err
}
return int(n), nil
case RespArray, RespSet, RespPush:
return replyLen(line)
default:
return 0, fmt.Errorf("redis: can't parse array reply: %.100q", line)
return 0, fmt.Errorf("redis: can't parse array(array/set/push) reply: %.100q", line)
}
}
func (r *Reader) ReadScanReply() ([]string, uint64, error) {
n, err := r.ReadArrayLen()
// ReadFixedMapLen read fixed map length.
func (r *Reader) ReadFixedMapLen(fixedLen int) error {
n, err := r.ReadMapLen()
if err != nil {
return nil, 0, err
return err
}
if n != 2 {
return nil, 0, fmt.Errorf("redis: got %d elements in scan reply, expected 2", n)
if n != fixedLen {
return fmt.Errorf("redis: got %d elements of map length, wanted %d", n, fixedLen)
}
return nil
}
cursor, err := r.ReadUint()
if err != nil {
return nil, 0, err
}
n, err = r.ReadArrayLen()
if err != nil {
return nil, 0, err
}
keys := make([]string, n)
for i := 0; i < n; i++ {
key, err := r.ReadString()
if err != nil {
return nil, 0, err
}
keys[i] = key
}
return keys, cursor, err
}
func (r *Reader) ReadInt() (int64, error) {
b, err := r.readTmpBytesReply()
if err != nil {
return 0, err
}
return util.ParseInt(b, 10, 64)
}
func (r *Reader) ReadUint() (uint64, error) {
b, err := r.readTmpBytesReply()
if err != nil {
return 0, err
}
return util.ParseUint(b, 10, 64)
}
func (r *Reader) ReadFloatReply() (float64, error) {
b, err := r.readTmpBytesReply()
if err != nil {
return 0, err
}
return util.ParseFloat(b, 64)
}
func (r *Reader) readTmpBytesReply() ([]byte, error) {
// ReadMapLen read the length of the map type.
// If responding to the array type (RespArray/RespSet/RespPush),
// it must be a multiple of 2 and return n/2.
// Other types will return an error.
func (r *Reader) ReadMapLen() (int, error) {
line, err := r.ReadLine()
if err != nil {
return nil, err
return 0, err
}
switch line[0] {
case ErrorReply:
return nil, ParseErrorReply(line)
case StringReply:
return r._readTmpBytesReply(line)
case StatusReply:
return line[1:], nil
case RespMap:
return replyLen(line)
case RespArray, RespSet, RespPush:
// Some commands and RESP2 protocol may respond to array types.
n, err := replyLen(line)
if err != nil {
return 0, err
}
if n%2 != 0 {
return 0, fmt.Errorf("redis: the length of the array must be a multiple of 2, got: %d", n)
}
return n / 2, nil
default:
return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line)
return 0, fmt.Errorf("redis: can't parse map reply: %.100q", line)
}
}
func (r *Reader) _readTmpBytesReply(line []byte) ([]byte, error) {
if isNilReply(line) {
return nil, Nil
// Discard the data represented by line.
func (r *Reader) Discard(line []byte) (err error) {
if len(line) == 0 {
return errors.New("redis: invalid line")
}
switch line[0] {
case RespStatus, RespError, RespInt, RespNil, RespFloat, RespBool, RespBigInt:
return nil
}
replyLen, err := util.Atoi(line[1:])
n, err := replyLen(line)
if err != nil && err != Nil {
return err
}
switch line[0] {
case RespBlobError, RespString, RespVerbatim:
// +\r\n
_, err = r.rd.Discard(n + 2)
return err
case RespArray, RespSet, RespPush:
for i := 0; i < n; i++ {
if err = r.DiscardNext(); err != nil {
return err
}
}
return nil
case RespMap, RespAttr:
// Read key & value.
for i := 0; i < n*2; i++ {
if err = r.DiscardNext(); err != nil {
return err
}
}
return nil
}
return fmt.Errorf("redis: can't parse %.100q", line)
}
// DiscardNext read and discard the data represented by the next line.
func (r *Reader) DiscardNext() error {
line, err := r.readLine()
if err != nil {
return nil, err
return err
}
return r.Discard(line)
}
buf := r.buf(replyLen + 2)
_, err = io.ReadFull(r.rd, buf)
func replyLen(line []byte) (n int, err error) {
n, err = util.Atoi(line[1:])
if err != nil {
return nil, err
return 0, err
}
return buf[:replyLen], nil
if n < -1 {
return 0, fmt.Errorf("redis: invalid reply: %q", line)
}
func (r *Reader) buf(n int) []byte {
if n <= cap(r._buf) {
return r._buf[:n]
}
d := n - cap(r._buf)
r._buf = append(r._buf, make([]byte, d)...)
return r._buf
}
func isNilReply(b []byte) bool {
return len(b) == 3 &&
(b[0] == StringReply || b[0] == ArrayReply) &&
b[1] == '-' && b[2] == '1'
}
func ParseErrorReply(line []byte) error {
return RedisError(string(line[1:]))
}
func parseArrayLen(line []byte) (int64, error) {
if isNilReply(line) {
switch line[0] {
case RespString, RespVerbatim, RespBlobError,
RespArray, RespSet, RespPush, RespMap, RespAttr:
if n == -1 {
return 0, Nil
}
return util.ParseInt(line[1:], 10, 64)
}
return n, nil
}
// IsNilReply detect redis.Nil of RESP2.
func IsNilReply(line []byte) bool {
return len(line) == 3 &&
(line[0] == RespString || line[0] == RespArray) &&
line[1] == '-' && line[2] == '1'
}

View File

@ -9,23 +9,63 @@ import (
)
func BenchmarkReader_ParseReply_Status(b *testing.B) {
benchmarkParseReply(b, "+OK\r\n", nil, false)
benchmarkParseReply(b, "+OK\r\n", false)
}
func BenchmarkReader_ParseReply_Int(b *testing.B) {
benchmarkParseReply(b, ":1\r\n", nil, false)
benchmarkParseReply(b, ":1\r\n", false)
}
func BenchmarkReader_ParseReply_Float(b *testing.B) {
benchmarkParseReply(b, ",123.456\r\n", false)
}
func BenchmarkReader_ParseReply_Bool(b *testing.B) {
benchmarkParseReply(b, "#t\r\n", false)
}
func BenchmarkReader_ParseReply_BigInt(b *testing.B) {
benchmarkParseReply(b, "(3492890328409238509324850943850943825024385\r\n", false)
}
func BenchmarkReader_ParseReply_Error(b *testing.B) {
benchmarkParseReply(b, "-Error message\r\n", nil, true)
benchmarkParseReply(b, "-Error message\r\n", true)
}
func BenchmarkReader_ParseReply_Nil(b *testing.B) {
benchmarkParseReply(b, "_\r\n", true)
}
func BenchmarkReader_ParseReply_BlobError(b *testing.B) {
benchmarkParseReply(b, "!21\r\nSYNTAX invalid syntax", true)
}
func BenchmarkReader_ParseReply_String(b *testing.B) {
benchmarkParseReply(b, "$5\r\nhello\r\n", nil, false)
benchmarkParseReply(b, "$5\r\nhello\r\n", false)
}
func BenchmarkReader_ParseReply_Verb(b *testing.B) {
benchmarkParseReply(b, "$9\r\ntxt:hello\r\n", false)
}
func BenchmarkReader_ParseReply_Slice(b *testing.B) {
benchmarkParseReply(b, "*2\r\n$5\r\nhello\r\n$5\r\nworld\r\n", multiBulkParse, false)
benchmarkParseReply(b, "*2\r\n$5\r\nhello\r\n$5\r\nworld\r\n", false)
}
func BenchmarkReader_ParseReply_Set(b *testing.B) {
benchmarkParseReply(b, "~2\r\n$5\r\nhello\r\n$5\r\nworld\r\n", false)
}
func BenchmarkReader_ParseReply_Push(b *testing.B) {
benchmarkParseReply(b, ">2\r\n$5\r\nhello\r\n$5\r\nworld\r\n", false)
}
func BenchmarkReader_ParseReply_Map(b *testing.B) {
benchmarkParseReply(b, "%2\r\n$5\r\nhello\r\n$5\r\nworld\r\n+key\r\n+value\r\n", false)
}
func BenchmarkReader_ParseReply_Attr(b *testing.B) {
benchmarkParseReply(b, "%1\r\n+key\r\n+value\r\n+hello\r\n", false)
}
func TestReader_ReadLine(t *testing.T) {
@ -43,7 +83,7 @@ func TestReader_ReadLine(t *testing.T) {
}
}
func benchmarkParseReply(b *testing.B, reply string, m proto.MultiBulkParse, wanterr bool) {
func benchmarkParseReply(b *testing.B, reply string, wanterr bool) {
buf := new(bytes.Buffer)
for i := 0; i < b.N; i++ {
buf.WriteString(reply)
@ -52,21 +92,9 @@ func benchmarkParseReply(b *testing.B, reply string, m proto.MultiBulkParse, wan
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := p.ReadReply(m)
_, err := p.ReadReply()
if !wanterr && err != nil {
b.Fatal(err)
}
}
}
func multiBulkParse(p *proto.Reader, n int64) (interface{}, error) {
vv := make([]interface{}, 0, n)
for i := int64(0); i < n; i++ {
v, err := p.ReadReply(multiBulkParse)
if err != nil {
return nil, err
}
vv = append(vv, v)
}
return vv, nil
}

View File

@ -14,7 +14,7 @@ import (
type writer interface {
io.Writer
io.ByteWriter
// io.StringWriter
// WriteString implement io.StringWriter.
WriteString(s string) (n int, err error)
}
@ -35,7 +35,7 @@ func NewWriter(wr writer) *Writer {
}
func (w *Writer) WriteArgs(args []interface{}) error {
if err := w.WriteByte(ArrayReply); err != nil {
if err := w.WriteByte(RespArray); err != nil {
return err
}
@ -116,7 +116,7 @@ func (w *Writer) WriteArg(v interface{}) error {
}
func (w *Writer) bytes(b []byte) error {
if err := w.WriteByte(StringReply); err != nil {
if err := w.WriteByte(RespString); err != nil {
return err
}

View File

@ -270,7 +270,10 @@ func setupTCPConn(u *url.URL) (*Options, error) {
}
if u.Scheme == "rediss" {
o.TLSConfig = &tls.Config{ServerName: h}
o.TLSConfig = &tls.Config{
ServerName: h,
MinVersion: tls.VersionTLS12,
}
}
return setupConnParams(u, o)

View File

@ -3,8 +3,6 @@ package redis
import (
"context"
"sync"
"github.com/go-redis/redis/v8/internal/pool"
)
type pipelineExecer func(context.Context, []Cmder) error
@ -27,8 +25,7 @@ type Pipeliner interface {
Len() int
Do(ctx context.Context, args ...interface{}) *Cmd
Process(ctx context.Context, cmd Cmder) error
Close() error
Discard() error
Discard()
Exec(ctx context.Context) ([]Cmder, error)
}
@ -46,7 +43,6 @@ type Pipeline struct {
mu sync.Mutex
cmds []Cmder
closed bool
}
func (c *Pipeline) init() {
@ -77,29 +73,11 @@ func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error {
return nil
}
// Close closes the pipeline, releasing any open resources.
func (c *Pipeline) Close() error {
c.mu.Lock()
_ = c.discard()
c.closed = true
c.mu.Unlock()
return nil
}
// Discard resets the pipeline and discards queued commands.
func (c *Pipeline) Discard() error {
func (c *Pipeline) Discard() {
c.mu.Lock()
err := c.discard()
c.mu.Unlock()
return err
}
func (c *Pipeline) discard() error {
if c.closed {
return pool.ErrClosed
}
c.cmds = c.cmds[:0]
return nil
c.mu.Unlock()
}
// Exec executes all previously queued commands using one
@ -111,10 +89,6 @@ func (c *Pipeline) Exec(ctx context.Context) ([]Cmder, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return nil, pool.ErrClosed
}
if len(c.cmds) == 0 {
return nil, nil
}
@ -129,9 +103,7 @@ func (c *Pipeline) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]C
if err := fn(c); err != nil {
return nil, err
}
cmds, err := c.Exec(ctx)
_ = c.Close()
return cmds, err
return c.Exec(ctx)
}
func (c *Pipeline) Pipeline() Pipeliner {

View File

@ -72,7 +72,6 @@ var _ = Describe("pool", func() {
Expect(cmds).To(HaveLen(1))
Expect(ping.Err()).NotTo(HaveOccurred())
Expect(ping.Val()).To(Equal("PONG"))
Expect(pipe.Close()).NotTo(HaveOccurred())
})
pool := client.Pool()

View File

@ -1,7 +1,6 @@
package redis_test
import (
"context"
"io"
"net"
"sync"
@ -15,16 +14,11 @@ import (
var _ = Describe("PubSub", func() {
var client *redis.Client
var clientID int64
BeforeEach(func() {
opt := redisOptions()
opt.MinIdleConns = 0
opt.MaxConnAge = 0
opt.OnConnect = func(ctx context.Context, cn *redis.Conn) (err error) {
clientID, err = cn.ClientID(ctx).Result()
return err
}
client = redis.NewClient(opt)
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})
@ -421,30 +415,6 @@ var _ = Describe("PubSub", func() {
Expect(msg.Payload).To(Equal(string(bigVal)))
})
It("handles message payload slice with server-assisted client-size caching", func() {
pubsub := client.Subscribe(ctx, "__redis__:invalidate")
defer pubsub.Close()
client2 := redis.NewClient(redisOptions())
defer client2.Close()
err := client2.Do(ctx, "CLIENT", "TRACKING", "on", "REDIRECT", clientID).Err()
Expect(err).NotTo(HaveOccurred())
err = client2.Do(ctx, "GET", "mykey").Err()
Expect(err).To(Equal(redis.Nil))
err = client2.Do(ctx, "SET", "mykey", "myvalue").Err()
Expect(err).NotTo(HaveOccurred())
ch := pubsub.Channel()
var msg *redis.Message
Eventually(ch).Should(Receive(&msg))
Expect(msg.Channel).To(Equal("__redis__:invalidate"))
Expect(msg.PayloadSlice).To(Equal([]string{"mykey"}))
})
It("supports concurrent Ping and Receive", func() {
const N = 100

View File

@ -15,6 +15,7 @@ import (
// Nil reply returned by Redis when key does not exist.
const Nil = proto.Nil
// SetLogger set custom log
func SetLogger(logger internal.Logging) {
internal.Logger = logger
}
@ -232,8 +233,19 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
connPool := pool.NewSingleConnPool(c.connPool, cn)
conn := newConn(ctx, c.opt, connPool)
var auth bool
// The low version of redis-server does not support the hello command.
// For redis-server (<6.0) that does not support the Hello command,
// we continue to provide services with RESP2.
if err := conn.Hello(ctx, 3, c.opt.Username, c.opt.Password, "").Err(); err == nil {
auth = true
} else if err.Error() != "ERR unknown command 'hello'" {
return err
}
_, err := conn.Pipelined(ctx, func(pipe Pipeliner) error {
if password != "" {
if !auth && password != "" {
if username != "" {
pipe.AuthACL(ctx, username, password)
} else {
@ -522,14 +534,8 @@ func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder)
return err
}
switch line[0] {
case proto.ErrorReply:
return proto.ParseErrorReply(line)
case proto.ArrayReply:
// ok
default:
err := fmt.Errorf("redis: expected '*', but got line %q", line)
return err
if line[0] != proto.RespArray {
return fmt.Errorf("redis: expected '*', but got line %q", line)
}
return nil
@ -537,9 +543,11 @@ func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder)
//------------------------------------------------------------------------------
// Client is a Redis client representing a pool of zero or more
// underlying connections. It's safe for concurrent use by multiple
// goroutines.
// Client is a Redis client representing a pool of zero or more underlying connections.
// It's safe for concurrent use by multiple goroutines.
//
// Client creates and frees connections automatically; it also maintains a free pool
// of idle connections. You can control the pool size with Config.PoolSize option.
type Client struct {
*baseClient
cmdable

View File

@ -136,17 +136,6 @@ var _ = Describe("Client", func() {
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
})
It("should close pipeline without closing the client", func() {
pipeline := client.Pipeline()
Expect(pipeline.Close()).NotTo(HaveOccurred())
pipeline.Ping(ctx)
_, err := pipeline.Exec(ctx)
Expect(err).To(MatchError("redis: client is closed"))
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
})
It("should close pubsub when client is closed", func() {
pubsub := client.Subscribe(ctx)
Expect(client.Close()).NotTo(HaveOccurred())
@ -157,12 +146,6 @@ var _ = Describe("Client", func() {
Expect(pubsub.Close()).NotTo(HaveOccurred())
})
It("should close pipeline when client is closed", func() {
pipeline := client.Pipeline()
Expect(client.Close()).NotTo(HaveOccurred())
Expect(pipeline.Close()).NotTo(HaveOccurred())
})
It("should select DB", func() {
db2 := redis.NewClient(&redis.Options{
Addr: redisAddr,

View File

@ -83,8 +83,8 @@ func NewBoolSliceResult(val []bool, err error) *BoolSliceCmd {
}
// NewStringStringMapResult returns a StringStringMapCmd initialised with val and err for testing.
func NewStringStringMapResult(val map[string]string, err error) *StringStringMapCmd {
var cmd StringStringMapCmd
func NewStringStringMapResult(val map[string]string, err error) *MapStringStringCmd {
var cmd MapStringStringCmd
cmd.val = val
cmd.SetErr(err)
return &cmd

View File

@ -309,7 +309,7 @@ func (c *ringShards) Random() (*ringShard, error) {
return c.GetByKey(strconv.Itoa(rand.Int()))
}
// heartbeat monitors state of each shard in the ring.
// Heartbeat monitors state of each shard in the ring.
func (c *ringShards) Heartbeat(frequency time.Duration) {
ticker := time.NewTicker(frequency)
defer ticker.Stop()

View File

@ -123,7 +123,6 @@ var _ = Describe("Redis Ring", func() {
cmds, err := pipe.Exec(ctx)
Expect(err).NotTo(HaveOccurred())
Expect(cmds).To(HaveLen(100))
Expect(pipe.Close()).NotTo(HaveOccurred())
for _, cmd := range cmds {
Expect(cmd.Err()).NotTo(HaveOccurred())
@ -177,6 +176,7 @@ var _ = Describe("Redis Ring", func() {
It("can be initialized with a new client callback", func() {
opts := redisRingOptions()
opts.NewClient = func(name string, opt *redis.Options) *redis.Client {
opt.Username = "username1"
opt.Password = "password1"
return redis.NewClient(opt)
}
@ -184,7 +184,7 @@ var _ = Describe("Redis Ring", func() {
err := ring.Ping(ctx).Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("ERR AUTH"))
Expect(err.Error()).To(ContainSubstring("WRONGPASS"))
})
})

View File

@ -335,8 +335,8 @@ func (c *SentinelClient) GetMasterAddrByName(ctx context.Context, name string) *
return cmd
}
func (c *SentinelClient) Sentinels(ctx context.Context, name string) *SliceCmd {
cmd := NewSliceCmd(ctx, "sentinel", "sentinels", name)
func (c *SentinelClient) Sentinels(ctx context.Context, name string) *MapStringStringSliceCmd {
cmd := NewMapStringStringSliceCmd(ctx, "sentinel", "sentinels", name)
_ = c.Process(ctx, cmd)
return cmd
}
@ -368,8 +368,8 @@ func (c *SentinelClient) FlushConfig(ctx context.Context) *StatusCmd {
}
// Master shows the state and info of the specified master.
func (c *SentinelClient) Master(ctx context.Context, name string) *StringStringMapCmd {
cmd := NewStringStringMapCmd(ctx, "sentinel", "master", name)
func (c *SentinelClient) Master(ctx context.Context, name string) *MapStringStringCmd {
cmd := NewMapStringStringCmd(ctx, "sentinel", "master", name)
_ = c.Process(ctx, cmd)
return cmd
}
@ -382,8 +382,8 @@ func (c *SentinelClient) Masters(ctx context.Context) *SliceCmd {
}
// Slaves shows a list of slaves for the specified master and their state.
func (c *SentinelClient) Slaves(ctx context.Context, name string) *SliceCmd {
cmd := NewSliceCmd(ctx, "sentinel", "slaves", name)
func (c *SentinelClient) Slaves(ctx context.Context, name string) *MapStringStringSliceCmd {
cmd := NewMapStringStringSliceCmd(ctx, "sentinel", "slaves", name)
_ = c.Process(ctx, cmd)
return cmd
}
@ -601,28 +601,12 @@ func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *Sentinel
return parseSlaveAddrs(addrs, false)
}
func parseSlaveAddrs(addrs []interface{}, keepDisconnected bool) []string {
func parseSlaveAddrs(addrs []map[string]string, keepDisconnected bool) []string {
nodes := make([]string, 0, len(addrs))
for _, node := range addrs {
ip := ""
port := ""
flags := []string{}
lastkey := ""
isDown := false
for _, key := range node.([]interface{}) {
switch lastkey {
case "ip":
ip = key.(string)
case "port":
port = key.(string)
case "flags":
flags = strings.Split(key.(string), ",")
}
lastkey = key.(string)
}
for _, flag := range flags {
if flags, ok := node["flags"]; ok {
for _, flag := range strings.Split(flags, ",") {
switch flag {
case "s_down", "o_down":
isDown = true
@ -632,9 +616,9 @@ func parseSlaveAddrs(addrs []interface{}, keepDisconnected bool) []string {
}
}
}
if !isDown {
nodes = append(nodes, net.JoinHostPort(ip, port))
}
if !isDown && node["ip"] != "" && node["port"] != "" {
nodes = append(nodes, net.JoinHostPort(node["ip"], node["port"]))
}
}
@ -683,16 +667,13 @@ func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
return
}
for _, sentinel := range sentinels {
vals := sentinel.([]interface{})
var ip, port string
for i := 0; i < len(vals); i += 2 {
key := vals[i].(string)
switch key {
case "ip":
ip = vals[i+1].(string)
case "port":
port = vals[i+1].(string)
ip, ok := sentinel["ip"]
if !ok {
continue
}
port, ok := sentinel["port"]
if !ok {
continue
}
if ip != "" && port != "" {
sentinelAddr := net.JoinHostPort(ip, port)

View File

@ -185,7 +185,8 @@ var _ = Describe("NewFailoverClusterClient", func() {
}
// Create subscription.
ch := client.Subscribe(ctx, "foo").Channel()
sub := client.Subscribe(ctx, "foo")
ch := sub.Channel()
// Kill master.
err = master.Shutdown(ctx).Err()
@ -207,6 +208,7 @@ var _ = Describe("NewFailoverClusterClient", func() {
}, "15s", "100ms").Should(Receive(&msg))
Expect(msg.Channel).To(Equal("foo"))
Expect(msg.Payload).To(Equal("hello"))
Expect(sub.Close()).NotTo(HaveOccurred())
_, err = startRedis(masterPort)
Expect(err).NotTo(HaveOccurred())

10
testdata/redis.conf vendored
View File

@ -1,10 +0,0 @@
# Minimal redis.conf
port 6379
daemonize no
dir .
save ""
appendonly yes
cluster-config-file nodes.conf
cluster-node-timeout 30000
maxclients 1001