forked from mirror/redis
commit
f44b325a6d
5
Makefile
5
Makefile
|
@ -26,10 +26,9 @@ fmt:
|
||||||
goimports -w -local github.com/go-redis/redis ./
|
goimports -w -local github.com/go-redis/redis ./
|
||||||
|
|
||||||
go_mod_tidy:
|
go_mod_tidy:
|
||||||
go get -u && go mod tidy
|
|
||||||
set -e; for dir in $(PACKAGE_DIRS); do \
|
set -e; for dir in $(PACKAGE_DIRS); do \
|
||||||
echo "go mod tidy in $${dir}"; \
|
echo "go mod tidy in $${dir}"; \
|
||||||
(cd "$${dir}" && \
|
(cd "$${dir}" && \
|
||||||
go get -u && \
|
go get -u ./... && \
|
||||||
go mod tidy); \
|
go mod tidy -compat=1.17); \
|
||||||
done
|
done
|
||||||
|
|
|
@ -41,7 +41,7 @@ func NewClusterClientStub(resp []byte) *ClientStub {
|
||||||
|
|
||||||
client := NewClusterClient(&ClusterOptions{
|
client := NewClusterClient(&ClusterOptions{
|
||||||
PoolSize: 128,
|
PoolSize: 128,
|
||||||
Addrs: []string{"127.0.0.1:6379"},
|
Addrs: []string{":6379"},
|
||||||
Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||||
return stub.stubConn(initHello), nil
|
return stub.stubConn(initHello), nil
|
||||||
},
|
},
|
||||||
|
@ -118,7 +118,7 @@ func BenchmarkDecode(b *testing.B) {
|
||||||
}
|
}
|
||||||
|
|
||||||
benchmarks := []Benchmark{
|
benchmarks := []Benchmark{
|
||||||
{"single", NewClientStub},
|
{"server", NewClientStub},
|
||||||
{"cluster", NewClusterClientStub},
|
{"cluster", NewClusterClientStub},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
61
command.go
61
command.go
|
@ -83,7 +83,7 @@ func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int {
|
||||||
if info != nil {
|
if info != nil {
|
||||||
return int(info.FirstKeyPos)
|
return int(info.FirstKeyPos)
|
||||||
}
|
}
|
||||||
return 0
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
func cmdString(cmd Cmder, val interface{}) string {
|
func cmdString(cmd Cmder, val interface{}) string {
|
||||||
|
@ -2649,13 +2649,14 @@ func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error {
|
||||||
|
|
||||||
// subtract start and end.
|
// subtract start and end.
|
||||||
nodes := make([]ClusterNode, n-2)
|
nodes := make([]ClusterNode, n-2)
|
||||||
|
|
||||||
for j := 0; j < len(nodes); j++ {
|
for j := 0; j < len(nodes); j++ {
|
||||||
nn, err := rd.ReadArrayLen()
|
nn, err := rd.ReadArrayLen()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if nn != 2 && nn != 3 {
|
if nn < 2 || nn > 4 {
|
||||||
return fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", nn)
|
return fmt.Errorf("got %d elements in cluster info address, expected 2, 3, or 4", n)
|
||||||
}
|
}
|
||||||
|
|
||||||
ip, err := rd.ReadString()
|
ip, err := rd.ReadString()
|
||||||
|
@ -2670,14 +2671,43 @@ func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error {
|
||||||
|
|
||||||
nodes[j].Addr = net.JoinHostPort(ip, port)
|
nodes[j].Addr = net.JoinHostPort(ip, port)
|
||||||
|
|
||||||
if nn == 3 {
|
if nn >= 3 {
|
||||||
id, err := rd.ReadString()
|
id, err := rd.ReadString()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
nodes[j].ID = id
|
nodes[j].ID = id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if nn >= 4 {
|
||||||
|
networkingMetadata := make(map[string]string)
|
||||||
|
|
||||||
|
metadataLength, err := rd.ReadArrayLen()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if metadataLength%2 != 0 {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"got %d elements in metadata, expected an even number", metadataLength)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < metadataLength; i += 2 {
|
||||||
|
key, err := rd.ReadString()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
value, err := rd.ReadString()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
networkingMetadata[key] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes[j].NetworkingMetadata = networkingMetadata
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd.val[i] = ClusterSlot{
|
cmd.val[i] = ClusterSlot{
|
||||||
Start: int(start),
|
Start: int(start),
|
||||||
End: int(end),
|
End: int(end),
|
||||||
|
@ -3136,6 +3166,7 @@ func (cmd *CommandsInfoCmd) String() string {
|
||||||
func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
|
func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
|
||||||
const numArgRedis5 = 6
|
const numArgRedis5 = 6
|
||||||
const numArgRedis6 = 7
|
const numArgRedis6 = 7
|
||||||
|
const numArgRedis7 = 10
|
||||||
|
|
||||||
n, err := rd.ReadArrayLen()
|
n, err := rd.ReadArrayLen()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -3148,8 +3179,12 @@ func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if nn != numArgRedis5 && nn != numArgRedis6 {
|
|
||||||
return fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6/7", nn)
|
switch nn {
|
||||||
|
case numArgRedis5, numArgRedis6, numArgRedis7:
|
||||||
|
// ok
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6/7/10", nn)
|
||||||
}
|
}
|
||||||
|
|
||||||
cmdInfo := &CommandInfo{}
|
cmdInfo := &CommandInfo{}
|
||||||
|
@ -3200,7 +3235,7 @@ func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
cmdInfo.StepCount = int8(stepCount)
|
cmdInfo.StepCount = int8(stepCount)
|
||||||
|
|
||||||
if nn == numArgRedis6 {
|
if nn >= numArgRedis6 {
|
||||||
aclFlagLen, err := rd.ReadArrayLen()
|
aclFlagLen, err := rd.ReadArrayLen()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -3218,6 +3253,18 @@ func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if nn >= numArgRedis7 {
|
||||||
|
if err := rd.DiscardNext(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := rd.DiscardNext(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := rd.DiscardNext(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
cmd.val[cmdInfo.Name] = cmdInfo
|
cmd.val[cmdInfo.Name] = cmdInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -310,6 +310,7 @@ type Cmdable interface {
|
||||||
ClientKillByFilter(ctx context.Context, keys ...string) *IntCmd
|
ClientKillByFilter(ctx context.Context, keys ...string) *IntCmd
|
||||||
ClientList(ctx context.Context) *StringCmd
|
ClientList(ctx context.Context) *StringCmd
|
||||||
ClientPause(ctx context.Context, dur time.Duration) *BoolCmd
|
ClientPause(ctx context.Context, dur time.Duration) *BoolCmd
|
||||||
|
ClientUnpause(ctx context.Context) *BoolCmd
|
||||||
ClientID(ctx context.Context) *IntCmd
|
ClientID(ctx context.Context) *IntCmd
|
||||||
ClientUnblock(ctx context.Context, id int64) *IntCmd
|
ClientUnblock(ctx context.Context, id int64) *IntCmd
|
||||||
ClientUnblockWithError(ctx context.Context, id int64) *IntCmd
|
ClientUnblockWithError(ctx context.Context, id int64) *IntCmd
|
||||||
|
@ -2818,6 +2819,12 @@ func (c cmdable) ClientPause(ctx context.Context, dur time.Duration) *BoolCmd {
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c cmdable) ClientUnpause(ctx context.Context) *BoolCmd {
|
||||||
|
cmd := NewBoolCmd(ctx, "client", "unpause")
|
||||||
|
_ = c(ctx, cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
func (c cmdable) ClientID(ctx context.Context) *IntCmd {
|
func (c cmdable) ClientID(ctx context.Context) *IntCmd {
|
||||||
cmd := NewIntCmd(ctx, "client", "id")
|
cmd := NewIntCmd(ctx, "client", "id")
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
|
|
|
@ -2,14 +2,16 @@ package redisotel_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/go-redis/redis/extra/redisotel/v8"
|
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
|
||||||
"github.com/go-redis/redis/v8"
|
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis/extra/redisotel/v8"
|
||||||
|
"github.com/go-redis/redis/v8"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestNew(t *testing.T) {
|
func TestNew(t *testing.T) {
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -4,6 +4,7 @@ go 1.17
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/cespare/xxhash/v2 v2.1.2
|
github.com/cespare/xxhash/v2 v2.1.2
|
||||||
|
github.com/davecgh/go-spew v1.1.1
|
||||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
|
||||||
github.com/onsi/ginkgo v1.16.5
|
github.com/onsi/ginkgo v1.16.5
|
||||||
github.com/onsi/gomega v1.19.0
|
github.com/onsi/gomega v1.19.0
|
||||||
|
|
1
go.sum
1
go.sum
|
@ -4,6 +4,7 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
|
||||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
||||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
//go:build linux || darwin || dragonfly || freebsd || netbsd || openbsd || solaris || illumos
|
||||||
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos
|
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos
|
||||||
|
|
||||||
package pool
|
package pool
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
//go:build !linux && !darwin && !dragonfly && !freebsd && !netbsd && !openbsd && !solaris && !illumos
|
||||||
// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos
|
// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos
|
||||||
|
|
||||||
package pool
|
package pool
|
||||||
|
|
|
@ -2,9 +2,12 @@ package pool_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
@ -32,5 +35,89 @@ func perform(n int, cbs ...func(int)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func dummyDialer(context.Context) (net.Conn, error) {
|
func dummyDialer(context.Context) (net.Conn, error) {
|
||||||
return &net.TCPConn{}, nil
|
return newDummyConn(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDummyConn() net.Conn {
|
||||||
|
return &dummyConn{
|
||||||
|
rawConn: new(dummyRawConn),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ net.Conn = (*dummyConn)(nil)
|
||||||
|
_ syscall.Conn = (*dummyConn)(nil)
|
||||||
|
)
|
||||||
|
|
||||||
|
type dummyConn struct {
|
||||||
|
rawConn *dummyRawConn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) SyscallConn() (syscall.RawConn, error) {
|
||||||
|
return d.rawConn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var errDummy = fmt.Errorf("dummyConn err")
|
||||||
|
|
||||||
|
func (d *dummyConn) Read(b []byte) (n int, err error) {
|
||||||
|
return 0, errDummy
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) Write(b []byte) (n int, err error) {
|
||||||
|
return 0, errDummy
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) Close() error {
|
||||||
|
d.rawConn.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) LocalAddr() net.Addr {
|
||||||
|
return &net.TCPAddr{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) RemoteAddr() net.Addr {
|
||||||
|
return &net.TCPAddr{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) SetDeadline(t time.Time) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) SetReadDeadline(t time.Time) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyConn) SetWriteDeadline(t time.Time) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ syscall.RawConn = (*dummyRawConn)(nil)
|
||||||
|
|
||||||
|
type dummyRawConn struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyRawConn) Control(f func(fd uintptr)) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyRawConn) Read(f func(fd uintptr) (done bool)) error {
|
||||||
|
d.mu.Lock()
|
||||||
|
defer d.mu.Unlock()
|
||||||
|
if d.closed {
|
||||||
|
return fmt.Errorf("dummyRawConn closed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyRawConn) Write(f func(fd uintptr) (done bool)) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyRawConn) Close() {
|
||||||
|
d.mu.Lock()
|
||||||
|
d.closed = true
|
||||||
|
d.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
|
@ -542,7 +542,7 @@ func (p *ConnPool) reapStaleConn() *Conn {
|
||||||
|
|
||||||
func (p *ConnPool) isStaleConn(cn *Conn) bool {
|
func (p *ConnPool) isStaleConn(cn *Conn) bool {
|
||||||
if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
|
if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
|
||||||
return false
|
return connCheck(cn.netConn) != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
@ -553,5 +553,5 @@ func (p *ConnPool) isStaleConn(cn *Conn) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return connCheck(cn.netConn) != nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,7 +124,7 @@ func (r *Reader) ReadLine() ([]byte, error) {
|
||||||
return line, nil
|
return line, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// readLine that returns an error if:
|
// readLine returns an error if:
|
||||||
// - there is a pending read error;
|
// - there is a pending read error;
|
||||||
// - or line does not end with \r\n.
|
// - or line does not end with \r\n.
|
||||||
func (r *Reader) readLine() ([]byte, error) {
|
func (r *Reader) readLine() ([]byte, error) {
|
||||||
|
@ -403,7 +403,7 @@ func (r *Reader) ReadArrayLen() (int, error) {
|
||||||
case RespArray, RespSet, RespPush:
|
case RespArray, RespSet, RespPush:
|
||||||
return replyLen(line)
|
return replyLen(line)
|
||||||
default:
|
default:
|
||||||
return 0, fmt.Errorf("redis: can't parse array(array/set/push) reply: %.100q", line)
|
return 0, fmt.Errorf("redis: can't parse array/set/push reply: %.100q", line)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -446,6 +446,15 @@ func (r *Reader) ReadMapLen() (int, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DiscardNext read and discard the data represented by the next line.
|
||||||
|
func (r *Reader) DiscardNext() error {
|
||||||
|
line, err := r.readLine()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return r.Discard(line)
|
||||||
|
}
|
||||||
|
|
||||||
// Discard the data represented by line.
|
// Discard the data represented by line.
|
||||||
func (r *Reader) Discard(line []byte) (err error) {
|
func (r *Reader) Discard(line []byte) (err error) {
|
||||||
if len(line) == 0 {
|
if len(line) == 0 {
|
||||||
|
@ -486,15 +495,6 @@ func (r *Reader) Discard(line []byte) (err error) {
|
||||||
return fmt.Errorf("redis: can't parse %.100q", line)
|
return fmt.Errorf("redis: can't parse %.100q", line)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DiscardNext read and discard the data represented by the next line.
|
|
||||||
func (r *Reader) DiscardNext() error {
|
|
||||||
line, err := r.readLine()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return r.Discard(line)
|
|
||||||
}
|
|
||||||
|
|
||||||
func replyLen(line []byte) (n int, err error) {
|
func replyLen(line []byte) (n int, err error) {
|
||||||
n, err = util.Atoi(line[1:])
|
n, err = util.Atoi(line[1:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -515,7 +515,7 @@ func replyLen(line []byte) (n int, err error) {
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsNilReply detect redis.Nil of RESP2.
|
// IsNilReply detects redis.Nil of RESP2.
|
||||||
func IsNilReply(line []byte) bool {
|
func IsNilReply(line []byte) bool {
|
||||||
return len(line) == 3 &&
|
return len(line) == 3 &&
|
||||||
(line[0] == RespString || line[0] == RespArray) &&
|
(line[0] == RespString || line[0] == RespArray) &&
|
||||||
|
|
|
@ -324,7 +324,7 @@ func startRedis(port string, args ...string) (*redisProcess, error) {
|
||||||
|
|
||||||
p := &redisProcess{process, client}
|
p := &redisProcess{process, client}
|
||||||
registerProcess(port, p)
|
registerProcess(port, p)
|
||||||
return p, err
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
|
func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
|
||||||
|
|
|
@ -86,13 +86,14 @@ var _ = Describe("pool", func() {
|
||||||
cn.SetNetConn(&badConn{})
|
cn.SetNetConn(&badConn{})
|
||||||
client.Pool().Put(ctx, cn)
|
client.Pool().Put(ctx, cn)
|
||||||
|
|
||||||
err = client.Ping(ctx).Err()
|
|
||||||
Expect(err).To(MatchError("bad connection"))
|
|
||||||
|
|
||||||
val, err := client.Ping(ctx).Result()
|
val, err := client.Ping(ctx).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(val).To(Equal("PONG"))
|
Expect(val).To(Equal("PONG"))
|
||||||
|
|
||||||
|
val, err = client.Ping(ctx).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(val).To(Equal("PONG"))
|
||||||
|
|
||||||
pool := client.Pool()
|
pool := client.Pool()
|
||||||
Expect(pool.Len()).To(Equal(1))
|
Expect(pool.Len()).To(Equal(1))
|
||||||
Expect(pool.IdleLen()).To(Equal(1))
|
Expect(pool.IdleLen()).To(Equal(1))
|
||||||
|
|
12
redis.go
12
redis.go
|
@ -223,13 +223,6 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
|
||||||
username, password = c.opt.CredentialsProvider()
|
username, password = c.opt.CredentialsProvider()
|
||||||
}
|
}
|
||||||
|
|
||||||
if password == "" &&
|
|
||||||
c.opt.DB == 0 &&
|
|
||||||
!c.opt.readOnly &&
|
|
||||||
c.opt.OnConnect == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
connPool := pool.NewSingleConnPool(c.connPool, cn)
|
connPool := pool.NewSingleConnPool(c.connPool, cn)
|
||||||
conn := newConn(ctx, c.opt, connPool)
|
conn := newConn(ctx, c.opt, connPool)
|
||||||
|
|
||||||
|
@ -238,7 +231,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
|
||||||
// The low version of redis-server does not support the hello command.
|
// The low version of redis-server does not support the hello command.
|
||||||
// For redis-server (<6.0) that does not support the Hello command,
|
// For redis-server (<6.0) that does not support the Hello command,
|
||||||
// we continue to provide services with RESP2.
|
// we continue to provide services with RESP2.
|
||||||
if err := conn.Hello(ctx, 3, c.opt.Username, c.opt.Password, "").Err(); err == nil {
|
if err := conn.Hello(ctx, 3, username, password, "").Err(); err == nil {
|
||||||
auth = true
|
auth = true
|
||||||
} else if err.Error() != "ERR unknown command 'hello'" {
|
} else if err.Error() != "ERR unknown command 'hello'" {
|
||||||
return err
|
return err
|
||||||
|
@ -514,11 +507,12 @@ func wrapMultiExec(ctx context.Context, cmds []Cmder) []Cmder {
|
||||||
}
|
}
|
||||||
|
|
||||||
func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error {
|
func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error {
|
||||||
// Parse queued replies.
|
// Parse +OK.
|
||||||
if err := statusCmd.readReply(rd); err != nil {
|
if err := statusCmd.readReply(rd); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Parse +QUEUED.
|
||||||
for range cmds {
|
for range cmds {
|
||||||
if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) {
|
if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -192,7 +192,7 @@ var _ = Describe("NewFailoverClusterClient", func() {
|
||||||
err = master.Shutdown(ctx).Err()
|
err = master.Shutdown(ctx).Err()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Eventually(func() error {
|
Eventually(func() error {
|
||||||
return sentinelMaster.Ping(ctx).Err()
|
return master.Ping(ctx).Err()
|
||||||
}, "15s", "100ms").Should(HaveOccurred())
|
}, "15s", "100ms").Should(HaveOccurred())
|
||||||
|
|
||||||
// Check that client picked up new master.
|
// Check that client picked up new master.
|
||||||
|
@ -223,7 +223,7 @@ var _ = Describe("SentinelAclAuth", func() {
|
||||||
|
|
||||||
var client *redis.Client
|
var client *redis.Client
|
||||||
var sentinel *redis.SentinelClient
|
var sentinel *redis.SentinelClient
|
||||||
var sentinels = func() []*redisProcess {
|
sentinels := func() []*redisProcess {
|
||||||
return []*redisProcess{sentinel1, sentinel2, sentinel3}
|
return []*redisProcess{sentinel1, sentinel2, sentinel3}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -142,9 +142,6 @@ var _ = Describe("Tx", func() {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = do()
|
|
||||||
Expect(err).To(MatchError("bad connection"))
|
|
||||||
|
|
||||||
err = do()
|
err = do()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue