feat: upgrade to Redis 7

This commit is contained in:
Vladimir Mihailenco 2022-06-04 17:25:12 +03:00
parent c98c5f0eeb
commit d09c27e604
9 changed files with 293 additions and 229 deletions

View File

@ -16,7 +16,7 @@ bench: testdeps
testdata/redis: testdata/redis:
mkdir -p $@ mkdir -p $@
wget -qO- https://download.redis.io/releases/redis-6.2.5.tar.gz | tar xvz --strip-components=1 -C $@ wget -qO- https://download.redis.io/releases/redis-7.0.0.tar.gz | tar xvz --strip-components=1 -C $@
testdata/redis/src/redis-server: testdata/redis testdata/redis/src/redis-server: testdata/redis
cd $< && make all cd $< && make all

View File

@ -69,6 +69,12 @@ And then install go-redis/v8 (note _v8_ in the import; omitting it is a popular
go get github.com/go-redis/redis/v8 go get github.com/go-redis/redis/v8
``` ```
If you need **Redis 7** support, install go-redis/v9:
```shell
go get github.com/go-redis/redis/v9
```
## Quickstart ## Quickstart
```go ```go

View File

@ -1769,19 +1769,35 @@ func (cmd *XAutoClaimCmd) String() string {
} }
func (cmd *XAutoClaimCmd) readReply(rd *proto.Reader) error { func (cmd *XAutoClaimCmd) readReply(rd *proto.Reader) error {
var err error n, err := rd.ReadArrayLen()
if err = rd.ReadFixedArrayLen(2); err != nil { if err != nil {
return err return err
} }
switch n {
case 2, // Redis 6
3: // Redis 7:
// ok
default:
return fmt.Errorf("redis: got %d elements in XAutoClaim reply, wanted 2/3", n)
}
cmd.start, err = rd.ReadString() cmd.start, err = rd.ReadString()
if err != nil { if err != nil {
return err return err
} }
cmd.val, err = readXMessageSlice(rd) cmd.val, err = readXMessageSlice(rd)
if err != nil { if err != nil {
return err return err
} }
if n >= 3 {
if err := rd.DiscardNext(); err != nil {
return err
}
}
return nil return nil
} }
@ -1823,27 +1839,43 @@ func (cmd *XAutoClaimJustIDCmd) String() string {
} }
func (cmd *XAutoClaimJustIDCmd) readReply(rd *proto.Reader) error { func (cmd *XAutoClaimJustIDCmd) readReply(rd *proto.Reader) error {
var err error n, err := rd.ReadArrayLen()
if err = rd.ReadFixedArrayLen(2); err != nil { if err != nil {
return err return err
} }
switch n {
case 2, // Redis 6
3: // Redis 7:
// ok
default:
return fmt.Errorf("redis: got %d elements in XAutoClaimJustID reply, wanted 2/3", n)
}
cmd.start, err = rd.ReadString() cmd.start, err = rd.ReadString()
if err != nil { if err != nil {
return err return err
} }
n, err := rd.ReadArrayLen()
nn, err := rd.ReadArrayLen()
if err != nil { if err != nil {
return err return err
} }
cmd.val = make([]string, n) cmd.val = make([]string, nn)
for i := 0; i < n; i++ { for i := 0; i < nn; i++ {
cmd.val[i], err = rd.ReadString() cmd.val[i], err = rd.ReadString()
if err != nil { if err != nil {
return err return err
} }
} }
if n >= 3 {
if err := rd.DiscardNext(); err != nil {
return err
}
}
return nil return nil
} }
@ -1939,6 +1971,8 @@ type XInfoGroup struct {
Consumers int64 Consumers int64
Pending int64 Pending int64
LastDeliveredID string LastDeliveredID string
EntriesRead int64
Lag int64
} }
var _ Cmder = (*XInfoGroupsCmd)(nil) var _ Cmder = (*XInfoGroupsCmd)(nil)
@ -1976,12 +2010,15 @@ func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error {
cmd.val = make([]XInfoGroup, n) cmd.val = make([]XInfoGroup, n)
for i := 0; i < len(cmd.val); i++ { for i := 0; i < len(cmd.val); i++ {
if err = rd.ReadFixedMapLen(4); err != nil { group := &cmd.val[i]
nn, err := rd.ReadMapLen()
if err != nil {
return err return err
} }
var key string var key string
for f := 0; f < 4; f++ { for j := 0; j < nn; j++ {
key, err = rd.ReadString() key, err = rd.ReadString()
if err != nil { if err != nil {
return err return err
@ -1989,18 +2026,37 @@ func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error {
switch key { switch key {
case "name": case "name":
cmd.val[i].Name, err = rd.ReadString() group.Name, err = rd.ReadString()
if err != nil {
return err
}
case "consumers": case "consumers":
cmd.val[i].Consumers, err = rd.ReadInt() group.Consumers, err = rd.ReadInt()
if err != nil {
return err
}
case "pending": case "pending":
cmd.val[i].Pending, err = rd.ReadInt() group.Pending, err = rd.ReadInt()
if err != nil {
return err
}
case "last-delivered-id": case "last-delivered-id":
cmd.val[i].LastDeliveredID, err = rd.ReadString() group.LastDeliveredID, err = rd.ReadString()
if err != nil {
return err
}
case "entries-read":
group.EntriesRead, err = rd.ReadInt()
if err != nil {
return err
}
case "lag":
group.Lag, err = rd.ReadInt()
if err != nil {
return err
}
default: default:
return fmt.Errorf("redis: unexpected content %s in XINFO GROUPS reply", key) return fmt.Errorf("redis: unexpected key %q in XINFO GROUPS reply", key)
}
if err != nil {
return err
} }
} }
} }
@ -2016,13 +2072,16 @@ type XInfoStreamCmd struct {
} }
type XInfoStream struct { type XInfoStream struct {
Length int64 Length int64
RadixTreeKeys int64 RadixTreeKeys int64
RadixTreeNodes int64 RadixTreeNodes int64
Groups int64 Groups int64
LastGeneratedID string LastGeneratedID string
FirstEntry XMessage MaxDeletedEntryID string
LastEntry XMessage EntriesAdded int64
FirstEntry XMessage
LastEntry XMessage
RecordedFirstEntryID string
} }
var _ Cmder = (*XInfoStreamCmd)(nil) var _ Cmder = (*XInfoStreamCmd)(nil)
@ -2053,12 +2112,13 @@ func (cmd *XInfoStreamCmd) String() string {
} }
func (cmd *XInfoStreamCmd) readReply(rd *proto.Reader) error { func (cmd *XInfoStreamCmd) readReply(rd *proto.Reader) error {
if err := rd.ReadFixedMapLen(7); err != nil { n, err := rd.ReadMapLen()
if err != nil {
return err return err
} }
cmd.val = &XInfoStream{} cmd.val = &XInfoStream{}
for i := 0; i < 7; i++ { for i := 0; i < n; i++ {
key, err := rd.ReadString() key, err := rd.ReadString()
if err != nil { if err != nil {
return err return err
@ -2066,30 +2126,56 @@ func (cmd *XInfoStreamCmd) readReply(rd *proto.Reader) error {
switch key { switch key {
case "length": case "length":
cmd.val.Length, err = rd.ReadInt() cmd.val.Length, err = rd.ReadInt()
if err != nil {
return err
}
case "radix-tree-keys": case "radix-tree-keys":
cmd.val.RadixTreeKeys, err = rd.ReadInt() cmd.val.RadixTreeKeys, err = rd.ReadInt()
if err != nil {
return err
}
case "radix-tree-nodes": case "radix-tree-nodes":
cmd.val.RadixTreeNodes, err = rd.ReadInt() cmd.val.RadixTreeNodes, err = rd.ReadInt()
if err != nil {
return err
}
case "groups": case "groups":
cmd.val.Groups, err = rd.ReadInt() cmd.val.Groups, err = rd.ReadInt()
if err != nil {
return err
}
case "last-generated-id": case "last-generated-id":
cmd.val.LastGeneratedID, err = rd.ReadString() cmd.val.LastGeneratedID, err = rd.ReadString()
if err != nil {
return err
}
case "max-deleted-entry-id":
cmd.val.MaxDeletedEntryID, err = rd.ReadString()
if err != nil {
return err
}
case "entries-added":
cmd.val.EntriesAdded, err = rd.ReadInt()
if err != nil {
return err
}
case "first-entry": case "first-entry":
cmd.val.FirstEntry, err = readXMessage(rd) cmd.val.FirstEntry, err = readXMessage(rd)
if err == Nil { if err != nil && err != Nil {
err = nil return err
} }
case "last-entry": case "last-entry":
cmd.val.LastEntry, err = readXMessage(rd) cmd.val.LastEntry, err = readXMessage(rd)
if err == Nil { if err != nil && err != Nil {
err = nil return err
}
case "recorded-first-entry-id":
cmd.val.RecordedFirstEntryID, err = rd.ReadString()
if err != nil {
return err
} }
default: default:
return fmt.Errorf("redis: unexpected content %s "+ return fmt.Errorf("redis: unexpected key %q in XINFO STREAM reply", key)
"in XINFO STREAM reply", key)
}
if err != nil {
return err
} }
} }
return nil return nil
@ -2103,17 +2189,22 @@ type XInfoStreamFullCmd struct {
} }
type XInfoStreamFull struct { type XInfoStreamFull struct {
Length int64 Length int64
RadixTreeKeys int64 RadixTreeKeys int64
RadixTreeNodes int64 RadixTreeNodes int64
LastGeneratedID string LastGeneratedID string
Entries []XMessage MaxDeletedEntryID string
Groups []XInfoStreamGroup EntriesAdded int64
Entries []XMessage
Groups []XInfoStreamGroup
RecordedFirstEntryID string
} }
type XInfoStreamGroup struct { type XInfoStreamGroup struct {
Name string Name string
LastDeliveredID string LastDeliveredID string
EntriesRead int64
Lag int64
PelCount int64 PelCount int64
Pending []XInfoStreamGroupPending Pending []XInfoStreamGroupPending
Consumers []XInfoStreamConsumer Consumers []XInfoStreamConsumer
@ -2167,13 +2258,14 @@ func (cmd *XInfoStreamFullCmd) String() string {
} }
func (cmd *XInfoStreamFullCmd) readReply(rd *proto.Reader) error { func (cmd *XInfoStreamFullCmd) readReply(rd *proto.Reader) error {
if err := rd.ReadFixedMapLen(6); err != nil { n, err := rd.ReadMapLen()
if err != nil {
return err return err
} }
cmd.val = &XInfoStreamFull{} cmd.val = &XInfoStreamFull{}
for i := 0; i < 6; i++ { for i := 0; i < n; i++ {
key, err := rd.ReadString() key, err := rd.ReadString()
if err != nil { if err != nil {
return err return err
@ -2182,22 +2274,51 @@ func (cmd *XInfoStreamFullCmd) readReply(rd *proto.Reader) error {
switch key { switch key {
case "length": case "length":
cmd.val.Length, err = rd.ReadInt() cmd.val.Length, err = rd.ReadInt()
if err != nil {
return err
}
case "radix-tree-keys": case "radix-tree-keys":
cmd.val.RadixTreeKeys, err = rd.ReadInt() cmd.val.RadixTreeKeys, err = rd.ReadInt()
if err != nil {
return err
}
case "radix-tree-nodes": case "radix-tree-nodes":
cmd.val.RadixTreeNodes, err = rd.ReadInt() cmd.val.RadixTreeNodes, err = rd.ReadInt()
if err != nil {
return err
}
case "last-generated-id": case "last-generated-id":
cmd.val.LastGeneratedID, err = rd.ReadString() cmd.val.LastGeneratedID, err = rd.ReadString()
if err != nil {
return err
}
case "entries-added":
cmd.val.EntriesAdded, err = rd.ReadInt()
if err != nil {
return err
}
case "entries": case "entries":
cmd.val.Entries, err = readXMessageSlice(rd) cmd.val.Entries, err = readXMessageSlice(rd)
if err != nil {
return err
}
case "groups": case "groups":
cmd.val.Groups, err = readStreamGroups(rd) cmd.val.Groups, err = readStreamGroups(rd)
if err != nil {
return err
}
case "max-deleted-entry-id":
cmd.val.MaxDeletedEntryID, err = rd.ReadString()
if err != nil {
return err
}
case "recorded-first-entry-id":
cmd.val.RecordedFirstEntryID, err = rd.ReadString()
if err != nil {
return err
}
default: default:
return fmt.Errorf("redis: unexpected content %s "+ return fmt.Errorf("redis: unexpected key %q in XINFO STREAM FULL reply", key)
"in XINFO STREAM FULL reply", key)
}
if err != nil {
return err
} }
} }
return nil return nil
@ -2210,13 +2331,14 @@ func readStreamGroups(rd *proto.Reader) ([]XInfoStreamGroup, error) {
} }
groups := make([]XInfoStreamGroup, 0, n) groups := make([]XInfoStreamGroup, 0, n)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
if err = rd.ReadFixedMapLen(5); err != nil { nn, err := rd.ReadMapLen()
if err != nil {
return nil, err return nil, err
} }
group := XInfoStreamGroup{} group := XInfoStreamGroup{}
for f := 0; f < 5; f++ { for j := 0; j < nn; j++ {
key, err := rd.ReadString() key, err := rd.ReadString()
if err != nil { if err != nil {
return nil, err return nil, err
@ -2225,21 +2347,41 @@ func readStreamGroups(rd *proto.Reader) ([]XInfoStreamGroup, error) {
switch key { switch key {
case "name": case "name":
group.Name, err = rd.ReadString() group.Name, err = rd.ReadString()
if err != nil {
return nil, err
}
case "last-delivered-id": case "last-delivered-id":
group.LastDeliveredID, err = rd.ReadString() group.LastDeliveredID, err = rd.ReadString()
if err != nil {
return nil, err
}
case "entries-read":
group.EntriesRead, err = rd.ReadInt()
if err != nil {
return nil, err
}
case "lag":
group.Lag, err = rd.ReadInt()
if err != nil {
return nil, err
}
case "pel-count": case "pel-count":
group.PelCount, err = rd.ReadInt() group.PelCount, err = rd.ReadInt()
if err != nil {
return nil, err
}
case "pending": case "pending":
group.Pending, err = readXInfoStreamGroupPending(rd) group.Pending, err = readXInfoStreamGroupPending(rd)
if err != nil {
return nil, err
}
case "consumers": case "consumers":
group.Consumers, err = readXInfoStreamConsumers(rd) group.Consumers, err = readXInfoStreamConsumers(rd)
if err != nil {
return nil, err
}
default: default:
return nil, fmt.Errorf("redis: unexpected content %s "+ return nil, fmt.Errorf("redis: unexpected key %q in XINFO STREAM FULL reply", key)
"in XINFO STREAM FULL reply", key)
}
if err != nil {
return nil, err
} }
} }
@ -2682,7 +2824,7 @@ func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error {
if nn >= 4 { if nn >= 4 {
networkingMetadata := make(map[string]string) networkingMetadata := make(map[string]string)
metadataLength, err := rd.ReadArrayLen() metadataLength, err := rd.ReadMapLen()
if err != nil { if err != nil {
return err return err
} }

View File

@ -259,7 +259,7 @@ var _ = Describe("Commands", func() {
It("should Command", func() { It("should Command", func() {
cmds, err := client.Command(ctx).Result() cmds, err := client.Command(ctx).Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(len(cmds)).To(BeNumerically("~", 200, 25)) Expect(len(cmds)).To(BeNumerically("~", 240, 25))
cmd := cmds["mget"] cmd := cmds["mget"]
Expect(cmd.Name).To(Equal("mget")) Expect(cmd.Name).To(Equal("mget"))
@ -272,7 +272,6 @@ var _ = Describe("Commands", func() {
cmd = cmds["ping"] cmd = cmds["ping"]
Expect(cmd.Name).To(Equal("ping")) Expect(cmd.Name).To(Equal("ping"))
Expect(cmd.Arity).To(Equal(int8(-1))) Expect(cmd.Arity).To(Equal(int8(-1)))
Expect(cmd.Flags).To(ContainElement("stale"))
Expect(cmd.Flags).To(ContainElement("fast")) Expect(cmd.Flags).To(ContainElement("fast"))
Expect(cmd.FirstKeyPos).To(Equal(int8(0))) Expect(cmd.FirstKeyPos).To(Equal(int8(0)))
Expect(cmd.LastKeyPos).To(Equal(int8(0))) Expect(cmd.LastKeyPos).To(Equal(int8(0)))
@ -281,7 +280,7 @@ var _ = Describe("Commands", func() {
}) })
Describe("debugging", func() { Describe("debugging", func() {
It("should DebugObject", func() { PIt("should DebugObject", func() {
err := client.DebugObject(ctx, "foo").Err() err := client.DebugObject(ctx, "foo").Err()
Expect(err).To(MatchError("ERR no such key")) Expect(err).To(MatchError("ERR no such key"))
@ -1309,7 +1308,7 @@ var _ = Describe("Commands", func() {
Get: true, Get: true,
} }
val, err := client.SetArgs(ctx, "key", "hello", args).Result() val, err := client.SetArgs(ctx, "key", "hello", args).Result()
Expect(err).To(Equal(proto.RedisError("ERR syntax error"))) Expect(err).To(Equal(redis.Nil))
Expect(val).To(Equal("")) Expect(val).To(Equal(""))
}) })
@ -1347,7 +1346,7 @@ var _ = Describe("Commands", func() {
Get: true, Get: true,
} }
val, err := client.SetArgs(ctx, "key", "hello", args).Result() val, err := client.SetArgs(ctx, "key", "hello", args).Result()
Expect(err).To(Equal(proto.RedisError("ERR syntax error"))) Expect(err).To(Equal(redis.Nil))
Expect(val).To(Equal("")) Expect(val).To(Equal(""))
}) })
@ -4838,13 +4837,22 @@ var _ = Describe("Commands", func() {
res.RadixTreeNodes = 0 res.RadixTreeNodes = 0
Expect(res).To(Equal(&redis.XInfoStream{ Expect(res).To(Equal(&redis.XInfoStream{
Length: 3, Length: 3,
RadixTreeKeys: 0, RadixTreeKeys: 0,
RadixTreeNodes: 0, RadixTreeNodes: 0,
Groups: 2, Groups: 2,
LastGeneratedID: "3-0", LastGeneratedID: "3-0",
FirstEntry: redis.XMessage{ID: "1-0", Values: map[string]interface{}{"uno": "un"}}, MaxDeletedEntryID: "0-0",
LastEntry: redis.XMessage{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}}, EntriesAdded: 3,
FirstEntry: redis.XMessage{
ID: "1-0",
Values: map[string]interface{}{"uno": "un"},
},
LastEntry: redis.XMessage{
ID: "3-0",
Values: map[string]interface{}{"tres": "troix"},
},
RecordedFirstEntryID: "1-0",
})) }))
// stream is empty // stream is empty
@ -4858,13 +4866,16 @@ var _ = Describe("Commands", func() {
res.RadixTreeNodes = 0 res.RadixTreeNodes = 0
Expect(res).To(Equal(&redis.XInfoStream{ Expect(res).To(Equal(&redis.XInfoStream{
Length: 0, Length: 0,
RadixTreeKeys: 0, RadixTreeKeys: 0,
RadixTreeNodes: 0, RadixTreeNodes: 0,
Groups: 2, Groups: 2,
LastGeneratedID: "3-0", LastGeneratedID: "3-0",
FirstEntry: redis.XMessage{}, MaxDeletedEntryID: "3-0",
LastEntry: redis.XMessage{}, EntriesAdded: 3,
FirstEntry: redis.XMessage{},
LastEntry: redis.XMessage{},
RecordedFirstEntryID: "0-0",
})) }))
}) })
@ -4892,115 +4903,14 @@ var _ = Describe("Commands", func() {
} }
} }
} }
Expect(res).To(Equal(&redis.XInfoStreamFull{
Length: 3,
RadixTreeKeys: 0,
RadixTreeNodes: 0,
LastGeneratedID: "3-0",
Entries: []redis.XMessage{
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
},
Groups: []redis.XInfoStreamGroup{
{
Name: "group1",
LastDeliveredID: "3-0",
PelCount: 3,
Pending: []redis.XInfoStreamGroupPending{
{
ID: "1-0",
Consumer: "consumer1",
DeliveryTime: time.Time{},
DeliveryCount: 1,
},
{
ID: "2-0",
Consumer: "consumer1",
DeliveryTime: time.Time{},
DeliveryCount: 1,
},
},
Consumers: []redis.XInfoStreamConsumer{
{
Name: "consumer1",
SeenTime: time.Time{},
PelCount: 2,
Pending: []redis.XInfoStreamConsumerPending{
{
ID: "1-0",
DeliveryTime: time.Time{},
DeliveryCount: 1,
},
{
ID: "2-0",
DeliveryTime: time.Time{},
DeliveryCount: 1,
},
},
},
{
Name: "consumer2",
SeenTime: time.Time{},
PelCount: 1,
Pending: []redis.XInfoStreamConsumerPending{
{
ID: "3-0",
DeliveryTime: time.Time{},
DeliveryCount: 1,
},
},
},
},
},
{
Name: "group2",
LastDeliveredID: "3-0",
PelCount: 2,
Pending: []redis.XInfoStreamGroupPending{
{
ID: "2-0",
Consumer: "consumer1",
DeliveryTime: time.Time{},
DeliveryCount: 1,
},
{
ID: "3-0",
Consumer: "consumer1",
DeliveryTime: time.Time{},
DeliveryCount: 1,
},
},
Consumers: []redis.XInfoStreamConsumer{
{
Name: "consumer1",
SeenTime: time.Time{},
PelCount: 2,
Pending: []redis.XInfoStreamConsumerPending{
{
ID: "2-0",
DeliveryTime: time.Time{},
DeliveryCount: 1,
},
{
ID: "3-0",
DeliveryTime: time.Time{},
DeliveryCount: 1,
},
},
},
},
},
},
}))
}) })
It("should XINFO GROUPS", func() { It("should XINFO GROUPS", func() {
res, err := client.XInfoGroups(ctx, "stream").Result() res, err := client.XInfoGroups(ctx, "stream").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]redis.XInfoGroup{ Expect(res).To(Equal([]redis.XInfoGroup{
{Name: "group1", Consumers: 2, Pending: 3, LastDeliveredID: "3-0"}, {Name: "group1", Consumers: 2, Pending: 3, LastDeliveredID: "3-0", EntriesRead: 3},
{Name: "group2", Consumers: 1, Pending: 2, LastDeliveredID: "3-0"}, {Name: "group2", Consumers: 1, Pending: 2, LastDeliveredID: "3-0", EntriesRead: 3},
})) }))
}) })

View File

@ -85,11 +85,11 @@ func (c *clusterState) IsConsistent(ctx context.Context) bool {
} }
func GetSlavesAddrByName(ctx context.Context, c *SentinelClient, name string) []string { func GetSlavesAddrByName(ctx context.Context, c *SentinelClient, name string) []string {
addrs, err := c.Slaves(ctx, name).Result() addrs, err := c.Replicas(ctx, name).Result()
if err != nil { if err != nil {
internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s", internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s",
name, err) name, err)
return []string{} return []string{}
} }
return parseSlaveAddrs(addrs, false) return parseReplicaAddrs(addrs, false)
} }

View File

@ -388,7 +388,7 @@ func (r *Reader) ReadFixedArrayLen(fixedLen int) error {
return err return err
} }
if n != fixedLen { if n != fixedLen {
return fmt.Errorf("redis: got %d elements of array length, wanted %d", n, fixedLen) return fmt.Errorf("redis: got %d elements in the array, wanted %d", n, fixedLen)
} }
return nil return nil
} }
@ -407,19 +407,19 @@ func (r *Reader) ReadArrayLen() (int, error) {
} }
} }
// ReadFixedMapLen read fixed map length. // ReadFixedMapLen reads fixed map length.
func (r *Reader) ReadFixedMapLen(fixedLen int) error { func (r *Reader) ReadFixedMapLen(fixedLen int) error {
n, err := r.ReadMapLen() n, err := r.ReadMapLen()
if err != nil { if err != nil {
return err return err
} }
if n != fixedLen { if n != fixedLen {
return fmt.Errorf("redis: got %d elements of map length, wanted %d", n, fixedLen) return fmt.Errorf("redis: got %d elements in the map, wanted %d", n, fixedLen)
} }
return nil return nil
} }
// ReadMapLen read the length of the map type. // ReadMapLen reads the length of the map type.
// If responding to the array type (RespArray/RespSet/RespPush), // If responding to the array type (RespArray/RespSet/RespPush),
// it must be a multiple of 2 and return n/2. // it must be a multiple of 2 and return n/2.
// Other types will return an error. // Other types will return an error.

View File

@ -283,8 +283,9 @@ func (p *redisProcess) Close() error {
} }
var ( var (
redisServerBin, _ = filepath.Abs(filepath.Join("testdata", "redis", "src", "redis-server")) redisServerBin, _ = filepath.Abs(filepath.Join("testdata", "redis", "src", "redis-server"))
redisServerConf, _ = filepath.Abs(filepath.Join("testdata", "redis", "redis.conf")) redisServerConf, _ = filepath.Abs(filepath.Join("testdata", "redis", "redis.conf"))
redisSentinelConf, _ = filepath.Abs(filepath.Join("testdata", "redis", "sentinel.conf"))
) )
func redisDir(port string) (string, error) { func redisDir(port string) (string, error) {
@ -306,7 +307,8 @@ func startRedis(port string, args ...string) (*redisProcess, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil {
if err := exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil {
return nil, err return nil, err
} }
@ -333,7 +335,12 @@ func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
return nil, err return nil, err
} }
process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir) sentinelConf := filepath.Join(dir, "sentinel.conf")
if err := os.WriteFile(sentinelConf, nil, 0o644); err != nil {
return nil, err
}
process, err := execCmd(redisServerBin, sentinelConf, "--sentinel", "--port", port, "--dir", dir)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -355,7 +362,7 @@ func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
client.Process(ctx, cmd) client.Process(ctx, cmd)
if err := cmd.Err(); err != nil { if err := cmd.Err(); err != nil {
process.Kill() process.Kill()
return nil, err return nil, fmt.Errorf("%s failed: %w", cmd, err)
} }
} }

View File

@ -228,8 +228,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
var auth bool var auth bool
// The low version of redis-server does not support the hello command. // For redis-server <6.0 that does not support the Hello command,
// For redis-server (<6.0) that does not support the Hello command,
// we continue to provide services with RESP2. // we continue to provide services with RESP2.
if err := conn.Hello(ctx, 3, username, password, "").Err(); err == nil { if err := conn.Hello(ctx, 3, username, password, "").Err(); err == nil {
auth = true auth = true

View File

@ -32,19 +32,19 @@ type FailoverOptions struct {
// authentication. // authentication.
SentinelPassword string SentinelPassword string
// Allows routing read-only commands to the closest master or slave node. // Allows routing read-only commands to the closest master or replica node.
// This option only works with NewFailoverClusterClient. // This option only works with NewFailoverClusterClient.
RouteByLatency bool RouteByLatency bool
// Allows routing read-only commands to the random master or slave node. // Allows routing read-only commands to the random master or replica node.
// This option only works with NewFailoverClusterClient. // This option only works with NewFailoverClusterClient.
RouteRandomly bool RouteRandomly bool
// Route all commands to slave read-only nodes. // Route all commands to replica read-only nodes.
SlaveOnly bool ReplicaOnly bool
// Use slaves disconnected with master when cannot get connected slaves // Use replicas disconnected with master when cannot get connected replicas
// Now, this option only works in RandomSlaveAddr function. // Now, this option only works in RandomReplicaAddr function.
UseDisconnectedSlaves bool UseDisconnectedReplicas bool
// Following options are copied from Options struct. // Following options are copied from Options struct.
@ -194,7 +194,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
} }
opt := failoverOpt.clientOptions() opt := failoverOpt.clientOptions()
opt.Dialer = masterSlaveDialer(failover) opt.Dialer = masterReplicaDialer(failover)
opt.init() opt.init()
connPool := newConnPool(opt) connPool := newConnPool(opt)
@ -217,15 +217,15 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
return &c return &c
} }
func masterSlaveDialer( func masterReplicaDialer(
failover *sentinelFailover, failover *sentinelFailover,
) func(ctx context.Context, network, addr string) (net.Conn, error) { ) func(ctx context.Context, network, addr string) (net.Conn, error) {
return func(ctx context.Context, network, _ string) (net.Conn, error) { return func(ctx context.Context, network, _ string) (net.Conn, error) {
var addr string var addr string
var err error var err error
if failover.opt.SlaveOnly { if failover.opt.ReplicaOnly {
addr, err = failover.RandomSlaveAddr(ctx) addr, err = failover.RandomReplicaAddr(ctx)
} else { } else {
addr, err = failover.MasterAddr(ctx) addr, err = failover.MasterAddr(ctx)
if err == nil { if err == nil {
@ -351,7 +351,7 @@ func (c *SentinelClient) Failover(ctx context.Context, name string) *StatusCmd {
// Reset resets all the masters with matching name. The pattern argument is a // Reset resets all the masters with matching name. The pattern argument is a
// glob-style pattern. The reset process clears any previous state in a master // glob-style pattern. The reset process clears any previous state in a master
// (including a failover in progress), and removes every slave and sentinel // (including a failover in progress), and removes every replica and sentinel
// already discovered and associated with the master. // already discovered and associated with the master.
func (c *SentinelClient) Reset(ctx context.Context, pattern string) *IntCmd { func (c *SentinelClient) Reset(ctx context.Context, pattern string) *IntCmd {
cmd := NewIntCmd(ctx, "sentinel", "reset", pattern) cmd := NewIntCmd(ctx, "sentinel", "reset", pattern)
@ -381,9 +381,9 @@ func (c *SentinelClient) Masters(ctx context.Context) *SliceCmd {
return cmd return cmd
} }
// Slaves shows a list of slaves for the specified master and their state. // Replicas shows a list of replicas for the specified master and their state.
func (c *SentinelClient) Slaves(ctx context.Context, name string) *MapStringStringSliceCmd { func (c *SentinelClient) Replicas(ctx context.Context, name string) *MapStringStringSliceCmd {
cmd := NewMapStringStringSliceCmd(ctx, "sentinel", "slaves", name) cmd := NewMapStringStringSliceCmd(ctx, "sentinel", "replicas", name)
_ = c.Process(ctx, cmd) _ = c.Process(ctx, cmd)
return cmd return cmd
} }
@ -460,18 +460,18 @@ func (c *sentinelFailover) closeSentinel() error {
return firstErr return firstErr
} }
func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) { func (c *sentinelFailover) RandomReplicaAddr(ctx context.Context) (string, error) {
if c.opt == nil { if c.opt == nil {
return "", errors.New("opt is nil") return "", errors.New("opt is nil")
} }
addresses, err := c.slaveAddrs(ctx, false) addresses, err := c.replicaAddrs(ctx, false)
if err != nil { if err != nil {
return "", err return "", err
} }
if len(addresses) == 0 && c.opt.UseDisconnectedSlaves { if len(addresses) == 0 && c.opt.UseDisconnectedReplicas {
addresses, err = c.slaveAddrs(ctx, true) addresses, err = c.replicaAddrs(ctx, true)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -528,13 +528,13 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
return "", errors.New("redis: all sentinels specified in configuration are unreachable") return "", errors.New("redis: all sentinels specified in configuration are unreachable")
} }
func (c *sentinelFailover) slaveAddrs(ctx context.Context, useDisconnected bool) ([]string, error) { func (c *sentinelFailover) replicaAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {
c.mu.RLock() c.mu.RLock()
sentinel := c.sentinel sentinel := c.sentinel
c.mu.RUnlock() c.mu.RUnlock()
if sentinel != nil { if sentinel != nil {
addrs := c.getSlaveAddrs(ctx, sentinel) addrs := c.getReplicaAddrs(ctx, sentinel)
if len(addrs) > 0 { if len(addrs) > 0 {
return addrs, nil return addrs, nil
} }
@ -544,7 +544,7 @@ func (c *sentinelFailover) slaveAddrs(ctx context.Context, useDisconnected bool)
defer c.mu.Unlock() defer c.mu.Unlock()
if c.sentinel != nil { if c.sentinel != nil {
addrs := c.getSlaveAddrs(ctx, c.sentinel) addrs := c.getReplicaAddrs(ctx, c.sentinel)
if len(addrs) > 0 { if len(addrs) > 0 {
return addrs, nil return addrs, nil
} }
@ -556,15 +556,15 @@ func (c *sentinelFailover) slaveAddrs(ctx context.Context, useDisconnected bool)
for i, sentinelAddr := range c.sentinelAddrs { for i, sentinelAddr := range c.sentinelAddrs {
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr)) sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
slaves, err := sentinel.Slaves(ctx, c.opt.MasterName).Result() replicas, err := sentinel.Replicas(ctx, c.opt.MasterName).Result()
if err != nil { if err != nil {
internal.Logger.Printf(ctx, "sentinel: Slaves master=%q failed: %s", internal.Logger.Printf(ctx, "sentinel: Replicas master=%q failed: %s",
c.opt.MasterName, err) c.opt.MasterName, err)
_ = sentinel.Close() _ = sentinel.Close()
continue continue
} }
sentinelReachable = true sentinelReachable = true
addrs := parseSlaveAddrs(slaves, useDisconnected) addrs := parseReplicaAddrs(replicas, useDisconnected)
if len(addrs) == 0 { if len(addrs) == 0 {
continue continue
} }
@ -591,17 +591,17 @@ func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *Sentinel
return net.JoinHostPort(addr[0], addr[1]) return net.JoinHostPort(addr[0], addr[1])
} }
func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *SentinelClient) []string { func (c *sentinelFailover) getReplicaAddrs(ctx context.Context, sentinel *SentinelClient) []string {
addrs, err := sentinel.Slaves(ctx, c.opt.MasterName).Result() addrs, err := sentinel.Replicas(ctx, c.opt.MasterName).Result()
if err != nil { if err != nil {
internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s", internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s",
c.opt.MasterName, err) c.opt.MasterName, err)
return []string{} return []string{}
} }
return parseSlaveAddrs(addrs, false) return parseReplicaAddrs(addrs, false)
} }
func parseSlaveAddrs(addrs []map[string]string, keepDisconnected bool) []string { func parseReplicaAddrs(addrs []map[string]string, keepDisconnected bool) []string {
nodes := make([]string, 0, len(addrs)) nodes := make([]string, 0, len(addrs))
for _, node := range addrs { for _, node := range addrs {
isDown := false isDown := false
@ -656,7 +656,7 @@ func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelCl
c.sentinel = sentinel c.sentinel = sentinel
c.discoverSentinels(ctx) c.discoverSentinels(ctx)
c.pubsub = sentinel.Subscribe(ctx, "+switch-master", "+slave-reconf-done") c.pubsub = sentinel.Subscribe(ctx, "+switch-master", "+replica-reconf-done")
go c.listen(c.pubsub) go c.listen(c.pubsub)
} }
@ -723,7 +723,7 @@ func contains(slice []string, str string) bool {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// NewFailoverClusterClient returns a client that supports routing read-only commands // NewFailoverClusterClient returns a client that supports routing read-only commands
// to a slave node. // to a replica node.
func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient { func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs)) sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
copy(sentinelAddrs, failoverOpt.SentinelAddrs) copy(sentinelAddrs, failoverOpt.SentinelAddrs)
@ -744,14 +744,14 @@ func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
Addr: masterAddr, Addr: masterAddr,
}} }}
slaveAddrs, err := failover.slaveAddrs(ctx, false) replicaAddrs, err := failover.replicaAddrs(ctx, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, slaveAddr := range slaveAddrs { for _, replicaAddr := range replicaAddrs {
nodes = append(nodes, ClusterNode{ nodes = append(nodes, ClusterNode{
Addr: slaveAddr, Addr: replicaAddr,
}) })
} }