fix: update COMMAND parser for Redis 7

This commit is contained in:
Vladimir Mihailenco 2022-06-04 16:07:28 +03:00
parent 3a722be811
commit b0bb514059
6 changed files with 40 additions and 27 deletions

View File

@ -41,7 +41,7 @@ func NewClusterClientStub(resp []byte) *ClientStub {
client := NewClusterClient(&ClusterOptions{ client := NewClusterClient(&ClusterOptions{
PoolSize: 128, PoolSize: 128,
Addrs: []string{"127.0.0.1:6379"}, Addrs: []string{":6379"},
Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) { Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
return stub.stubConn(initHello), nil return stub.stubConn(initHello), nil
}, },
@ -118,7 +118,7 @@ func BenchmarkDecode(b *testing.B) {
} }
benchmarks := []Benchmark{ benchmarks := []Benchmark{
{"single", NewClientStub}, {"server", NewClientStub},
{"cluster", NewClusterClientStub}, {"cluster", NewClusterClientStub},
} }

View File

@ -83,7 +83,7 @@ func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int {
if info != nil { if info != nil {
return int(info.FirstKeyPos) return int(info.FirstKeyPos)
} }
return 0 return 1
} }
func cmdString(cmd Cmder, val interface{}) string { func cmdString(cmd Cmder, val interface{}) string {
@ -3166,6 +3166,7 @@ func (cmd *CommandsInfoCmd) String() string {
func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error { func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
const numArgRedis5 = 6 const numArgRedis5 = 6
const numArgRedis6 = 7 const numArgRedis6 = 7
const numArgRedis7 = 10
n, err := rd.ReadArrayLen() n, err := rd.ReadArrayLen()
if err != nil { if err != nil {
@ -3178,8 +3179,12 @@ func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
if err != nil { if err != nil {
return err return err
} }
if nn != numArgRedis5 && nn != numArgRedis6 {
return fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6/7", nn) switch nn {
case numArgRedis5, numArgRedis6, numArgRedis7:
// ok
default:
return fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6/7/10", nn)
} }
cmdInfo := &CommandInfo{} cmdInfo := &CommandInfo{}
@ -3230,7 +3235,7 @@ func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
} }
cmdInfo.StepCount = int8(stepCount) cmdInfo.StepCount = int8(stepCount)
if nn == numArgRedis6 { if nn >= numArgRedis6 {
aclFlagLen, err := rd.ReadArrayLen() aclFlagLen, err := rd.ReadArrayLen()
if err != nil { if err != nil {
return err return err
@ -3248,6 +3253,18 @@ func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
} }
} }
if nn >= numArgRedis7 {
if err := rd.DiscardNext(); err != nil {
return err
}
if err := rd.DiscardNext(); err != nil {
return err
}
if err := rd.DiscardNext(); err != nil {
return err
}
}
cmd.val[cmdInfo.Name] = cmdInfo cmd.val[cmdInfo.Name] = cmdInfo
} }

1
go.mod
View File

@ -4,6 +4,7 @@ go 1.17
require ( require (
github.com/cespare/xxhash/v2 v2.1.2 github.com/cespare/xxhash/v2 v2.1.2
github.com/davecgh/go-spew v1.1.1
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
github.com/onsi/ginkgo v1.16.5 github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.19.0 github.com/onsi/gomega v1.19.0

1
go.sum
View File

@ -4,6 +4,7 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=

View File

@ -124,7 +124,7 @@ func (r *Reader) ReadLine() ([]byte, error) {
return line, nil return line, nil
} }
// readLine that returns an error if: // readLine returns an error if:
// - there is a pending read error; // - there is a pending read error;
// - or line does not end with \r\n. // - or line does not end with \r\n.
func (r *Reader) readLine() ([]byte, error) { func (r *Reader) readLine() ([]byte, error) {
@ -403,7 +403,7 @@ func (r *Reader) ReadArrayLen() (int, error) {
case RespArray, RespSet, RespPush: case RespArray, RespSet, RespPush:
return replyLen(line) return replyLen(line)
default: default:
return 0, fmt.Errorf("redis: can't parse array(array/set/push) reply: %.100q", line) return 0, fmt.Errorf("redis: can't parse array/set/push reply: %.100q", line)
} }
} }
@ -446,6 +446,15 @@ func (r *Reader) ReadMapLen() (int, error) {
} }
} }
// DiscardNext read and discard the data represented by the next line.
func (r *Reader) DiscardNext() error {
line, err := r.readLine()
if err != nil {
return err
}
return r.Discard(line)
}
// Discard the data represented by line. // Discard the data represented by line.
func (r *Reader) Discard(line []byte) (err error) { func (r *Reader) Discard(line []byte) (err error) {
if len(line) == 0 { if len(line) == 0 {
@ -486,15 +495,6 @@ func (r *Reader) Discard(line []byte) (err error) {
return fmt.Errorf("redis: can't parse %.100q", line) return fmt.Errorf("redis: can't parse %.100q", line)
} }
// DiscardNext read and discard the data represented by the next line.
func (r *Reader) DiscardNext() error {
line, err := r.readLine()
if err != nil {
return err
}
return r.Discard(line)
}
func replyLen(line []byte) (n int, err error) { func replyLen(line []byte) (n int, err error) {
n, err = util.Atoi(line[1:]) n, err = util.Atoi(line[1:])
if err != nil { if err != nil {
@ -515,7 +515,7 @@ func replyLen(line []byte) (n int, err error) {
return n, nil return n, nil
} }
// IsNilReply detect redis.Nil of RESP2. // IsNilReply detects redis.Nil of RESP2.
func IsNilReply(line []byte) bool { func IsNilReply(line []byte) bool {
return len(line) == 3 && return len(line) == 3 &&
(line[0] == RespString || line[0] == RespArray) && (line[0] == RespString || line[0] == RespArray) &&

View File

@ -223,13 +223,6 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
username, password = c.opt.CredentialsProvider() username, password = c.opt.CredentialsProvider()
} }
if password == "" &&
c.opt.DB == 0 &&
!c.opt.readOnly &&
c.opt.OnConnect == nil {
return nil
}
connPool := pool.NewSingleConnPool(c.connPool, cn) connPool := pool.NewSingleConnPool(c.connPool, cn)
conn := newConn(ctx, c.opt, connPool) conn := newConn(ctx, c.opt, connPool)
@ -238,7 +231,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
// The low version of redis-server does not support the hello command. // The low version of redis-server does not support the hello command.
// For redis-server (<6.0) that does not support the Hello command, // For redis-server (<6.0) that does not support the Hello command,
// we continue to provide services with RESP2. // we continue to provide services with RESP2.
if err := conn.Hello(ctx, 3, c.opt.Username, c.opt.Password, "").Err(); err == nil { if err := conn.Hello(ctx, 3, username, password, "").Err(); err == nil {
auth = true auth = true
} else if err.Error() != "ERR unknown command 'hello'" { } else if err.Error() != "ERR unknown command 'hello'" {
return err return err
@ -514,11 +507,12 @@ func wrapMultiExec(ctx context.Context, cmds []Cmder) []Cmder {
} }
func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error { func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error {
// Parse queued replies. // Parse +OK.
if err := statusCmd.readReply(rd); err != nil { if err := statusCmd.readReply(rd); err != nil {
return err return err
} }
// Parse +QUEUED.
for range cmds { for range cmds {
if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) { if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) {
return err return err