Add support for resp3 protocol (#1739)

* support resp3 protocol

Signed-off-by: monkey <golang@88.com>

* Upgrade mod version limit go1.14

https://github.com/go-redis/redis/issues/1715#issuecomment-820685614

Signed-off-by: monkey <golang@88.com>

* Remove the redundant check of ReadReply

Signed-off-by: monkey <golang@88.com>

* fix the problem

Signed-off-by: monkey <golang@88.com>

* workflows add v9

Signed-off-by: monkey <golang@88.com>

* update StringStringMapCmd to MapStringStringCmd

Signed-off-by: monkey <golang@88.com>
This commit is contained in:
monkey92t 2021-04-27 15:04:46 +08:00 committed by GitHub
parent 8d87a75fd6
commit 8ad01240a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1402 additions and 1136 deletions

View File

@ -4,7 +4,7 @@ on:
push:
branches: [master]
pull_request:
branches: [master]
branches: [master, v9]
jobs:
build:

View File

@ -18,14 +18,17 @@ type ClientStub struct {
resp []byte
}
var initHello = []byte("%1\r\n+proto\r\n:3\r\n")
func NewClientStub(resp []byte) *ClientStub {
stub := &ClientStub{
resp: resp,
}
stub.Cmdable = NewClient(&Options{
PoolSize: 128,
Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
return stub.stubConn(), nil
return stub.stubConn(initHello), nil
},
})
return stub
@ -40,7 +43,7 @@ func NewClusterClientStub(resp []byte) *ClientStub {
PoolSize: 128,
Addrs: []string{"127.0.0.1:6379"},
Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
return stub.stubConn(), nil
return stub.stubConn(initHello), nil
},
ClusterSlots: func(_ context.Context) ([]ClusterSlot, error) {
return []ClusterSlot{
@ -65,18 +68,27 @@ func NewClusterClientStub(resp []byte) *ClientStub {
return stub
}
func (c *ClientStub) stubConn() *ConnStub {
func (c *ClientStub) stubConn(init []byte) *ConnStub {
return &ConnStub{
init: init,
resp: c.resp,
}
}
type ConnStub struct {
init []byte
resp []byte
pos int
}
func (c *ConnStub) Read(b []byte) (n int, err error) {
// Return conn.init()
if len(c.init) > 0 {
n = copy(b, c.init)
c.init = c.init[n:]
return n, nil
}
if len(c.resp) == 0 {
return 0, io.EOF
}

View File

@ -1392,12 +1392,7 @@ func (c *ClusterClient) txPipelineReadQueued(
return err
}
switch line[0] {
case proto.ErrorReply:
return proto.ParseErrorReply(line)
case proto.ArrayReply:
// ok
default:
if line[0] != proto.RespArray {
return fmt.Errorf("redis: expected '*', but got line %q", line)
}

View File

@ -1182,16 +1182,17 @@ var _ = Describe("ClusterClient with unavailable Cluster", func() {
var client *redis.ClusterClient
BeforeEach(func() {
for _, node := range cluster.clients {
err := node.ClientPause(ctx, 5*time.Second).Err()
Expect(err).NotTo(HaveOccurred())
}
opt := redisClusterOptions()
opt.ReadTimeout = 250 * time.Millisecond
opt.WriteTimeout = 250 * time.Millisecond
opt.MaxRedirects = 1
client = cluster.newClusterClientUnstable(opt)
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
for _, node := range cluster.clients {
err := node.ClientPause(ctx, 5*time.Second).Err()
Expect(err).NotTo(HaveOccurred())
}
})
AfterEach(func() {

1616
command.go

File diff suppressed because it is too large Load Diff

View File

@ -158,7 +158,7 @@ type Cmdable interface {
HDel(ctx context.Context, key string, fields ...string) *IntCmd
HExists(ctx context.Context, key, field string) *BoolCmd
HGet(ctx context.Context, key, field string) *StringCmd
HGetAll(ctx context.Context, key string) *StringStringMapCmd
HGetAll(ctx context.Context, key string) *MapStringStringCmd
HIncrBy(ctx context.Context, key, field string, incr int64) *IntCmd
HIncrByFloat(ctx context.Context, key, field string, incr float64) *FloatCmd
HKeys(ctx context.Context, key string) *StringSliceCmd
@ -168,7 +168,8 @@ type Cmdable interface {
HMSet(ctx context.Context, key string, values ...interface{}) *BoolCmd
HSetNX(ctx context.Context, key, field string, value interface{}) *BoolCmd
HVals(ctx context.Context, key string) *StringSliceCmd
HRandField(ctx context.Context, key string, count int, withValues bool) *StringSliceCmd
HRandField(ctx context.Context, key string, count int) *StringSliceCmd
HRandFieldWithValues(ctx context.Context, key string, count int) *KeyValueSliceCmd
BLPop(ctx context.Context, timeout time.Duration, keys ...string) *StringSliceCmd
BRPop(ctx context.Context, timeout time.Duration, keys ...string) *StringSliceCmd
@ -274,7 +275,8 @@ type Cmdable interface {
ZRevRank(ctx context.Context, key, member string) *IntCmd
ZScore(ctx context.Context, key, member string) *FloatCmd
ZUnionStore(ctx context.Context, dest string, store *ZStore) *IntCmd
ZRandMember(ctx context.Context, key string, count int, withScores bool) *StringSliceCmd
ZRandMember(ctx context.Context, key string, count int) *StringSliceCmd
ZRandMemberWithScores(ctx context.Context, key string, count int) *ZSliceCmd
PFAdd(ctx context.Context, key string, els ...interface{}) *IntCmd
PFCount(ctx context.Context, keys ...string) *IntCmd
@ -287,7 +289,7 @@ type Cmdable interface {
ClientList(ctx context.Context) *StringCmd
ClientPause(ctx context.Context, dur time.Duration) *BoolCmd
ClientID(ctx context.Context) *IntCmd
ConfigGet(ctx context.Context, parameter string) *SliceCmd
ConfigGet(ctx context.Context, parameter string) *MapStringStringCmd
ConfigResetStat(ctx context.Context) *StatusCmd
ConfigSet(ctx context.Context, parameter, value string) *StatusCmd
ConfigRewrite(ctx context.Context) *StatusCmd
@ -358,6 +360,7 @@ type StatefulCmdable interface {
Select(ctx context.Context, index int) *StatusCmd
SwapDB(ctx context.Context, index1, index2 int) *StatusCmd
ClientSetName(ctx context.Context, name string) *BoolCmd
Hello(ctx context.Context, ver int, username, password, clientName string) *MapStringInterfaceCmd
}
var (
@ -413,6 +416,26 @@ func (c statefulCmdable) ClientSetName(ctx context.Context, name string) *BoolCm
return cmd
}
// Set the resp protocol used.
func (c statefulCmdable) Hello(ctx context.Context,
ver int, username, password, clientName string) *MapStringInterfaceCmd {
args := make([]interface{}, 0, 7)
args = append(args, "hello", ver)
if password != "" {
if username != "" {
args = append(args, "auth", username, password)
} else {
args = append(args, "auth", "default", password)
}
}
if clientName != "" {
args = append(args, "setname", clientName)
}
cmd := NewMapStringInterfaceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
//------------------------------------------------------------------------------
func (c cmdable) Command(ctx context.Context) *CommandsInfoCmd {
@ -440,7 +463,7 @@ func (c cmdable) Ping(ctx context.Context) *StatusCmd {
return cmd
}
func (c cmdable) Quit(ctx context.Context) *StatusCmd {
func (c cmdable) Quit(_ context.Context) *StatusCmd {
panic("not implemented")
}
@ -1138,8 +1161,8 @@ func (c cmdable) HGet(ctx context.Context, key, field string) *StringCmd {
return cmd
}
func (c cmdable) HGetAll(ctx context.Context, key string) *StringStringMapCmd {
cmd := NewStringStringMapCmd(ctx, "hgetall", key)
func (c cmdable) HGetAll(ctx context.Context, key string) *MapStringStringCmd {
cmd := NewMapStringStringCmd(ctx, "hgetall", key)
_ = c(ctx, cmd)
return cmd
}
@ -1222,16 +1245,15 @@ func (c cmdable) HVals(ctx context.Context, key string) *StringSliceCmd {
}
// redis-server version >= 6.2.0.
func (c cmdable) HRandField(ctx context.Context, key string, count int, withValues bool) *StringSliceCmd {
args := make([]interface{}, 0, 4)
func (c cmdable) HRandField(ctx context.Context, key string, count int) *StringSliceCmd {
cmd := NewStringSliceCmd(ctx, "hrandfield", key, count)
_ = c(ctx, cmd)
return cmd
}
// Although count=0 is meaningless, redis accepts count=0.
args = append(args, "hrandfield", key, count)
if withValues {
args = append(args, "withvalues")
}
cmd := NewStringSliceCmd(ctx, args...)
// redis-server version >= 6.2.0.
func (c cmdable) HRandFieldWithValues(ctx context.Context, key string, count int) *KeyValueSliceCmd {
cmd := NewKeyValueSliceCmd(ctx, "hrandfield", key, count, "withvalues")
_ = c(ctx, cmd)
return cmd
}
@ -2316,17 +2338,16 @@ func (c cmdable) ZUnionStore(ctx context.Context, dest string, store *ZStore) *I
return cmd
}
// redis-server version >= 6.2.0.
func (c cmdable) ZRandMember(ctx context.Context, key string, count int, withScores bool) *StringSliceCmd {
args := make([]interface{}, 0, 4)
// ZRandMember redis-server version >= 6.2.0.
func (c cmdable) ZRandMember(ctx context.Context, key string, count int) *StringSliceCmd {
cmd := NewStringSliceCmd(ctx, "zrandmember", key, count)
_ = c(ctx, cmd)
return cmd
}
// Although count=0 is meaningless, redis accepts count=0.
args = append(args, "zrandmember", key, count)
if withScores {
args = append(args, "withscores")
}
cmd := NewStringSliceCmd(ctx, args...)
// ZRandMemberWithScores redis-server version >= 6.2.0.
func (c cmdable) ZRandMemberWithScores(ctx context.Context, key string, count int) *ZSliceCmd {
cmd := NewZSliceCmd(ctx, "zrandmember", key, count, "withscores")
_ = c(ctx, cmd)
return cmd
}
@ -2431,8 +2452,8 @@ func (c cmdable) ClientUnblockWithError(ctx context.Context, id int64) *IntCmd {
return cmd
}
func (c cmdable) ConfigGet(ctx context.Context, parameter string) *SliceCmd {
cmd := NewSliceCmd(ctx, "config", "get", parameter)
func (c cmdable) ConfigGet(ctx context.Context, parameter string) *MapStringStringCmd {
cmd := NewMapStringStringCmd(ctx, "config", "get", parameter)
_ = c(ctx, cmd)
return cmd
}
@ -2553,7 +2574,7 @@ func (c cmdable) SlowLogGet(ctx context.Context, num int64) *SlowLogCmd {
return cmd
}
func (c cmdable) Sync(ctx context.Context) {
func (c cmdable) Sync(_ context.Context) {
panic("not implemented")
}

View File

@ -47,6 +47,17 @@ var _ = Describe("Commands", func() {
Expect(stats.IdleConns).To(Equal(uint32(1)))
})
It("should hello", func() {
cmds, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Hello(ctx, 3, "", "", "")
return nil
})
Expect(err).NotTo(HaveOccurred())
m, err := cmds[0].(*redis.MapStringInterfaceCmd).Result()
Expect(err).NotTo(HaveOccurred())
Expect(m["proto"]).To(Equal(int64(3)))
})
It("should Echo", func() {
pipe := client.Pipeline()
echo := pipe.Echo(ctx, "hello")
@ -182,10 +193,11 @@ var _ = Describe("Commands", func() {
It("should ConfigSet", func() {
configGet := client.ConfigGet(ctx, "maxmemory")
Expect(configGet.Err()).NotTo(HaveOccurred())
Expect(configGet.Val()).To(HaveLen(2))
Expect(configGet.Val()[0]).To(Equal("maxmemory"))
Expect(configGet.Val()).To(HaveLen(1))
_, ok := configGet.Val()["maxmemory"]
Expect(ok).To(BeTrue())
configSet := client.ConfigSet(ctx, "maxmemory", configGet.Val()[1].(string))
configSet := client.ConfigSet(ctx, "maxmemory", configGet.Val()["maxmemory"])
Expect(configSet.Err()).NotTo(HaveOccurred())
Expect(configSet.Val()).To(Equal("OK"))
})
@ -1839,18 +1851,20 @@ var _ = Describe("Commands", func() {
err = client.HSet(ctx, "hash", "key2", "hello2").Err()
Expect(err).NotTo(HaveOccurred())
v := client.HRandField(ctx, "hash", 1, false)
v := client.HRandField(ctx, "hash", 1)
Expect(v.Err()).NotTo(HaveOccurred())
Expect(v.Val()).To(Or(Equal([]string{"key1"}), Equal([]string{"key2"})))
v = client.HRandField(ctx, "hash", 0, false)
v = client.HRandField(ctx, "hash", 0)
Expect(v.Err()).NotTo(HaveOccurred())
Expect(v.Val()).To(HaveLen(0))
var slice []string
err = client.HRandField(ctx, "hash", 1, true).ScanSlice(&slice)
kv, err := client.HRandFieldWithValues(ctx, "hash", 1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(slice).To(Or(Equal([]string{"key1", "hello1"}), Equal([]string{"key2", "hello2"})))
Expect(kv).To(Or(
Equal([]redis.KeyValue{{Key: "key1", Value: "hello1"}}),
Equal([]redis.KeyValue{{Key: "key2", Value: "hello2"}}),
))
})
})
@ -3919,18 +3933,20 @@ var _ = Describe("Commands", func() {
err = client.ZAdd(ctx, "zset", &redis.Z{Score: 2, Member: "two"}).Err()
Expect(err).NotTo(HaveOccurred())
v := client.ZRandMember(ctx, "zset", 1, false)
v := client.ZRandMember(ctx, "zset", 1)
Expect(v.Err()).NotTo(HaveOccurred())
Expect(v.Val()).To(Or(Equal([]string{"one"}), Equal([]string{"two"})))
v = client.ZRandMember(ctx, "zset", 0, false)
v = client.ZRandMember(ctx, "zset", 0)
Expect(v.Err()).NotTo(HaveOccurred())
Expect(v.Val()).To(HaveLen(0))
var slice []string
err = client.ZRandMember(ctx, "zset", 1, true).ScanSlice(&slice)
kv, err := client.ZRandMemberWithScores(ctx, "zset", 1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(slice).To(Or(Equal([]string{"one", "1"}), Equal([]string{"two", "2"})))
Expect(kv).To(Or(
Equal([]redis.Z{{Member: "one", Score: 1}}),
Equal([]redis.Z{{Member: "two", Score: 2}}),
))
})
})
@ -4675,7 +4691,7 @@ var _ = Describe("Commands", func() {
old := client.ConfigGet(ctx, key).Val()
client.ConfigSet(ctx, key, "0")
defer client.ConfigSet(ctx, key, old[1].(string))
defer client.ConfigSet(ctx, key, old[key])
err := rdb.Do(ctx, "slowlog", "reset").Err()
Expect(err).NotTo(HaveOccurred())

View File

@ -276,9 +276,9 @@ func ExampleClient_ScanType() {
// Output: found 33 keys
}
// ExampleStringStringMapCmd_Scan shows how to scan the results of a map fetch
// ExampleMapStringStringCmd_Scan shows how to scan the results of a map fetch
// into a struct.
func ExampleStringStringMapCmd_Scan() {
func ExampleMapStringStringCmd_Scan() {
rdb.FlushDB(ctx)
err := rdb.HMSet(ctx, "map",
"name", "hello",
@ -615,7 +615,7 @@ func ExampleClient_SlowLogGet() {
old := rdb.ConfigGet(ctx, key).Val()
rdb.ConfigSet(ctx, key, "0")
defer rdb.ConfigSet(ctx, key, old[1].(string))
defer rdb.ConfigSet(ctx, key, old[key])
if err := rdb.Do(ctx, "slowlog", "reset").Err(); err != nil {
panic(err)

2
go.mod
View File

@ -1,6 +1,6 @@
module github.com/go-redis/redis/v8
go 1.13
go 1.14
require (
github.com/cespare/xxhash/v2 v2.1.1

View File

@ -2,20 +2,39 @@ package proto
import (
"bufio"
"errors"
"fmt"
"io"
"math"
"math/big"
"strconv"
"github.com/go-redis/redis/v8/internal/util"
)
const (
ErrorReply = '-'
StatusReply = '+'
IntReply = ':'
StringReply = '$'
ArrayReply = '*'
RespStatus = '+' // +<string>\r\n
RespError = '-' // -<string>\r\n
RespString = '$' // $<length>\r\n<bytes>\r\n
RespInt = ':' // :<number>\r\n
RespNil = '_' // _\r\n
RespFloat = ',' // ,<floating-point-number>\r\n (golang float)
RespBool = '#' // true: #t\r\n false: #f\r\n
RespBlobError = '!' // !<length>\r\n<bytes>\r\n
RespVerbatim = '=' // =<length>\r\nFORMAT:<bytes>\r\n
RespBigInt = '(' // (<big number>\r\n
RespArray = '*' // *<len>\r\n... (same as resp2)
RespMap = '%' // %<len>\r\n(key)\r\n(value)\r\n... (golang map)
RespSet = '~' // ~<len>\r\n... (same as Array)
RespAttr = '|' // |<len>\r\n(key)\r\n(value)\r\n... + command reply
RespPush = '>' // ><len>\r\n... (same as Array)
)
// Not used temporarily.
// Redis has not used these two data types for the time being, and will implement them later.
// Streamed = "EOF:"
// StreamedAggregated = '?'
//------------------------------------------------------------------------------
const Nil = RedisError("redis: nil")
@ -26,19 +45,19 @@ func (e RedisError) Error() string { return string(e) }
func (RedisError) RedisError() {}
func ParseErrorReply(line []byte) error {
return RedisError(line[1:])
}
//------------------------------------------------------------------------------
type MultiBulkParse func(*Reader, int64) (interface{}, error)
type Reader struct {
rd *bufio.Reader
_buf []byte
rd *bufio.Reader
}
func NewReader(rd io.Reader) *Reader {
return &Reader{
rd: bufio.NewReader(rd),
_buf: make([]byte, 64),
rd: bufio.NewReader(rd),
}
}
@ -54,14 +73,53 @@ func (r *Reader) Reset(rd io.Reader) {
r.rd.Reset(rd)
}
// PeekReplyType returns the data type of the next response without advancing the Reader,
// and discard the attribute type.
func (r *Reader) PeekReplyType() (byte, error) {
b, err := r.rd.Peek(1)
if err != nil {
return 0, err
}
if b[0] == RespAttr {
if err = r.DiscardNext(); err != nil {
return 0, err
}
return r.PeekReplyType()
}
return b[0], nil
}
// ReadLine Return a valid reply, it will check the protocol or redis error,
// and discard the attribute type.
func (r *Reader) ReadLine() ([]byte, error) {
line, err := r.readLine()
if err != nil {
return nil, err
}
if isNilReply(line) {
switch line[0] {
case RespError:
return nil, ParseErrorReply(line)
case RespNil:
return nil, Nil
case RespBlobError:
var blobErr string
blobErr, err = r.readStringReply(line)
if err == nil {
err = RedisError(blobErr)
}
return nil, err
case RespAttr:
if err = r.Discard(line); err != nil {
return nil, err
}
return r.ReadLine()
}
// Compatible with RESP2
if IsNilReply(line) {
return nil, Nil
}
return line, nil
}
@ -92,48 +150,192 @@ func (r *Reader) readLine() ([]byte, error) {
return b[:len(b)-2], nil
}
func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) {
func (r *Reader) ReadReply() (interface{}, error) {
line, err := r.ReadLine()
if err != nil {
return nil, err
}
switch line[0] {
case ErrorReply:
return nil, ParseErrorReply(line)
case StatusReply:
case RespStatus:
return string(line[1:]), nil
case IntReply:
case RespInt:
return util.ParseInt(line[1:], 10, 64)
case StringReply:
case RespFloat:
return r.readFloat(line)
case RespBool:
return r.readBool(line)
case RespBigInt:
return r.readBigInt(line)
case RespString:
return r.readStringReply(line)
case ArrayReply:
n, err := parseArrayLen(line)
if err != nil {
return nil, err
}
if m == nil {
err := fmt.Errorf("redis: got %.100q, but multi bulk parser is nil", line)
return nil, err
}
return m(r, n)
case RespVerbatim:
return r.readVerb(line)
case RespArray, RespSet, RespPush:
return r.readSlice(line)
case RespMap:
return r.readMap(line)
}
return nil, fmt.Errorf("redis: can't parse %.100q", line)
}
func (r *Reader) ReadIntReply() (int64, error) {
func (r *Reader) readFloat(line []byte) (float64, error) {
v := string(line[1:])
switch string(line[1:]) {
case "inf":
return math.Inf(1), nil
case "-inf":
return math.Inf(-1), nil
}
return strconv.ParseFloat(v, 64)
}
func (r *Reader) readBool(line []byte) (bool, error) {
switch string(line[1:]) {
case "t":
return true, nil
case "f":
return false, nil
}
return false, fmt.Errorf("redis: can't parse bool reply: %q", line)
}
func (r *Reader) readBigInt(line []byte) (*big.Int, error) {
i := new(big.Int)
if i, ok := i.SetString(string(line[1:]), 10); ok {
return i, nil
}
return nil, fmt.Errorf("redis: can't parse bigInt reply: %q", line)
}
func (r *Reader) readStringReply(line []byte) (string, error) {
n, err := replyLen(line)
if err != nil {
return "", err
}
b := make([]byte, n+2)
_, err = io.ReadFull(r.rd, b)
if err != nil {
return "", err
}
return util.BytesToString(b[:n]), nil
}
func (r *Reader) readVerb(line []byte) (string, error) {
s, err := r.readStringReply(line)
if err != nil {
return "", err
}
if len(s) < 4 || s[3] != ':' {
return "", fmt.Errorf("redis: can't parse verbatim string reply: %q", line)
}
return s[4:], nil
}
func (r *Reader) readSlice(line []byte) ([]interface{}, error) {
n, err := replyLen(line)
if err != nil {
return nil, err
}
val := make([]interface{}, n)
for i := 0; i < len(val); i++ {
v, err := r.ReadReply()
if err != nil {
if err == Nil {
val[i] = nil
continue
}
if err, ok := err.(RedisError); ok {
val[i] = err
continue
}
return nil, err
}
val[i] = v
}
return val, nil
}
func (r *Reader) readMap(line []byte) (map[interface{}]interface{}, error) {
n, err := replyLen(line)
if err != nil {
return nil, err
}
m := make(map[interface{}]interface{}, n)
for i := 0; i < n; i++ {
k, err := r.ReadReply()
if err != nil {
return nil, err
}
v, err := r.ReadReply()
if err != nil {
if err == Nil {
m[k] = nil
continue
}
if err, ok := err.(RedisError); ok {
m[k] = err
continue
}
return nil, err
}
m[k] = v
}
return m, nil
}
// -------------------------------
func (r *Reader) ReadInt() (int64, error) {
line, err := r.ReadLine()
if err != nil {
return 0, err
}
switch line[0] {
case ErrorReply:
return 0, ParseErrorReply(line)
case IntReply:
case RespInt, RespStatus:
return util.ParseInt(line[1:], 10, 64)
default:
return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line)
case RespString:
s, err := r.readStringReply(line)
if err != nil {
return 0, err
}
return util.ParseInt([]byte(s), 10, 64)
case RespBigInt:
b, err := r.readBigInt(line)
if err != nil {
return 0, err
}
if !b.IsInt64() {
return 0, fmt.Errorf("bigInt(%s) value out of range", b.String())
}
return b.Int64(), nil
}
return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line)
}
func (r *Reader) ReadFloat() (float64, error) {
line, err := r.ReadLine()
if err != nil {
return 0, err
}
switch line[0] {
case RespFloat:
return r.readFloat(line)
case RespStatus:
return strconv.ParseFloat(string(line[1:]), 64)
case RespString:
s, err := r.readStringReply(line)
if err != nil {
return 0, err
}
return strconv.ParseFloat(s, 64)
}
return 0, fmt.Errorf("redis: can't parse float reply: %.100q", line)
}
func (r *Reader) ReadString() (string, error) {
@ -141,191 +343,180 @@ func (r *Reader) ReadString() (string, error) {
if err != nil {
return "", err
}
switch line[0] {
case ErrorReply:
return "", ParseErrorReply(line)
case StringReply:
case RespStatus, RespInt, RespFloat:
return string(line[1:]), nil
case RespString:
return r.readStringReply(line)
case StatusReply:
return string(line[1:]), nil
case IntReply:
return string(line[1:]), nil
default:
return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line)
case RespBool:
b, err := r.readBool(line)
return strconv.FormatBool(b), err
case RespVerbatim:
return r.readVerb(line)
case RespBigInt:
b, err := r.readBigInt(line)
if err != nil {
return "", err
}
return b.String(), nil
}
return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line)
}
func (r *Reader) readStringReply(line []byte) (string, error) {
if isNilReply(line) {
return "", Nil
}
replyLen, err := util.Atoi(line[1:])
func (r *Reader) ReadBool() (bool, error) {
s, err := r.ReadString()
if err != nil {
return "", err
return false, err
}
b := make([]byte, replyLen+2)
_, err = io.ReadFull(r.rd, b)
if err != nil {
return "", err
}
return util.BytesToString(b[:replyLen]), nil
return s == "OK" || s == "1" || s == "true", nil
}
func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
func (r *Reader) ReadSlice() ([]interface{}, error) {
line, err := r.ReadLine()
if err != nil {
return nil, err
}
switch line[0] {
case ErrorReply:
return nil, ParseErrorReply(line)
case ArrayReply:
n, err := parseArrayLen(line)
if err != nil {
return nil, err
}
return m(r, n)
default:
return nil, fmt.Errorf("redis: can't parse array reply: %.100q", line)
}
return r.readSlice(line)
}
// ReadFixedArrayLen read fixed array length.
func (r *Reader) ReadFixedArrayLen(fixedLen int) error {
n, err := r.ReadArrayLen()
if err != nil {
return err
}
if n != fixedLen {
return fmt.Errorf("redis: got %d elements of array length, wanted %d", n, fixedLen)
}
return nil
}
// ReadArrayLen Read and return the length of the array.
func (r *Reader) ReadArrayLen() (int, error) {
line, err := r.ReadLine()
if err != nil {
return 0, err
}
switch line[0] {
case ErrorReply:
return 0, ParseErrorReply(line)
case ArrayReply:
n, err := parseArrayLen(line)
case RespArray, RespSet, RespPush:
return replyLen(line)
default:
return 0, fmt.Errorf("redis: can't parse array(array/set/push) reply: %.100q", line)
}
}
// ReadFixedMapLen read fixed map length.
func (r *Reader) ReadFixedMapLen(fixedLen int) error {
n, err := r.ReadMapLen()
if err != nil {
return err
}
if n != fixedLen {
return fmt.Errorf("redis: got %d elements of map length, wanted %d", n, fixedLen)
}
return nil
}
// ReadMapLen read the length of the map type.
// If responding to the array type (RespArray/RespSet/RespPush),
// it must be a multiple of 2 and return n/2.
// Other types will return an error.
func (r *Reader) ReadMapLen() (int, error) {
line, err := r.ReadLine()
if err != nil {
return 0, err
}
switch line[0] {
case RespMap:
return replyLen(line)
case RespArray, RespSet, RespPush:
// Some commands and RESP2 protocol may respond to array types.
n, err := replyLen(line)
if err != nil {
return 0, err
}
return int(n), nil
default:
return 0, fmt.Errorf("redis: can't parse array reply: %.100q", line)
}
}
func (r *Reader) ReadScanReply() ([]string, uint64, error) {
n, err := r.ReadArrayLen()
if err != nil {
return nil, 0, err
}
if n != 2 {
return nil, 0, fmt.Errorf("redis: got %d elements in scan reply, expected 2", n)
}
cursor, err := r.ReadUint()
if err != nil {
return nil, 0, err
}
n, err = r.ReadArrayLen()
if err != nil {
return nil, 0, err
}
keys := make([]string, n)
for i := 0; i < n; i++ {
key, err := r.ReadString()
if err != nil {
return nil, 0, err
if n%2 != 0 {
return 0, fmt.Errorf("redis: the length of the array must be a multiple of 2, got: %d", n)
}
keys[i] = key
return n / 2, nil
default:
return 0, fmt.Errorf("redis: can't parse map reply: %.100q", line)
}
return keys, cursor, err
}
func (r *Reader) ReadInt() (int64, error) {
b, err := r.readTmpBytesReply()
if err != nil {
return 0, err
}
return util.ParseInt(b, 10, 64)
}
func (r *Reader) ReadUint() (uint64, error) {
b, err := r.readTmpBytesReply()
if err != nil {
return 0, err
}
return util.ParseUint(b, 10, 64)
}
func (r *Reader) ReadFloatReply() (float64, error) {
b, err := r.readTmpBytesReply()
if err != nil {
return 0, err
}
return util.ParseFloat(b, 64)
}
func (r *Reader) readTmpBytesReply() ([]byte, error) {
line, err := r.ReadLine()
if err != nil {
return nil, err
// Discard the data represented by line.
func (r *Reader) Discard(line []byte) (err error) {
if len(line) == 0 {
return errors.New("redis: invalid line")
}
switch line[0] {
case ErrorReply:
return nil, ParseErrorReply(line)
case StringReply:
return r._readTmpBytesReply(line)
case StatusReply:
return line[1:], nil
default:
return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line)
case RespStatus, RespError, RespInt, RespNil, RespFloat, RespBool, RespBigInt:
return nil
}
n, err := replyLen(line)
if err != nil && err != Nil {
return err
}
switch line[0] {
case RespBlobError, RespString, RespVerbatim:
// +\r\n
_, err = r.rd.Discard(n + 2)
return err
case RespArray, RespSet, RespPush:
for i := 0; i < n; i++ {
if err = r.DiscardNext(); err != nil {
return err
}
}
return nil
case RespMap, RespAttr:
// Read key & value.
for i := 0; i < n*2; i++ {
if err = r.DiscardNext(); err != nil {
return err
}
}
return nil
}
return fmt.Errorf("redis: can't parse %.100q", line)
}
func (r *Reader) _readTmpBytesReply(line []byte) ([]byte, error) {
if isNilReply(line) {
return nil, Nil
}
replyLen, err := util.Atoi(line[1:])
// DiscardNext read and discard the data represented by the next line.
func (r *Reader) DiscardNext() error {
line, err := r.readLine()
if err != nil {
return nil, err
return err
}
return r.Discard(line)
}
buf := r.buf(replyLen + 2)
_, err = io.ReadFull(r.rd, buf)
func replyLen(line []byte) (n int, err error) {
n, err = util.Atoi(line[1:])
if err != nil {
return nil, err
return 0, err
}
return buf[:replyLen], nil
}
func (r *Reader) buf(n int) []byte {
if n <= cap(r._buf) {
return r._buf[:n]
if n < -1 {
return 0, fmt.Errorf("redis: invalid reply: %q", line)
}
d := n - cap(r._buf)
r._buf = append(r._buf, make([]byte, d)...)
return r._buf
}
func isNilReply(b []byte) bool {
return len(b) == 3 &&
(b[0] == StringReply || b[0] == ArrayReply) &&
b[1] == '-' && b[2] == '1'
}
func ParseErrorReply(line []byte) error {
return RedisError(string(line[1:]))
}
func parseArrayLen(line []byte) (int64, error) {
if isNilReply(line) {
return 0, Nil
switch line[0] {
case RespString, RespVerbatim, RespBlobError,
RespArray, RespSet, RespPush, RespMap, RespAttr:
if n == -1 {
return 0, Nil
}
}
return util.ParseInt(line[1:], 10, 64)
return n, nil
}
// IsNilReply detect redis.Nil of RESP2.
func IsNilReply(line []byte) bool {
return len(line) == 3 &&
(line[0] == RespString || line[0] == RespArray) &&
line[1] == '-' && line[2] == '1'
}

View File

@ -9,23 +9,63 @@ import (
)
func BenchmarkReader_ParseReply_Status(b *testing.B) {
benchmarkParseReply(b, "+OK\r\n", nil, false)
benchmarkParseReply(b, "+OK\r\n", false)
}
func BenchmarkReader_ParseReply_Int(b *testing.B) {
benchmarkParseReply(b, ":1\r\n", nil, false)
benchmarkParseReply(b, ":1\r\n", false)
}
func BenchmarkReader_ParseReply_Float(b *testing.B) {
benchmarkParseReply(b, ",123.456\r\n", false)
}
func BenchmarkReader_ParseReply_Bool(b *testing.B) {
benchmarkParseReply(b, "#t\r\n", false)
}
func BenchmarkReader_ParseReply_BigInt(b *testing.B) {
benchmarkParseReply(b, "(3492890328409238509324850943850943825024385\r\n", false)
}
func BenchmarkReader_ParseReply_Error(b *testing.B) {
benchmarkParseReply(b, "-Error message\r\n", nil, true)
benchmarkParseReply(b, "-Error message\r\n", true)
}
func BenchmarkReader_ParseReply_Nil(b *testing.B) {
benchmarkParseReply(b, "_\r\n", true)
}
func BenchmarkReader_ParseReply_BlobError(b *testing.B) {
benchmarkParseReply(b, "!21\r\nSYNTAX invalid syntax", true)
}
func BenchmarkReader_ParseReply_String(b *testing.B) {
benchmarkParseReply(b, "$5\r\nhello\r\n", nil, false)
benchmarkParseReply(b, "$5\r\nhello\r\n", false)
}
func BenchmarkReader_ParseReply_Verb(b *testing.B) {
benchmarkParseReply(b, "$9\r\ntxt:hello\r\n", false)
}
func BenchmarkReader_ParseReply_Slice(b *testing.B) {
benchmarkParseReply(b, "*2\r\n$5\r\nhello\r\n$5\r\nworld\r\n", multiBulkParse, false)
benchmarkParseReply(b, "*2\r\n$5\r\nhello\r\n$5\r\nworld\r\n", false)
}
func BenchmarkReader_ParseReply_Set(b *testing.B) {
benchmarkParseReply(b, "~2\r\n$5\r\nhello\r\n$5\r\nworld\r\n", false)
}
func BenchmarkReader_ParseReply_Push(b *testing.B) {
benchmarkParseReply(b, ">2\r\n$5\r\nhello\r\n$5\r\nworld\r\n", false)
}
func BenchmarkReader_ParseReply_Map(b *testing.B) {
benchmarkParseReply(b, "%2\r\n$5\r\nhello\r\n$5\r\nworld\r\n+key\r\n+value\r\n", false)
}
func BenchmarkReader_ParseReply_Attr(b *testing.B) {
benchmarkParseReply(b, "%1\r\n+key\r\n+value\r\n+hello\r\n", false)
}
func TestReader_ReadLine(t *testing.T) {
@ -43,7 +83,7 @@ func TestReader_ReadLine(t *testing.T) {
}
}
func benchmarkParseReply(b *testing.B, reply string, m proto.MultiBulkParse, wanterr bool) {
func benchmarkParseReply(b *testing.B, reply string, wanterr bool) {
buf := new(bytes.Buffer)
for i := 0; i < b.N; i++ {
buf.WriteString(reply)
@ -52,21 +92,9 @@ func benchmarkParseReply(b *testing.B, reply string, m proto.MultiBulkParse, wan
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := p.ReadReply(m)
_, err := p.ReadReply()
if !wanterr && err != nil {
b.Fatal(err)
}
}
}
func multiBulkParse(p *proto.Reader, n int64) (interface{}, error) {
vv := make([]interface{}, 0, n)
for i := int64(0); i < n; i++ {
v, err := p.ReadReply(multiBulkParse)
if err != nil {
return nil, err
}
vv = append(vv, v)
}
return vv, nil
}

View File

@ -34,7 +34,7 @@ func NewWriter(wr writer) *Writer {
}
func (w *Writer) WriteArgs(args []interface{}) error {
if err := w.WriteByte(ArrayReply); err != nil {
if err := w.WriteByte(RespArray); err != nil {
return err
}
@ -111,7 +111,7 @@ func (w *Writer) WriteArg(v interface{}) error {
}
func (w *Writer) bytes(b []byte) error {
if err := w.WriteByte(StringReply); err != nil {
if err := w.WriteByte(RespString); err != nil {
return err
}

View File

@ -1,7 +1,6 @@
package redis_test
import (
"context"
"io"
"net"
"sync"
@ -15,16 +14,11 @@ import (
var _ = Describe("PubSub", func() {
var client *redis.Client
var clientID int64
BeforeEach(func() {
opt := redisOptions()
opt.MinIdleConns = 0
opt.MaxConnAge = 0
opt.OnConnect = func(ctx context.Context, cn *redis.Conn) (err error) {
clientID, err = cn.ClientID(ctx).Result()
return err
}
client = redis.NewClient(opt)
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})
@ -421,30 +415,6 @@ var _ = Describe("PubSub", func() {
Expect(msg.Payload).To(Equal(string(bigVal)))
})
It("handles message payload slice with server-assisted client-size caching", func() {
pubsub := client.Subscribe(ctx, "__redis__:invalidate")
defer pubsub.Close()
client2 := redis.NewClient(redisOptions())
defer client2.Close()
err := client2.Do(ctx, "CLIENT", "TRACKING", "on", "REDIRECT", clientID).Err()
Expect(err).NotTo(HaveOccurred())
err = client2.Do(ctx, "GET", "mykey").Err()
Expect(err).To(Equal(redis.Nil))
err = client2.Do(ctx, "SET", "mykey", "myvalue").Err()
Expect(err).NotTo(HaveOccurred())
ch := pubsub.Channel()
var msg *redis.Message
Eventually(ch).Should(Receive(&msg))
Expect(msg.Channel).To(Equal("__redis__:invalidate"))
Expect(msg.PayloadSlice).To(Equal([]string{"mykey"}))
})
It("supports concurrent Ping and Receive", func() {
const N = 100

View File

@ -230,21 +230,21 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
}
cn.Inited = true
if c.opt.Password == "" &&
c.opt.DB == 0 &&
!c.opt.readOnly &&
c.opt.OnConnect == nil {
return nil
}
ctx, span := internal.StartSpan(ctx, "redis.init_conn")
defer span.End()
connPool := pool.NewSingleConnPool(c.connPool, cn)
conn := newConn(ctx, c.opt, connPool)
var auth bool
// The low version of redis-server does not support the hello command.
if conn.Hello(ctx, 3, c.opt.Username, c.opt.Password, "").Err() == nil {
auth = true
}
_, err := conn.Pipelined(ctx, func(pipe Pipeliner) error {
if c.opt.Password != "" {
if !auth && c.opt.Password != "" {
if c.opt.Username != "" {
pipe.AuthACL(ctx, c.opt.Username, c.opt.Password)
} else {
@ -542,14 +542,8 @@ func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder)
return err
}
switch line[0] {
case proto.ErrorReply:
return proto.ParseErrorReply(line)
case proto.ArrayReply:
// ok
default:
err := fmt.Errorf("redis: expected '*', but got line %q", line)
return err
if line[0] != proto.RespArray {
return fmt.Errorf("redis: expected '*', but got line %q", line)
}
return nil

View File

@ -83,8 +83,8 @@ func NewBoolSliceResult(val []bool, err error) *BoolSliceCmd {
}
// NewStringStringMapResult returns a StringStringMapCmd initialised with val and err for testing.
func NewStringStringMapResult(val map[string]string, err error) *StringStringMapCmd {
var cmd StringStringMapCmd
func NewStringStringMapResult(val map[string]string, err error) *MapStringStringCmd {
var cmd MapStringStringCmd
cmd.val = val
cmd.SetErr(err)
return &cmd

View File

@ -177,6 +177,7 @@ var _ = Describe("Redis Ring", func() {
It("can be initialized with a new client callback", func() {
opts := redisRingOptions()
opts.NewClient = func(name string, opt *redis.Options) *redis.Client {
opt.Username = "username1"
opt.Password = "password1"
return redis.NewClient(opt)
}
@ -184,7 +185,7 @@ var _ = Describe("Redis Ring", func() {
err := ring.Ping(ctx).Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("ERR AUTH"))
Expect(err.Error()).To(ContainSubstring("WRONGPASS"))
})
})

View File

@ -322,8 +322,8 @@ func (c *SentinelClient) GetMasterAddrByName(ctx context.Context, name string) *
return cmd
}
func (c *SentinelClient) Sentinels(ctx context.Context, name string) *SliceCmd {
cmd := NewSliceCmd(ctx, "sentinel", "sentinels", name)
func (c *SentinelClient) Sentinels(ctx context.Context, name string) *MapStringStringSliceCmd {
cmd := NewMapStringStringSliceCmd(ctx, "sentinel", "sentinels", name)
_ = c.Process(ctx, cmd)
return cmd
}
@ -355,8 +355,8 @@ func (c *SentinelClient) FlushConfig(ctx context.Context) *StatusCmd {
}
// Master shows the state and info of the specified master.
func (c *SentinelClient) Master(ctx context.Context, name string) *StringStringMapCmd {
cmd := NewStringStringMapCmd(ctx, "sentinel", "master", name)
func (c *SentinelClient) Master(ctx context.Context, name string) *MapStringStringCmd {
cmd := NewMapStringStringCmd(ctx, "sentinel", "master", name)
_ = c.Process(ctx, cmd)
return cmd
}
@ -369,8 +369,8 @@ func (c *SentinelClient) Masters(ctx context.Context) *SliceCmd {
}
// Slaves shows a list of slaves for the specified master and their state.
func (c *SentinelClient) Slaves(ctx context.Context, name string) *SliceCmd {
cmd := NewSliceCmd(ctx, "sentinel", "slaves", name)
func (c *SentinelClient) Slaves(ctx context.Context, name string) *MapStringStringSliceCmd {
cmd := NewMapStringStringSliceCmd(ctx, "sentinel", "slaves", name)
_ = c.Process(ctx, cmd)
return cmd
}
@ -588,40 +588,24 @@ func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *Sentinel
return parseSlaveAddrs(addrs, false)
}
func parseSlaveAddrs(addrs []interface{}, keepDisconnected bool) []string {
func parseSlaveAddrs(addrs []map[string]string, keepDisconnected bool) []string {
nodes := make([]string, 0, len(addrs))
for _, node := range addrs {
ip := ""
port := ""
flags := []string{}
lastkey := ""
isDown := false
for _, key := range node.([]interface{}) {
switch lastkey {
case "ip":
ip = key.(string)
case "port":
port = key.(string)
case "flags":
flags = strings.Split(key.(string), ",")
}
lastkey = key.(string)
}
for _, flag := range flags {
switch flag {
case "s_down", "o_down":
isDown = true
case "disconnected":
if !keepDisconnected {
if flags, ok := node["flags"]; ok {
for _, flag := range strings.Split(flags, ",") {
switch flag {
case "s_down", "o_down":
isDown = true
case "disconnected":
if !keepDisconnected {
isDown = true
}
}
}
}
if !isDown {
nodes = append(nodes, net.JoinHostPort(ip, port))
if !isDown && node["ip"] != "" && node["port"] != "" {
nodes = append(nodes, net.JoinHostPort(node["ip"], node["port"]))
}
}
@ -670,16 +654,13 @@ func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
return
}
for _, sentinel := range sentinels {
vals := sentinel.([]interface{})
var ip, port string
for i := 0; i < len(vals); i += 2 {
key := vals[i].(string)
switch key {
case "ip":
ip = vals[i+1].(string)
case "port":
port = vals[i+1].(string)
}
ip, ok := sentinel["ip"]
if !ok {
continue
}
port, ok := sentinel["port"]
if !ok {
continue
}
if ip != "" && port != "" {
sentinelAddr := net.JoinHostPort(ip, port)

View File

@ -185,7 +185,8 @@ var _ = Describe("NewFailoverClusterClient", func() {
}
// Create subscription.
ch := client.Subscribe(ctx, "foo").Channel()
sub := client.Subscribe(ctx, "foo")
ch := sub.Channel()
// Kill master.
err = master.Shutdown(ctx).Err()
@ -207,6 +208,7 @@ var _ = Describe("NewFailoverClusterClient", func() {
}, "15s", "100ms").Should(Receive(&msg))
Expect(msg.Channel).To(Equal("foo"))
Expect(msg.Payload).To(Equal("hello"))
Expect(sub.Close()).NotTo(HaveOccurred())
_, err = startRedis(masterPort)
Expect(err).NotTo(HaveOccurred())