mirror of https://github.com/go-redis/redis.git
parent
250f81bf06
commit
af3827aeab
|
@ -344,9 +344,11 @@ func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set down-after-milliseconds=2000
|
||||||
|
// link: https://github.com/redis/redis/issues/8607
|
||||||
for _, cmd := range []*redis.StatusCmd{
|
for _, cmd := range []*redis.StatusCmd{
|
||||||
redis.NewStatusCmd(ctx, "SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "2"),
|
redis.NewStatusCmd(ctx, "SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "2"),
|
||||||
redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "down-after-milliseconds", "500"),
|
redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "down-after-milliseconds", "2000"),
|
||||||
redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "failover-timeout", "1000"),
|
redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "failover-timeout", "1000"),
|
||||||
redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "parallel-syncs", "1"),
|
redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "parallel-syncs", "1"),
|
||||||
} {
|
} {
|
||||||
|
|
|
@ -308,7 +308,8 @@ var _ = Describe("races", func() {
|
||||||
Streams: []string{"test", "$"},
|
Streams: []string{"test", "$"},
|
||||||
Block: 1 * time.Second,
|
Block: 1 * time.Second,
|
||||||
}).Result()
|
}).Result()
|
||||||
Expect(err).To(Equal(context.Canceled))
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(Or(Equal(context.Canceled.Error()), ContainSubstring("operation was canceled")))
|
||||||
})
|
})
|
||||||
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
|
@ -243,13 +243,17 @@ var _ = Describe("Client", func() {
|
||||||
cn, err := client.Pool().Get(context.Background())
|
cn, err := client.Pool().Get(context.Background())
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(cn.UsedAt).NotTo(BeZero())
|
Expect(cn.UsedAt).NotTo(BeZero())
|
||||||
|
|
||||||
|
// set cn.SetUsedAt(time) or time.Sleep(>1*time.Second)
|
||||||
|
// simulate the last time Conn was used
|
||||||
|
// time.Sleep() is not the standard sleep time
|
||||||
|
// link: https://go-review.googlesource.com/c/go/+/232298
|
||||||
|
cn.SetUsedAt(time.Now().Add(-1 * time.Second))
|
||||||
createdAt := cn.UsedAt()
|
createdAt := cn.UsedAt()
|
||||||
|
|
||||||
client.Pool().Put(ctx, cn)
|
client.Pool().Put(ctx, cn)
|
||||||
Expect(cn.UsedAt().Equal(createdAt)).To(BeTrue())
|
Expect(cn.UsedAt().Equal(createdAt)).To(BeTrue())
|
||||||
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
|
|
||||||
err = client.Ping(ctx).Err()
|
err = client.Ping(ctx).Err()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
|
10
sentinel.go
10
sentinel.go
|
@ -317,6 +317,16 @@ func (c *SentinelClient) GetMasterAddrByName(ctx context.Context, name string) *
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *SentinelClient) GetSlavesAddrByName(ctx context.Context, name string) []string {
|
||||||
|
addrs, err := c.Slaves(ctx, name).Result()
|
||||||
|
if err != nil {
|
||||||
|
internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s",
|
||||||
|
name, err)
|
||||||
|
return []string{}
|
||||||
|
}
|
||||||
|
return parseSlaveAddrs(addrs, false)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *SentinelClient) Sentinels(ctx context.Context, name string) *SliceCmd {
|
func (c *SentinelClient) Sentinels(ctx context.Context, name string) *SliceCmd {
|
||||||
cmd := NewSliceCmd(ctx, "sentinel", "sentinels", name)
|
cmd := NewSliceCmd(ctx, "sentinel", "sentinels", name)
|
||||||
_ = c.Process(ctx, cmd)
|
_ = c.Process(ctx, cmd)
|
||||||
|
|
|
@ -13,6 +13,7 @@ var _ = Describe("Sentinel", func() {
|
||||||
var client *redis.Client
|
var client *redis.Client
|
||||||
var master *redis.Client
|
var master *redis.Client
|
||||||
var masterPort string
|
var masterPort string
|
||||||
|
var sentinel *redis.SentinelClient
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
client = redis.NewFailoverClient(&redis.FailoverOptions{
|
client = redis.NewFailoverClient(&redis.FailoverOptions{
|
||||||
|
@ -22,7 +23,7 @@ var _ = Describe("Sentinel", func() {
|
||||||
})
|
})
|
||||||
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
|
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
sentinel := redis.NewSentinelClient(&redis.Options{
|
sentinel = redis.NewSentinelClient(&redis.Options{
|
||||||
Addr: ":" + sentinelPort1,
|
Addr: ":" + sentinelPort1,
|
||||||
MaxRetries: -1,
|
MaxRetries: -1,
|
||||||
})
|
})
|
||||||
|
@ -51,6 +52,7 @@ var _ = Describe("Sentinel", func() {
|
||||||
AfterEach(func() {
|
AfterEach(func() {
|
||||||
_ = client.Close()
|
_ = client.Close()
|
||||||
_ = master.Close()
|
_ = master.Close()
|
||||||
|
_ = sentinel.Close()
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should facilitate failover", func() {
|
It("should facilitate failover", func() {
|
||||||
|
@ -63,8 +65,28 @@ var _ = Describe("Sentinel", func() {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(val).To(Equal("master"))
|
Expect(val).To(Equal("master"))
|
||||||
|
|
||||||
|
// Verify master->slaves sync.
|
||||||
|
var slavesAddr []string
|
||||||
|
Eventually(func() []string {
|
||||||
|
slavesAddr = sentinel.GetSlavesAddrByName(ctx, sentinelName)
|
||||||
|
return slavesAddr
|
||||||
|
}, "15s", "100ms").Should(HaveLen(2))
|
||||||
|
Eventually(func() bool {
|
||||||
|
sync := true
|
||||||
|
for _, addr := range slavesAddr {
|
||||||
|
slave := redis.NewClient(&redis.Options{
|
||||||
|
Addr: addr,
|
||||||
|
MaxRetries: -1,
|
||||||
|
})
|
||||||
|
sync = slave.Get(ctx, "foo").Val() == "master"
|
||||||
|
_ = slave.Close()
|
||||||
|
}
|
||||||
|
return sync
|
||||||
|
}, "15s", "100ms").Should(BeTrue())
|
||||||
|
|
||||||
// Create subscription.
|
// Create subscription.
|
||||||
ch := client.Subscribe(ctx, "foo").Channel()
|
pub := client.Subscribe(ctx, "foo")
|
||||||
|
ch := pub.Channel()
|
||||||
|
|
||||||
// Kill master.
|
// Kill master.
|
||||||
err = master.Shutdown(ctx).Err()
|
err = master.Shutdown(ctx).Err()
|
||||||
|
@ -86,6 +108,7 @@ var _ = Describe("Sentinel", func() {
|
||||||
}, "15s", "100ms").Should(Receive(&msg))
|
}, "15s", "100ms").Should(Receive(&msg))
|
||||||
Expect(msg.Channel).To(Equal("foo"))
|
Expect(msg.Channel).To(Equal("foo"))
|
||||||
Expect(msg.Payload).To(Equal("hello"))
|
Expect(msg.Payload).To(Equal("hello"))
|
||||||
|
Expect(pub.Close()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
_, err = startRedis(masterPort)
|
_, err = startRedis(masterPort)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
Loading…
Reference in New Issue