This commit is contained in:
kwen 2024-11-22 00:17:39 -05:00 committed by GitHub
commit af51a55c3f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 39 additions and 15 deletions

View File

@ -549,26 +549,50 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
}
}
var masterAddr string
var wg sync.WaitGroup
var once sync.Once
done := make(chan struct{})
for i, sentinelAddr := range c.sentinelAddrs {
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
wg.Add(1)
go func(i int, addr string) {
defer wg.Done()
select {
case <-done:
default:
sentinelCli := NewSentinelClient(c.opt.sentinelOptions(addr))
addrVal, err := sentinelCli.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName addr=%s, master=%q failed: %s",
addr, c.opt.MasterName, err)
_ = sentinelCli.Close()
return
}
internal.Logger.Printf(ctx, "get addr %s master res %v", addr, masterAddr)
once.Do(func() {
// Push working sentinel to the top.
masterAddr = net.JoinHostPort(addrVal[0], addrVal[1])
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
c.setSentinel(ctx, sentinelCli)
close(done)
})
masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
if err != nil {
_ = sentinel.Close()
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return "", err
}
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
c.opt.MasterName, err)
continue
}
// Push working sentinel to the top.
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
c.setSentinel(ctx, sentinel)
}(i, sentinelAddr)
addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
return addr, nil
}
go func() {
wg.Wait()
once.Do(func() {
close(done)
})
}()
<-done
if masterAddr != "" {
return masterAddr, nil
}
return "", errors.New("redis: all sentinels specified in configuration are unreachable")