forked from mirror/redis
Merge pull request #2110 from go-redis/feat/redis-7
feat: upgrade to Redis 7
This commit is contained in:
commit
a8e56a2961
2
Makefile
2
Makefile
|
@ -16,7 +16,7 @@ bench: testdeps
|
|||
|
||||
testdata/redis:
|
||||
mkdir -p $@
|
||||
wget -qO- https://download.redis.io/releases/redis-6.2.5.tar.gz | tar xvz --strip-components=1 -C $@
|
||||
wget -qO- https://download.redis.io/releases/redis-7.0.0.tar.gz | tar xvz --strip-components=1 -C $@
|
||||
|
||||
testdata/redis/src/redis-server: testdata/redis
|
||||
cd $< && make all
|
||||
|
|
|
@ -69,6 +69,12 @@ And then install go-redis/v8 (note _v8_ in the import; omitting it is a popular
|
|||
go get github.com/go-redis/redis/v8
|
||||
```
|
||||
|
||||
If you need **Redis 7** support, install go-redis/v9:
|
||||
|
||||
```shell
|
||||
go get github.com/go-redis/redis/v9
|
||||
```
|
||||
|
||||
## Quickstart
|
||||
|
||||
```go
|
||||
|
|
256
command.go
256
command.go
|
@ -1769,19 +1769,35 @@ func (cmd *XAutoClaimCmd) String() string {
|
|||
}
|
||||
|
||||
func (cmd *XAutoClaimCmd) readReply(rd *proto.Reader) error {
|
||||
var err error
|
||||
if err = rd.ReadFixedArrayLen(2); err != nil {
|
||||
n, err := rd.ReadArrayLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch n {
|
||||
case 2, // Redis 6
|
||||
3: // Redis 7:
|
||||
// ok
|
||||
default:
|
||||
return fmt.Errorf("redis: got %d elements in XAutoClaim reply, wanted 2/3", n)
|
||||
}
|
||||
|
||||
cmd.start, err = rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cmd.val, err = readXMessageSlice(rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if n >= 3 {
|
||||
if err := rd.DiscardNext(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1823,27 +1839,43 @@ func (cmd *XAutoClaimJustIDCmd) String() string {
|
|||
}
|
||||
|
||||
func (cmd *XAutoClaimJustIDCmd) readReply(rd *proto.Reader) error {
|
||||
var err error
|
||||
if err = rd.ReadFixedArrayLen(2); err != nil {
|
||||
n, err := rd.ReadArrayLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch n {
|
||||
case 2, // Redis 6
|
||||
3: // Redis 7:
|
||||
// ok
|
||||
default:
|
||||
return fmt.Errorf("redis: got %d elements in XAutoClaimJustID reply, wanted 2/3", n)
|
||||
}
|
||||
|
||||
cmd.start, err = rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n, err := rd.ReadArrayLen()
|
||||
|
||||
nn, err := rd.ReadArrayLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cmd.val = make([]string, n)
|
||||
for i := 0; i < n; i++ {
|
||||
cmd.val = make([]string, nn)
|
||||
for i := 0; i < nn; i++ {
|
||||
cmd.val[i], err = rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if n >= 3 {
|
||||
if err := rd.DiscardNext(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1939,6 +1971,8 @@ type XInfoGroup struct {
|
|||
Consumers int64
|
||||
Pending int64
|
||||
LastDeliveredID string
|
||||
EntriesRead int64
|
||||
Lag int64
|
||||
}
|
||||
|
||||
var _ Cmder = (*XInfoGroupsCmd)(nil)
|
||||
|
@ -1976,12 +2010,15 @@ func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error {
|
|||
cmd.val = make([]XInfoGroup, n)
|
||||
|
||||
for i := 0; i < len(cmd.val); i++ {
|
||||
if err = rd.ReadFixedMapLen(4); err != nil {
|
||||
group := &cmd.val[i]
|
||||
|
||||
nn, err := rd.ReadMapLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var key string
|
||||
for f := 0; f < 4; f++ {
|
||||
for j := 0; j < nn; j++ {
|
||||
key, err = rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1989,18 +2026,37 @@ func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error {
|
|||
|
||||
switch key {
|
||||
case "name":
|
||||
cmd.val[i].Name, err = rd.ReadString()
|
||||
group.Name, err = rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "consumers":
|
||||
cmd.val[i].Consumers, err = rd.ReadInt()
|
||||
group.Consumers, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "pending":
|
||||
cmd.val[i].Pending, err = rd.ReadInt()
|
||||
group.Pending, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "last-delivered-id":
|
||||
cmd.val[i].LastDeliveredID, err = rd.ReadString()
|
||||
group.LastDeliveredID, err = rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "entries-read":
|
||||
group.EntriesRead, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "lag":
|
||||
group.Lag, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("redis: unexpected content %s in XINFO GROUPS reply", key)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("redis: unexpected key %q in XINFO GROUPS reply", key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2016,13 +2072,16 @@ type XInfoStreamCmd struct {
|
|||
}
|
||||
|
||||
type XInfoStream struct {
|
||||
Length int64
|
||||
RadixTreeKeys int64
|
||||
RadixTreeNodes int64
|
||||
Groups int64
|
||||
LastGeneratedID string
|
||||
FirstEntry XMessage
|
||||
LastEntry XMessage
|
||||
Length int64
|
||||
RadixTreeKeys int64
|
||||
RadixTreeNodes int64
|
||||
Groups int64
|
||||
LastGeneratedID string
|
||||
MaxDeletedEntryID string
|
||||
EntriesAdded int64
|
||||
FirstEntry XMessage
|
||||
LastEntry XMessage
|
||||
RecordedFirstEntryID string
|
||||
}
|
||||
|
||||
var _ Cmder = (*XInfoStreamCmd)(nil)
|
||||
|
@ -2053,12 +2112,13 @@ func (cmd *XInfoStreamCmd) String() string {
|
|||
}
|
||||
|
||||
func (cmd *XInfoStreamCmd) readReply(rd *proto.Reader) error {
|
||||
if err := rd.ReadFixedMapLen(7); err != nil {
|
||||
n, err := rd.ReadMapLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmd.val = &XInfoStream{}
|
||||
|
||||
for i := 0; i < 7; i++ {
|
||||
for i := 0; i < n; i++ {
|
||||
key, err := rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -2066,30 +2126,56 @@ func (cmd *XInfoStreamCmd) readReply(rd *proto.Reader) error {
|
|||
switch key {
|
||||
case "length":
|
||||
cmd.val.Length, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "radix-tree-keys":
|
||||
cmd.val.RadixTreeKeys, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "radix-tree-nodes":
|
||||
cmd.val.RadixTreeNodes, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "groups":
|
||||
cmd.val.Groups, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "last-generated-id":
|
||||
cmd.val.LastGeneratedID, err = rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "max-deleted-entry-id":
|
||||
cmd.val.MaxDeletedEntryID, err = rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "entries-added":
|
||||
cmd.val.EntriesAdded, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "first-entry":
|
||||
cmd.val.FirstEntry, err = readXMessage(rd)
|
||||
if err == Nil {
|
||||
err = nil
|
||||
if err != nil && err != Nil {
|
||||
return err
|
||||
}
|
||||
case "last-entry":
|
||||
cmd.val.LastEntry, err = readXMessage(rd)
|
||||
if err == Nil {
|
||||
err = nil
|
||||
if err != nil && err != Nil {
|
||||
return err
|
||||
}
|
||||
case "recorded-first-entry-id":
|
||||
cmd.val.RecordedFirstEntryID, err = rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("redis: unexpected content %s "+
|
||||
"in XINFO STREAM reply", key)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("redis: unexpected key %q in XINFO STREAM reply", key)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -2103,17 +2189,22 @@ type XInfoStreamFullCmd struct {
|
|||
}
|
||||
|
||||
type XInfoStreamFull struct {
|
||||
Length int64
|
||||
RadixTreeKeys int64
|
||||
RadixTreeNodes int64
|
||||
LastGeneratedID string
|
||||
Entries []XMessage
|
||||
Groups []XInfoStreamGroup
|
||||
Length int64
|
||||
RadixTreeKeys int64
|
||||
RadixTreeNodes int64
|
||||
LastGeneratedID string
|
||||
MaxDeletedEntryID string
|
||||
EntriesAdded int64
|
||||
Entries []XMessage
|
||||
Groups []XInfoStreamGroup
|
||||
RecordedFirstEntryID string
|
||||
}
|
||||
|
||||
type XInfoStreamGroup struct {
|
||||
Name string
|
||||
LastDeliveredID string
|
||||
EntriesRead int64
|
||||
Lag int64
|
||||
PelCount int64
|
||||
Pending []XInfoStreamGroupPending
|
||||
Consumers []XInfoStreamConsumer
|
||||
|
@ -2167,13 +2258,14 @@ func (cmd *XInfoStreamFullCmd) String() string {
|
|||
}
|
||||
|
||||
func (cmd *XInfoStreamFullCmd) readReply(rd *proto.Reader) error {
|
||||
if err := rd.ReadFixedMapLen(6); err != nil {
|
||||
n, err := rd.ReadMapLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cmd.val = &XInfoStreamFull{}
|
||||
|
||||
for i := 0; i < 6; i++ {
|
||||
for i := 0; i < n; i++ {
|
||||
key, err := rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -2182,22 +2274,51 @@ func (cmd *XInfoStreamFullCmd) readReply(rd *proto.Reader) error {
|
|||
switch key {
|
||||
case "length":
|
||||
cmd.val.Length, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "radix-tree-keys":
|
||||
cmd.val.RadixTreeKeys, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "radix-tree-nodes":
|
||||
cmd.val.RadixTreeNodes, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "last-generated-id":
|
||||
cmd.val.LastGeneratedID, err = rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "entries-added":
|
||||
cmd.val.EntriesAdded, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "entries":
|
||||
cmd.val.Entries, err = readXMessageSlice(rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "groups":
|
||||
cmd.val.Groups, err = readStreamGroups(rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "max-deleted-entry-id":
|
||||
cmd.val.MaxDeletedEntryID, err = rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "recorded-first-entry-id":
|
||||
cmd.val.RecordedFirstEntryID, err = rd.ReadString()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("redis: unexpected content %s "+
|
||||
"in XINFO STREAM FULL reply", key)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("redis: unexpected key %q in XINFO STREAM FULL reply", key)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -2210,13 +2331,14 @@ func readStreamGroups(rd *proto.Reader) ([]XInfoStreamGroup, error) {
|
|||
}
|
||||
groups := make([]XInfoStreamGroup, 0, n)
|
||||
for i := 0; i < n; i++ {
|
||||
if err = rd.ReadFixedMapLen(5); err != nil {
|
||||
nn, err := rd.ReadMapLen()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
group := XInfoStreamGroup{}
|
||||
|
||||
for f := 0; f < 5; f++ {
|
||||
for j := 0; j < nn; j++ {
|
||||
key, err := rd.ReadString()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -2225,21 +2347,41 @@ func readStreamGroups(rd *proto.Reader) ([]XInfoStreamGroup, error) {
|
|||
switch key {
|
||||
case "name":
|
||||
group.Name, err = rd.ReadString()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case "last-delivered-id":
|
||||
group.LastDeliveredID, err = rd.ReadString()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case "entries-read":
|
||||
group.EntriesRead, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case "lag":
|
||||
group.Lag, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case "pel-count":
|
||||
group.PelCount, err = rd.ReadInt()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case "pending":
|
||||
group.Pending, err = readXInfoStreamGroupPending(rd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case "consumers":
|
||||
group.Consumers, err = readXInfoStreamConsumers(rd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("redis: unexpected content %s "+
|
||||
"in XINFO STREAM FULL reply", key)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("redis: unexpected key %q in XINFO STREAM FULL reply", key)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2682,7 +2824,7 @@ func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error {
|
|||
if nn >= 4 {
|
||||
networkingMetadata := make(map[string]string)
|
||||
|
||||
metadataLength, err := rd.ReadArrayLen()
|
||||
metadataLength, err := rd.ReadMapLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
154
commands_test.go
154
commands_test.go
|
@ -259,7 +259,7 @@ var _ = Describe("Commands", func() {
|
|||
It("should Command", func() {
|
||||
cmds, err := client.Command(ctx).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(cmds)).To(BeNumerically("~", 200, 25))
|
||||
Expect(len(cmds)).To(BeNumerically("~", 240, 25))
|
||||
|
||||
cmd := cmds["mget"]
|
||||
Expect(cmd.Name).To(Equal("mget"))
|
||||
|
@ -272,7 +272,6 @@ var _ = Describe("Commands", func() {
|
|||
cmd = cmds["ping"]
|
||||
Expect(cmd.Name).To(Equal("ping"))
|
||||
Expect(cmd.Arity).To(Equal(int8(-1)))
|
||||
Expect(cmd.Flags).To(ContainElement("stale"))
|
||||
Expect(cmd.Flags).To(ContainElement("fast"))
|
||||
Expect(cmd.FirstKeyPos).To(Equal(int8(0)))
|
||||
Expect(cmd.LastKeyPos).To(Equal(int8(0)))
|
||||
|
@ -281,7 +280,7 @@ var _ = Describe("Commands", func() {
|
|||
})
|
||||
|
||||
Describe("debugging", func() {
|
||||
It("should DebugObject", func() {
|
||||
PIt("should DebugObject", func() {
|
||||
err := client.DebugObject(ctx, "foo").Err()
|
||||
Expect(err).To(MatchError("ERR no such key"))
|
||||
|
||||
|
@ -1309,7 +1308,7 @@ var _ = Describe("Commands", func() {
|
|||
Get: true,
|
||||
}
|
||||
val, err := client.SetArgs(ctx, "key", "hello", args).Result()
|
||||
Expect(err).To(Equal(proto.RedisError("ERR syntax error")))
|
||||
Expect(err).To(Equal(redis.Nil))
|
||||
Expect(val).To(Equal(""))
|
||||
})
|
||||
|
||||
|
@ -1347,7 +1346,7 @@ var _ = Describe("Commands", func() {
|
|||
Get: true,
|
||||
}
|
||||
val, err := client.SetArgs(ctx, "key", "hello", args).Result()
|
||||
Expect(err).To(Equal(proto.RedisError("ERR syntax error")))
|
||||
Expect(err).To(Equal(redis.Nil))
|
||||
Expect(val).To(Equal(""))
|
||||
})
|
||||
|
||||
|
@ -4838,13 +4837,22 @@ var _ = Describe("Commands", func() {
|
|||
res.RadixTreeNodes = 0
|
||||
|
||||
Expect(res).To(Equal(&redis.XInfoStream{
|
||||
Length: 3,
|
||||
RadixTreeKeys: 0,
|
||||
RadixTreeNodes: 0,
|
||||
Groups: 2,
|
||||
LastGeneratedID: "3-0",
|
||||
FirstEntry: redis.XMessage{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||
LastEntry: redis.XMessage{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||
Length: 3,
|
||||
RadixTreeKeys: 0,
|
||||
RadixTreeNodes: 0,
|
||||
Groups: 2,
|
||||
LastGeneratedID: "3-0",
|
||||
MaxDeletedEntryID: "0-0",
|
||||
EntriesAdded: 3,
|
||||
FirstEntry: redis.XMessage{
|
||||
ID: "1-0",
|
||||
Values: map[string]interface{}{"uno": "un"},
|
||||
},
|
||||
LastEntry: redis.XMessage{
|
||||
ID: "3-0",
|
||||
Values: map[string]interface{}{"tres": "troix"},
|
||||
},
|
||||
RecordedFirstEntryID: "1-0",
|
||||
}))
|
||||
|
||||
// stream is empty
|
||||
|
@ -4858,13 +4866,16 @@ var _ = Describe("Commands", func() {
|
|||
res.RadixTreeNodes = 0
|
||||
|
||||
Expect(res).To(Equal(&redis.XInfoStream{
|
||||
Length: 0,
|
||||
RadixTreeKeys: 0,
|
||||
RadixTreeNodes: 0,
|
||||
Groups: 2,
|
||||
LastGeneratedID: "3-0",
|
||||
FirstEntry: redis.XMessage{},
|
||||
LastEntry: redis.XMessage{},
|
||||
Length: 0,
|
||||
RadixTreeKeys: 0,
|
||||
RadixTreeNodes: 0,
|
||||
Groups: 2,
|
||||
LastGeneratedID: "3-0",
|
||||
MaxDeletedEntryID: "3-0",
|
||||
EntriesAdded: 3,
|
||||
FirstEntry: redis.XMessage{},
|
||||
LastEntry: redis.XMessage{},
|
||||
RecordedFirstEntryID: "0-0",
|
||||
}))
|
||||
})
|
||||
|
||||
|
@ -4892,115 +4903,14 @@ var _ = Describe("Commands", func() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
Expect(res).To(Equal(&redis.XInfoStreamFull{
|
||||
Length: 3,
|
||||
RadixTreeKeys: 0,
|
||||
RadixTreeNodes: 0,
|
||||
LastGeneratedID: "3-0",
|
||||
Entries: []redis.XMessage{
|
||||
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||
},
|
||||
Groups: []redis.XInfoStreamGroup{
|
||||
{
|
||||
Name: "group1",
|
||||
LastDeliveredID: "3-0",
|
||||
PelCount: 3,
|
||||
Pending: []redis.XInfoStreamGroupPending{
|
||||
{
|
||||
ID: "1-0",
|
||||
Consumer: "consumer1",
|
||||
DeliveryTime: time.Time{},
|
||||
DeliveryCount: 1,
|
||||
},
|
||||
{
|
||||
ID: "2-0",
|
||||
Consumer: "consumer1",
|
||||
DeliveryTime: time.Time{},
|
||||
DeliveryCount: 1,
|
||||
},
|
||||
},
|
||||
Consumers: []redis.XInfoStreamConsumer{
|
||||
{
|
||||
Name: "consumer1",
|
||||
SeenTime: time.Time{},
|
||||
PelCount: 2,
|
||||
Pending: []redis.XInfoStreamConsumerPending{
|
||||
{
|
||||
ID: "1-0",
|
||||
DeliveryTime: time.Time{},
|
||||
DeliveryCount: 1,
|
||||
},
|
||||
{
|
||||
ID: "2-0",
|
||||
DeliveryTime: time.Time{},
|
||||
DeliveryCount: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "consumer2",
|
||||
SeenTime: time.Time{},
|
||||
PelCount: 1,
|
||||
Pending: []redis.XInfoStreamConsumerPending{
|
||||
{
|
||||
ID: "3-0",
|
||||
DeliveryTime: time.Time{},
|
||||
DeliveryCount: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "group2",
|
||||
LastDeliveredID: "3-0",
|
||||
PelCount: 2,
|
||||
Pending: []redis.XInfoStreamGroupPending{
|
||||
{
|
||||
ID: "2-0",
|
||||
Consumer: "consumer1",
|
||||
DeliveryTime: time.Time{},
|
||||
DeliveryCount: 1,
|
||||
},
|
||||
{
|
||||
ID: "3-0",
|
||||
Consumer: "consumer1",
|
||||
DeliveryTime: time.Time{},
|
||||
DeliveryCount: 1,
|
||||
},
|
||||
},
|
||||
Consumers: []redis.XInfoStreamConsumer{
|
||||
{
|
||||
Name: "consumer1",
|
||||
SeenTime: time.Time{},
|
||||
PelCount: 2,
|
||||
Pending: []redis.XInfoStreamConsumerPending{
|
||||
{
|
||||
ID: "2-0",
|
||||
DeliveryTime: time.Time{},
|
||||
DeliveryCount: 1,
|
||||
},
|
||||
{
|
||||
ID: "3-0",
|
||||
DeliveryTime: time.Time{},
|
||||
DeliveryCount: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}))
|
||||
})
|
||||
|
||||
It("should XINFO GROUPS", func() {
|
||||
res, err := client.XInfoGroups(ctx, "stream").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]redis.XInfoGroup{
|
||||
{Name: "group1", Consumers: 2, Pending: 3, LastDeliveredID: "3-0"},
|
||||
{Name: "group2", Consumers: 1, Pending: 2, LastDeliveredID: "3-0"},
|
||||
{Name: "group1", Consumers: 2, Pending: 3, LastDeliveredID: "3-0", EntriesRead: 3},
|
||||
{Name: "group2", Consumers: 1, Pending: 2, LastDeliveredID: "3-0", EntriesRead: 3},
|
||||
}))
|
||||
})
|
||||
|
||||
|
|
|
@ -85,11 +85,11 @@ func (c *clusterState) IsConsistent(ctx context.Context) bool {
|
|||
}
|
||||
|
||||
func GetSlavesAddrByName(ctx context.Context, c *SentinelClient, name string) []string {
|
||||
addrs, err := c.Slaves(ctx, name).Result()
|
||||
addrs, err := c.Replicas(ctx, name).Result()
|
||||
if err != nil {
|
||||
internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s",
|
||||
internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s",
|
||||
name, err)
|
||||
return []string{}
|
||||
}
|
||||
return parseSlaveAddrs(addrs, false)
|
||||
return parseReplicaAddrs(addrs, false)
|
||||
}
|
||||
|
|
|
@ -388,7 +388,7 @@ func (r *Reader) ReadFixedArrayLen(fixedLen int) error {
|
|||
return err
|
||||
}
|
||||
if n != fixedLen {
|
||||
return fmt.Errorf("redis: got %d elements of array length, wanted %d", n, fixedLen)
|
||||
return fmt.Errorf("redis: got %d elements in the array, wanted %d", n, fixedLen)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -407,19 +407,19 @@ func (r *Reader) ReadArrayLen() (int, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// ReadFixedMapLen read fixed map length.
|
||||
// ReadFixedMapLen reads fixed map length.
|
||||
func (r *Reader) ReadFixedMapLen(fixedLen int) error {
|
||||
n, err := r.ReadMapLen()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n != fixedLen {
|
||||
return fmt.Errorf("redis: got %d elements of map length, wanted %d", n, fixedLen)
|
||||
return fmt.Errorf("redis: got %d elements in the map, wanted %d", n, fixedLen)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReadMapLen read the length of the map type.
|
||||
// ReadMapLen reads the length of the map type.
|
||||
// If responding to the array type (RespArray/RespSet/RespPush),
|
||||
// it must be a multiple of 2 and return n/2.
|
||||
// Other types will return an error.
|
||||
|
|
17
main_test.go
17
main_test.go
|
@ -283,8 +283,9 @@ func (p *redisProcess) Close() error {
|
|||
}
|
||||
|
||||
var (
|
||||
redisServerBin, _ = filepath.Abs(filepath.Join("testdata", "redis", "src", "redis-server"))
|
||||
redisServerConf, _ = filepath.Abs(filepath.Join("testdata", "redis", "redis.conf"))
|
||||
redisServerBin, _ = filepath.Abs(filepath.Join("testdata", "redis", "src", "redis-server"))
|
||||
redisServerConf, _ = filepath.Abs(filepath.Join("testdata", "redis", "redis.conf"))
|
||||
redisSentinelConf, _ = filepath.Abs(filepath.Join("testdata", "redis", "sentinel.conf"))
|
||||
)
|
||||
|
||||
func redisDir(port string) (string, error) {
|
||||
|
@ -306,7 +307,8 @@ func startRedis(port string, args ...string) (*redisProcess, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil {
|
||||
|
||||
if err := exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -333,7 +335,12 @@ func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir)
|
||||
sentinelConf := filepath.Join(dir, "sentinel.conf")
|
||||
if err := os.WriteFile(sentinelConf, nil, 0o644); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
process, err := execCmd(redisServerBin, sentinelConf, "--sentinel", "--port", port, "--dir", dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -355,7 +362,7 @@ func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
|
|||
client.Process(ctx, cmd)
|
||||
if err := cmd.Err(); err != nil {
|
||||
process.Kill()
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("%s failed: %w", cmd, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
3
redis.go
3
redis.go
|
@ -228,8 +228,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
|
|||
|
||||
var auth bool
|
||||
|
||||
// The low version of redis-server does not support the hello command.
|
||||
// For redis-server (<6.0) that does not support the Hello command,
|
||||
// For redis-server <6.0 that does not support the Hello command,
|
||||
// we continue to provide services with RESP2.
|
||||
if err := conn.Hello(ctx, 3, username, password, "").Err(); err == nil {
|
||||
auth = true
|
||||
|
|
70
sentinel.go
70
sentinel.go
|
@ -32,19 +32,19 @@ type FailoverOptions struct {
|
|||
// authentication.
|
||||
SentinelPassword string
|
||||
|
||||
// Allows routing read-only commands to the closest master or slave node.
|
||||
// Allows routing read-only commands to the closest master or replica node.
|
||||
// This option only works with NewFailoverClusterClient.
|
||||
RouteByLatency bool
|
||||
// Allows routing read-only commands to the random master or slave node.
|
||||
// Allows routing read-only commands to the random master or replica node.
|
||||
// This option only works with NewFailoverClusterClient.
|
||||
RouteRandomly bool
|
||||
|
||||
// Route all commands to slave read-only nodes.
|
||||
SlaveOnly bool
|
||||
// Route all commands to replica read-only nodes.
|
||||
ReplicaOnly bool
|
||||
|
||||
// Use slaves disconnected with master when cannot get connected slaves
|
||||
// Now, this option only works in RandomSlaveAddr function.
|
||||
UseDisconnectedSlaves bool
|
||||
// Use replicas disconnected with master when cannot get connected replicas
|
||||
// Now, this option only works in RandomReplicaAddr function.
|
||||
UseDisconnectedReplicas bool
|
||||
|
||||
// Following options are copied from Options struct.
|
||||
|
||||
|
@ -194,7 +194,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
|
|||
}
|
||||
|
||||
opt := failoverOpt.clientOptions()
|
||||
opt.Dialer = masterSlaveDialer(failover)
|
||||
opt.Dialer = masterReplicaDialer(failover)
|
||||
opt.init()
|
||||
|
||||
connPool := newConnPool(opt)
|
||||
|
@ -217,15 +217,15 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
|
|||
return &c
|
||||
}
|
||||
|
||||
func masterSlaveDialer(
|
||||
func masterReplicaDialer(
|
||||
failover *sentinelFailover,
|
||||
) func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
return func(ctx context.Context, network, _ string) (net.Conn, error) {
|
||||
var addr string
|
||||
var err error
|
||||
|
||||
if failover.opt.SlaveOnly {
|
||||
addr, err = failover.RandomSlaveAddr(ctx)
|
||||
if failover.opt.ReplicaOnly {
|
||||
addr, err = failover.RandomReplicaAddr(ctx)
|
||||
} else {
|
||||
addr, err = failover.MasterAddr(ctx)
|
||||
if err == nil {
|
||||
|
@ -351,7 +351,7 @@ func (c *SentinelClient) Failover(ctx context.Context, name string) *StatusCmd {
|
|||
|
||||
// Reset resets all the masters with matching name. The pattern argument is a
|
||||
// glob-style pattern. The reset process clears any previous state in a master
|
||||
// (including a failover in progress), and removes every slave and sentinel
|
||||
// (including a failover in progress), and removes every replica and sentinel
|
||||
// already discovered and associated with the master.
|
||||
func (c *SentinelClient) Reset(ctx context.Context, pattern string) *IntCmd {
|
||||
cmd := NewIntCmd(ctx, "sentinel", "reset", pattern)
|
||||
|
@ -381,9 +381,9 @@ func (c *SentinelClient) Masters(ctx context.Context) *SliceCmd {
|
|||
return cmd
|
||||
}
|
||||
|
||||
// Slaves shows a list of slaves for the specified master and their state.
|
||||
func (c *SentinelClient) Slaves(ctx context.Context, name string) *MapStringStringSliceCmd {
|
||||
cmd := NewMapStringStringSliceCmd(ctx, "sentinel", "slaves", name)
|
||||
// Replicas shows a list of replicas for the specified master and their state.
|
||||
func (c *SentinelClient) Replicas(ctx context.Context, name string) *MapStringStringSliceCmd {
|
||||
cmd := NewMapStringStringSliceCmd(ctx, "sentinel", "replicas", name)
|
||||
_ = c.Process(ctx, cmd)
|
||||
return cmd
|
||||
}
|
||||
|
@ -460,18 +460,18 @@ func (c *sentinelFailover) closeSentinel() error {
|
|||
return firstErr
|
||||
}
|
||||
|
||||
func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) {
|
||||
func (c *sentinelFailover) RandomReplicaAddr(ctx context.Context) (string, error) {
|
||||
if c.opt == nil {
|
||||
return "", errors.New("opt is nil")
|
||||
}
|
||||
|
||||
addresses, err := c.slaveAddrs(ctx, false)
|
||||
addresses, err := c.replicaAddrs(ctx, false)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if len(addresses) == 0 && c.opt.UseDisconnectedSlaves {
|
||||
addresses, err = c.slaveAddrs(ctx, true)
|
||||
if len(addresses) == 0 && c.opt.UseDisconnectedReplicas {
|
||||
addresses, err = c.replicaAddrs(ctx, true)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -528,13 +528,13 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
|
|||
return "", errors.New("redis: all sentinels specified in configuration are unreachable")
|
||||
}
|
||||
|
||||
func (c *sentinelFailover) slaveAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {
|
||||
func (c *sentinelFailover) replicaAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {
|
||||
c.mu.RLock()
|
||||
sentinel := c.sentinel
|
||||
c.mu.RUnlock()
|
||||
|
||||
if sentinel != nil {
|
||||
addrs := c.getSlaveAddrs(ctx, sentinel)
|
||||
addrs := c.getReplicaAddrs(ctx, sentinel)
|
||||
if len(addrs) > 0 {
|
||||
return addrs, nil
|
||||
}
|
||||
|
@ -544,7 +544,7 @@ func (c *sentinelFailover) slaveAddrs(ctx context.Context, useDisconnected bool)
|
|||
defer c.mu.Unlock()
|
||||
|
||||
if c.sentinel != nil {
|
||||
addrs := c.getSlaveAddrs(ctx, c.sentinel)
|
||||
addrs := c.getReplicaAddrs(ctx, c.sentinel)
|
||||
if len(addrs) > 0 {
|
||||
return addrs, nil
|
||||
}
|
||||
|
@ -556,15 +556,15 @@ func (c *sentinelFailover) slaveAddrs(ctx context.Context, useDisconnected bool)
|
|||
for i, sentinelAddr := range c.sentinelAddrs {
|
||||
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
|
||||
|
||||
slaves, err := sentinel.Slaves(ctx, c.opt.MasterName).Result()
|
||||
replicas, err := sentinel.Replicas(ctx, c.opt.MasterName).Result()
|
||||
if err != nil {
|
||||
internal.Logger.Printf(ctx, "sentinel: Slaves master=%q failed: %s",
|
||||
internal.Logger.Printf(ctx, "sentinel: Replicas master=%q failed: %s",
|
||||
c.opt.MasterName, err)
|
||||
_ = sentinel.Close()
|
||||
continue
|
||||
}
|
||||
sentinelReachable = true
|
||||
addrs := parseSlaveAddrs(slaves, useDisconnected)
|
||||
addrs := parseReplicaAddrs(replicas, useDisconnected)
|
||||
if len(addrs) == 0 {
|
||||
continue
|
||||
}
|
||||
|
@ -591,17 +591,17 @@ func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *Sentinel
|
|||
return net.JoinHostPort(addr[0], addr[1])
|
||||
}
|
||||
|
||||
func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *SentinelClient) []string {
|
||||
addrs, err := sentinel.Slaves(ctx, c.opt.MasterName).Result()
|
||||
func (c *sentinelFailover) getReplicaAddrs(ctx context.Context, sentinel *SentinelClient) []string {
|
||||
addrs, err := sentinel.Replicas(ctx, c.opt.MasterName).Result()
|
||||
if err != nil {
|
||||
internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s",
|
||||
internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s",
|
||||
c.opt.MasterName, err)
|
||||
return []string{}
|
||||
}
|
||||
return parseSlaveAddrs(addrs, false)
|
||||
return parseReplicaAddrs(addrs, false)
|
||||
}
|
||||
|
||||
func parseSlaveAddrs(addrs []map[string]string, keepDisconnected bool) []string {
|
||||
func parseReplicaAddrs(addrs []map[string]string, keepDisconnected bool) []string {
|
||||
nodes := make([]string, 0, len(addrs))
|
||||
for _, node := range addrs {
|
||||
isDown := false
|
||||
|
@ -656,7 +656,7 @@ func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelCl
|
|||
c.sentinel = sentinel
|
||||
c.discoverSentinels(ctx)
|
||||
|
||||
c.pubsub = sentinel.Subscribe(ctx, "+switch-master", "+slave-reconf-done")
|
||||
c.pubsub = sentinel.Subscribe(ctx, "+switch-master", "+replica-reconf-done")
|
||||
go c.listen(c.pubsub)
|
||||
}
|
||||
|
||||
|
@ -723,7 +723,7 @@ func contains(slice []string, str string) bool {
|
|||
//------------------------------------------------------------------------------
|
||||
|
||||
// NewFailoverClusterClient returns a client that supports routing read-only commands
|
||||
// to a slave node.
|
||||
// to a replica node.
|
||||
func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
|
||||
sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
|
||||
copy(sentinelAddrs, failoverOpt.SentinelAddrs)
|
||||
|
@ -744,14 +744,14 @@ func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
|
|||
Addr: masterAddr,
|
||||
}}
|
||||
|
||||
slaveAddrs, err := failover.slaveAddrs(ctx, false)
|
||||
replicaAddrs, err := failover.replicaAddrs(ctx, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, slaveAddr := range slaveAddrs {
|
||||
for _, replicaAddr := range replicaAddrs {
|
||||
nodes = append(nodes, ClusterNode{
|
||||
Addr: slaveAddr,
|
||||
Addr: replicaAddr,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue