diff --git a/main_test.go b/main_test.go index 636518eb..0cb2b1d4 100644 --- a/main_test.go +++ b/main_test.go @@ -344,9 +344,11 @@ func startSentinel(port, masterName, masterPort string) (*redisProcess, error) { return nil, err } + // set down-after-milliseconds=2000 + // link: https://github.com/redis/redis/issues/8607 for _, cmd := range []*redis.StatusCmd{ redis.NewStatusCmd(ctx, "SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "2"), - redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "down-after-milliseconds", "500"), + redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "down-after-milliseconds", "2000"), redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "failover-timeout", "1000"), redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "parallel-syncs", "1"), } { diff --git a/race_test.go b/race_test.go index e850050e..a1c2a080 100644 --- a/race_test.go +++ b/race_test.go @@ -308,7 +308,8 @@ var _ = Describe("races", func() { Streams: []string{"test", "$"}, Block: 1 * time.Second, }).Result() - Expect(err).To(Equal(context.Canceled)) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Or(Equal(context.Canceled.Error()), ContainSubstring("operation was canceled"))) }) time.Sleep(10 * time.Millisecond) diff --git a/redis_test.go b/redis_test.go index 32a3cbea..bbfa4c14 100644 --- a/redis_test.go +++ b/redis_test.go @@ -243,13 +243,17 @@ var _ = Describe("Client", func() { cn, err := client.Pool().Get(context.Background()) Expect(err).NotTo(HaveOccurred()) Expect(cn.UsedAt).NotTo(BeZero()) + + // set cn.SetUsedAt(time) or time.Sleep(>1*time.Second) + // simulate the last time Conn was used + // time.Sleep() is not the standard sleep time + // link: https://go-review.googlesource.com/c/go/+/232298 + cn.SetUsedAt(time.Now().Add(-1 * time.Second)) createdAt := cn.UsedAt() client.Pool().Put(ctx, cn) Expect(cn.UsedAt().Equal(createdAt)).To(BeTrue()) - time.Sleep(time.Second) - err = client.Ping(ctx).Err() Expect(err).NotTo(HaveOccurred()) diff --git a/sentinel.go b/sentinel.go index 3e8afb74..590090f0 100644 --- a/sentinel.go +++ b/sentinel.go @@ -317,6 +317,16 @@ func (c *SentinelClient) GetMasterAddrByName(ctx context.Context, name string) * return cmd } +func (c *SentinelClient) GetSlavesAddrByName(ctx context.Context, name string) []string { + addrs, err := c.Slaves(ctx, name).Result() + if err != nil { + internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s", + name, err) + return []string{} + } + return parseSlaveAddrs(addrs, false) +} + func (c *SentinelClient) Sentinels(ctx context.Context, name string) *SliceCmd { cmd := NewSliceCmd(ctx, "sentinel", "sentinels", name) _ = c.Process(ctx, cmd) diff --git a/sentinel_test.go b/sentinel_test.go index a4d848b4..8949faea 100644 --- a/sentinel_test.go +++ b/sentinel_test.go @@ -13,6 +13,7 @@ var _ = Describe("Sentinel", func() { var client *redis.Client var master *redis.Client var masterPort string + var sentinel *redis.SentinelClient BeforeEach(func() { client = redis.NewFailoverClient(&redis.FailoverOptions{ @@ -22,7 +23,7 @@ var _ = Describe("Sentinel", func() { }) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) - sentinel := redis.NewSentinelClient(&redis.Options{ + sentinel = redis.NewSentinelClient(&redis.Options{ Addr: ":" + sentinelPort1, MaxRetries: -1, }) @@ -51,6 +52,7 @@ var _ = Describe("Sentinel", func() { AfterEach(func() { _ = client.Close() _ = master.Close() + _ = sentinel.Close() }) It("should facilitate failover", func() { @@ -63,8 +65,28 @@ var _ = Describe("Sentinel", func() { Expect(err).NotTo(HaveOccurred()) Expect(val).To(Equal("master")) + // Verify master->slaves sync. + var slavesAddr []string + Eventually(func() []string { + slavesAddr = sentinel.GetSlavesAddrByName(ctx, sentinelName) + return slavesAddr + }, "15s", "100ms").Should(HaveLen(2)) + Eventually(func() bool { + sync := true + for _, addr := range slavesAddr { + slave := redis.NewClient(&redis.Options{ + Addr: addr, + MaxRetries: -1, + }) + sync = slave.Get(ctx, "foo").Val() == "master" + _ = slave.Close() + } + return sync + }, "15s", "100ms").Should(BeTrue()) + // Create subscription. - ch := client.Subscribe(ctx, "foo").Channel() + pub := client.Subscribe(ctx, "foo") + ch := pub.Channel() // Kill master. err = master.Shutdown(ctx).Err() @@ -86,6 +108,7 @@ var _ = Describe("Sentinel", func() { }, "15s", "100ms").Should(Receive(&msg)) Expect(msg.Channel).To(Equal("foo")) Expect(msg.Payload).To(Equal("hello")) + Expect(pub.Close()).NotTo(HaveOccurred()) _, err = startRedis(masterPort) Expect(err).NotTo(HaveOccurred())