forked from mirror/redis
commit
d47e48f8ab
|
@ -344,9 +344,11 @@ func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// set down-after-milliseconds=2000
|
||||
// link: https://github.com/redis/redis/issues/8607
|
||||
for _, cmd := range []*redis.StatusCmd{
|
||||
redis.NewStatusCmd(ctx, "SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "2"),
|
||||
redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "down-after-milliseconds", "500"),
|
||||
redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "down-after-milliseconds", "2000"),
|
||||
redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "failover-timeout", "1000"),
|
||||
redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "parallel-syncs", "1"),
|
||||
} {
|
||||
|
|
|
@ -308,7 +308,8 @@ var _ = Describe("races", func() {
|
|||
Streams: []string{"test", "$"},
|
||||
Block: 1 * time.Second,
|
||||
}).Result()
|
||||
Expect(err).To(Equal(context.Canceled))
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(Or(Equal(context.Canceled.Error()), ContainSubstring("operation was canceled")))
|
||||
})
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
|
|
@ -243,13 +243,17 @@ var _ = Describe("Client", func() {
|
|||
cn, err := client.Pool().Get(context.Background())
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(cn.UsedAt).NotTo(BeZero())
|
||||
|
||||
// set cn.SetUsedAt(time) or time.Sleep(>1*time.Second)
|
||||
// simulate the last time Conn was used
|
||||
// time.Sleep() is not the standard sleep time
|
||||
// link: https://go-review.googlesource.com/c/go/+/232298
|
||||
cn.SetUsedAt(time.Now().Add(-1 * time.Second))
|
||||
createdAt := cn.UsedAt()
|
||||
|
||||
client.Pool().Put(ctx, cn)
|
||||
Expect(cn.UsedAt().Equal(createdAt)).To(BeTrue())
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
err = client.Ping(ctx).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
|
|
10
sentinel.go
10
sentinel.go
|
@ -317,6 +317,16 @@ func (c *SentinelClient) GetMasterAddrByName(ctx context.Context, name string) *
|
|||
return cmd
|
||||
}
|
||||
|
||||
func (c *SentinelClient) GetSlavesAddrByName(ctx context.Context, name string) []string {
|
||||
addrs, err := c.Slaves(ctx, name).Result()
|
||||
if err != nil {
|
||||
internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s",
|
||||
name, err)
|
||||
return []string{}
|
||||
}
|
||||
return parseSlaveAddrs(addrs, false)
|
||||
}
|
||||
|
||||
func (c *SentinelClient) Sentinels(ctx context.Context, name string) *SliceCmd {
|
||||
cmd := NewSliceCmd(ctx, "sentinel", "sentinels", name)
|
||||
_ = c.Process(ctx, cmd)
|
||||
|
|
|
@ -13,6 +13,7 @@ var _ = Describe("Sentinel", func() {
|
|||
var client *redis.Client
|
||||
var master *redis.Client
|
||||
var masterPort string
|
||||
var sentinel *redis.SentinelClient
|
||||
|
||||
BeforeEach(func() {
|
||||
client = redis.NewFailoverClient(&redis.FailoverOptions{
|
||||
|
@ -22,7 +23,7 @@ var _ = Describe("Sentinel", func() {
|
|||
})
|
||||
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
|
||||
|
||||
sentinel := redis.NewSentinelClient(&redis.Options{
|
||||
sentinel = redis.NewSentinelClient(&redis.Options{
|
||||
Addr: ":" + sentinelPort1,
|
||||
MaxRetries: -1,
|
||||
})
|
||||
|
@ -51,6 +52,7 @@ var _ = Describe("Sentinel", func() {
|
|||
AfterEach(func() {
|
||||
_ = client.Close()
|
||||
_ = master.Close()
|
||||
_ = sentinel.Close()
|
||||
})
|
||||
|
||||
It("should facilitate failover", func() {
|
||||
|
@ -63,8 +65,28 @@ var _ = Describe("Sentinel", func() {
|
|||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(val).To(Equal("master"))
|
||||
|
||||
// Verify master->slaves sync.
|
||||
var slavesAddr []string
|
||||
Eventually(func() []string {
|
||||
slavesAddr = sentinel.GetSlavesAddrByName(ctx, sentinelName)
|
||||
return slavesAddr
|
||||
}, "15s", "100ms").Should(HaveLen(2))
|
||||
Eventually(func() bool {
|
||||
sync := true
|
||||
for _, addr := range slavesAddr {
|
||||
slave := redis.NewClient(&redis.Options{
|
||||
Addr: addr,
|
||||
MaxRetries: -1,
|
||||
})
|
||||
sync = slave.Get(ctx, "foo").Val() == "master"
|
||||
_ = slave.Close()
|
||||
}
|
||||
return sync
|
||||
}, "15s", "100ms").Should(BeTrue())
|
||||
|
||||
// Create subscription.
|
||||
ch := client.Subscribe(ctx, "foo").Channel()
|
||||
pub := client.Subscribe(ctx, "foo")
|
||||
ch := pub.Channel()
|
||||
|
||||
// Kill master.
|
||||
err = master.Shutdown(ctx).Err()
|
||||
|
@ -86,6 +108,7 @@ var _ = Describe("Sentinel", func() {
|
|||
}, "15s", "100ms").Should(Receive(&msg))
|
||||
Expect(msg.Channel).To(Equal("foo"))
|
||||
Expect(msg.Payload).To(Equal("hello"))
|
||||
Expect(pub.Close()).NotTo(HaveOccurred())
|
||||
|
||||
_, err = startRedis(masterPort)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
|
Loading…
Reference in New Issue