diff --git a/Makefile b/Makefile index a4cfe057..8dcddb24 100644 --- a/Makefile +++ b/Makefile @@ -26,10 +26,9 @@ fmt: goimports -w -local github.com/go-redis/redis ./ go_mod_tidy: - go get -u && go mod tidy set -e; for dir in $(PACKAGE_DIRS); do \ echo "go mod tidy in $${dir}"; \ (cd "$${dir}" && \ - go get -u && \ - go mod tidy); \ + go get -u ./... && \ + go mod tidy -compat=1.17); \ done diff --git a/bench_decode_test.go b/bench_decode_test.go index b07ad4ed..753130e2 100644 --- a/bench_decode_test.go +++ b/bench_decode_test.go @@ -41,7 +41,7 @@ func NewClusterClientStub(resp []byte) *ClientStub { client := NewClusterClient(&ClusterOptions{ PoolSize: 128, - Addrs: []string{"127.0.0.1:6379"}, + Addrs: []string{":6379"}, Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) { return stub.stubConn(initHello), nil }, @@ -118,7 +118,7 @@ func BenchmarkDecode(b *testing.B) { } benchmarks := []Benchmark{ - {"single", NewClientStub}, + {"server", NewClientStub}, {"cluster", NewClusterClientStub}, } diff --git a/command.go b/command.go index a6beea66..1f4d646f 100644 --- a/command.go +++ b/command.go @@ -83,7 +83,7 @@ func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int { if info != nil { return int(info.FirstKeyPos) } - return 0 + return 1 } func cmdString(cmd Cmder, val interface{}) string { @@ -2649,13 +2649,14 @@ func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error { // subtract start and end. nodes := make([]ClusterNode, n-2) + for j := 0; j < len(nodes); j++ { nn, err := rd.ReadArrayLen() if err != nil { return err } - if nn != 2 && nn != 3 { - return fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", nn) + if nn < 2 || nn > 4 { + return fmt.Errorf("got %d elements in cluster info address, expected 2, 3, or 4", n) } ip, err := rd.ReadString() @@ -2670,14 +2671,43 @@ func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error { nodes[j].Addr = net.JoinHostPort(ip, port) - if nn == 3 { + if nn >= 3 { id, err := rd.ReadString() if err != nil { return err } nodes[j].ID = id } + + if nn >= 4 { + networkingMetadata := make(map[string]string) + + metadataLength, err := rd.ReadArrayLen() + if err != nil { + return err + } + + if metadataLength%2 != 0 { + return fmt.Errorf( + "got %d elements in metadata, expected an even number", metadataLength) + } + + for i := 0; i < metadataLength; i += 2 { + key, err := rd.ReadString() + if err != nil { + return err + } + value, err := rd.ReadString() + if err != nil { + return err + } + networkingMetadata[key] = value + } + + nodes[j].NetworkingMetadata = networkingMetadata + } } + cmd.val[i] = ClusterSlot{ Start: int(start), End: int(end), @@ -3136,6 +3166,7 @@ func (cmd *CommandsInfoCmd) String() string { func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error { const numArgRedis5 = 6 const numArgRedis6 = 7 + const numArgRedis7 = 10 n, err := rd.ReadArrayLen() if err != nil { @@ -3148,8 +3179,12 @@ func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error { if err != nil { return err } - if nn != numArgRedis5 && nn != numArgRedis6 { - return fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6/7", nn) + + switch nn { + case numArgRedis5, numArgRedis6, numArgRedis7: + // ok + default: + return fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6/7/10", nn) } cmdInfo := &CommandInfo{} @@ -3200,7 +3235,7 @@ func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error { } cmdInfo.StepCount = int8(stepCount) - if nn == numArgRedis6 { + if nn >= numArgRedis6 { aclFlagLen, err := rd.ReadArrayLen() if err != nil { return err @@ -3218,6 +3253,18 @@ func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error { } } + if nn >= numArgRedis7 { + if err := rd.DiscardNext(); err != nil { + return err + } + if err := rd.DiscardNext(); err != nil { + return err + } + if err := rd.DiscardNext(); err != nil { + return err + } + } + cmd.val[cmdInfo.Name] = cmdInfo } diff --git a/commands.go b/commands.go index eb0757d1..60997a8f 100644 --- a/commands.go +++ b/commands.go @@ -310,6 +310,7 @@ type Cmdable interface { ClientKillByFilter(ctx context.Context, keys ...string) *IntCmd ClientList(ctx context.Context) *StringCmd ClientPause(ctx context.Context, dur time.Duration) *BoolCmd + ClientUnpause(ctx context.Context) *BoolCmd ClientID(ctx context.Context) *IntCmd ClientUnblock(ctx context.Context, id int64) *IntCmd ClientUnblockWithError(ctx context.Context, id int64) *IntCmd @@ -2818,6 +2819,12 @@ func (c cmdable) ClientPause(ctx context.Context, dur time.Duration) *BoolCmd { return cmd } +func (c cmdable) ClientUnpause(ctx context.Context) *BoolCmd { + cmd := NewBoolCmd(ctx, "client", "unpause") + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) ClientID(ctx context.Context) *IntCmd { cmd := NewIntCmd(ctx, "client", "id") _ = c(ctx, cmd) diff --git a/extra/redisotel/redisotel_test.go b/extra/redisotel/redisotel_test.go index 68aacc4a..883dcf17 100644 --- a/extra/redisotel/redisotel_test.go +++ b/extra/redisotel/redisotel_test.go @@ -2,14 +2,16 @@ package redisotel_test import ( "context" - semconv "go.opentelemetry.io/otel/semconv/v1.7.0" "testing" - "github.com/go-redis/redis/extra/redisotel/v8" - "github.com/go-redis/redis/v8" + semconv "go.opentelemetry.io/otel/semconv/v1.7.0" + "go.opentelemetry.io/otel" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" + + "github.com/go-redis/redis/extra/redisotel/v8" + "github.com/go-redis/redis/v8" ) func TestNew(t *testing.T) { diff --git a/go.mod b/go.mod index 36eb8010..fdf1076b 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.17 require ( github.com/cespare/xxhash/v2 v2.1.2 + github.com/davecgh/go-spew v1.1.1 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.19.0 diff --git a/go.sum b/go.sum index dfa31aac..53adb179 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,7 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= diff --git a/internal/pool/conn_check.go b/internal/pool/conn_check.go index 1afdd6fe..74680e4b 100644 --- a/internal/pool/conn_check.go +++ b/internal/pool/conn_check.go @@ -1,3 +1,4 @@ +//go:build linux || darwin || dragonfly || freebsd || netbsd || openbsd || solaris || illumos // +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos package pool @@ -46,4 +47,4 @@ func connCheck(conn net.Conn) error { } return sysErr -} \ No newline at end of file +} diff --git a/internal/pool/conn_check_dummy.go b/internal/pool/conn_check_dummy.go index e7d62808..9408446b 100644 --- a/internal/pool/conn_check_dummy.go +++ b/internal/pool/conn_check_dummy.go @@ -1,3 +1,4 @@ +//go:build !linux && !darwin && !dragonfly && !freebsd && !netbsd && !openbsd && !solaris && !illumos // +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos package pool @@ -6,4 +7,4 @@ import "net" func connCheck(conn net.Conn) error { return nil -} \ No newline at end of file +} diff --git a/internal/pool/main_test.go b/internal/pool/main_test.go index 2365dbc6..8ad16747 100644 --- a/internal/pool/main_test.go +++ b/internal/pool/main_test.go @@ -2,9 +2,12 @@ package pool_test import ( "context" + "fmt" "net" "sync" + "syscall" "testing" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -32,5 +35,89 @@ func perform(n int, cbs ...func(int)) { } func dummyDialer(context.Context) (net.Conn, error) { - return &net.TCPConn{}, nil + return newDummyConn(), nil +} + +func newDummyConn() net.Conn { + return &dummyConn{ + rawConn: new(dummyRawConn), + } +} + +var ( + _ net.Conn = (*dummyConn)(nil) + _ syscall.Conn = (*dummyConn)(nil) +) + +type dummyConn struct { + rawConn *dummyRawConn +} + +func (d *dummyConn) SyscallConn() (syscall.RawConn, error) { + return d.rawConn, nil +} + +var errDummy = fmt.Errorf("dummyConn err") + +func (d *dummyConn) Read(b []byte) (n int, err error) { + return 0, errDummy +} + +func (d *dummyConn) Write(b []byte) (n int, err error) { + return 0, errDummy +} + +func (d *dummyConn) Close() error { + d.rawConn.Close() + return nil +} + +func (d *dummyConn) LocalAddr() net.Addr { + return &net.TCPAddr{} +} + +func (d *dummyConn) RemoteAddr() net.Addr { + return &net.TCPAddr{} +} + +func (d *dummyConn) SetDeadline(t time.Time) error { + return nil +} + +func (d *dummyConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (d *dummyConn) SetWriteDeadline(t time.Time) error { + return nil +} + +var _ syscall.RawConn = (*dummyRawConn)(nil) + +type dummyRawConn struct { + mu sync.Mutex + closed bool +} + +func (d *dummyRawConn) Control(f func(fd uintptr)) error { + return nil +} + +func (d *dummyRawConn) Read(f func(fd uintptr) (done bool)) error { + d.mu.Lock() + defer d.mu.Unlock() + if d.closed { + return fmt.Errorf("dummyRawConn closed") + } + return nil +} + +func (d *dummyRawConn) Write(f func(fd uintptr) (done bool)) error { + return nil +} + +func (d *dummyRawConn) Close() { + d.mu.Lock() + d.closed = true + d.mu.Unlock() } diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 44a4e779..3a6ecb27 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -542,7 +542,7 @@ func (p *ConnPool) reapStaleConn() *Conn { func (p *ConnPool) isStaleConn(cn *Conn) bool { if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 { - return false + return connCheck(cn.netConn) != nil } now := time.Now() @@ -553,5 +553,5 @@ func (p *ConnPool) isStaleConn(cn *Conn) bool { return true } - return false + return connCheck(cn.netConn) != nil } diff --git a/internal/proto/reader.go b/internal/proto/reader.go index 3ceedca9..b330f594 100644 --- a/internal/proto/reader.go +++ b/internal/proto/reader.go @@ -124,7 +124,7 @@ func (r *Reader) ReadLine() ([]byte, error) { return line, nil } -// readLine that returns an error if: +// readLine returns an error if: // - there is a pending read error; // - or line does not end with \r\n. func (r *Reader) readLine() ([]byte, error) { @@ -403,7 +403,7 @@ func (r *Reader) ReadArrayLen() (int, error) { case RespArray, RespSet, RespPush: return replyLen(line) default: - return 0, fmt.Errorf("redis: can't parse array(array/set/push) reply: %.100q", line) + return 0, fmt.Errorf("redis: can't parse array/set/push reply: %.100q", line) } } @@ -446,6 +446,15 @@ func (r *Reader) ReadMapLen() (int, error) { } } +// DiscardNext read and discard the data represented by the next line. +func (r *Reader) DiscardNext() error { + line, err := r.readLine() + if err != nil { + return err + } + return r.Discard(line) +} + // Discard the data represented by line. func (r *Reader) Discard(line []byte) (err error) { if len(line) == 0 { @@ -486,15 +495,6 @@ func (r *Reader) Discard(line []byte) (err error) { return fmt.Errorf("redis: can't parse %.100q", line) } -// DiscardNext read and discard the data represented by the next line. -func (r *Reader) DiscardNext() error { - line, err := r.readLine() - if err != nil { - return err - } - return r.Discard(line) -} - func replyLen(line []byte) (n int, err error) { n, err = util.Atoi(line[1:]) if err != nil { @@ -515,7 +515,7 @@ func replyLen(line []byte) (n int, err error) { return n, nil } -// IsNilReply detect redis.Nil of RESP2. +// IsNilReply detects redis.Nil of RESP2. func IsNilReply(line []byte) bool { return len(line) == 3 && (line[0] == RespString || line[0] == RespArray) && diff --git a/main_test.go b/main_test.go index 5414310e..5529112e 100644 --- a/main_test.go +++ b/main_test.go @@ -324,7 +324,7 @@ func startRedis(port string, args ...string) (*redisProcess, error) { p := &redisProcess{process, client} registerProcess(port, p) - return p, err + return p, nil } func startSentinel(port, masterName, masterPort string) (*redisProcess, error) { diff --git a/pool_test.go b/pool_test.go index e297b010..7be2ced7 100644 --- a/pool_test.go +++ b/pool_test.go @@ -86,13 +86,14 @@ var _ = Describe("pool", func() { cn.SetNetConn(&badConn{}) client.Pool().Put(ctx, cn) - err = client.Ping(ctx).Err() - Expect(err).To(MatchError("bad connection")) - val, err := client.Ping(ctx).Result() Expect(err).NotTo(HaveOccurred()) Expect(val).To(Equal("PONG")) + val, err = client.Ping(ctx).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("PONG")) + pool := client.Pool() Expect(pool.Len()).To(Equal(1)) Expect(pool.IdleLen()).To(Equal(1)) diff --git a/redis.go b/redis.go index e5d0eb1c..2e4724bc 100644 --- a/redis.go +++ b/redis.go @@ -223,13 +223,6 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error { username, password = c.opt.CredentialsProvider() } - if password == "" && - c.opt.DB == 0 && - !c.opt.readOnly && - c.opt.OnConnect == nil { - return nil - } - connPool := pool.NewSingleConnPool(c.connPool, cn) conn := newConn(ctx, c.opt, connPool) @@ -238,7 +231,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error { // The low version of redis-server does not support the hello command. // For redis-server (<6.0) that does not support the Hello command, // we continue to provide services with RESP2. - if err := conn.Hello(ctx, 3, c.opt.Username, c.opt.Password, "").Err(); err == nil { + if err := conn.Hello(ctx, 3, username, password, "").Err(); err == nil { auth = true } else if err.Error() != "ERR unknown command 'hello'" { return err @@ -514,11 +507,12 @@ func wrapMultiExec(ctx context.Context, cmds []Cmder) []Cmder { } func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error { - // Parse queued replies. + // Parse +OK. if err := statusCmd.readReply(rd); err != nil { return err } + // Parse +QUEUED. for range cmds { if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) { return err diff --git a/sentinel_test.go b/sentinel_test.go index cc56cbab..d0fa9c95 100644 --- a/sentinel_test.go +++ b/sentinel_test.go @@ -192,7 +192,7 @@ var _ = Describe("NewFailoverClusterClient", func() { err = master.Shutdown(ctx).Err() Expect(err).NotTo(HaveOccurred()) Eventually(func() error { - return sentinelMaster.Ping(ctx).Err() + return master.Ping(ctx).Err() }, "15s", "100ms").Should(HaveOccurred()) // Check that client picked up new master. @@ -223,7 +223,7 @@ var _ = Describe("SentinelAclAuth", func() { var client *redis.Client var sentinel *redis.SentinelClient - var sentinels = func() []*redisProcess { + sentinels := func() []*redisProcess { return []*redisProcess{sentinel1, sentinel2, sentinel3} } diff --git a/tx_test.go b/tx_test.go index 7deb2dfd..030a56cc 100644 --- a/tx_test.go +++ b/tx_test.go @@ -142,9 +142,6 @@ var _ = Describe("Tx", func() { return err } - err = do() - Expect(err).To(MatchError("bad connection")) - err = do() Expect(err).NotTo(HaveOccurred()) })