Fix sentinel leak

This commit is contained in:
Vladimir Mihailenco 2019-10-08 12:43:00 +03:00
parent 2cd577a047
commit 4afa84c2c4
1 changed files with 41 additions and 34 deletions

View File

@ -301,6 +301,19 @@ func (c *sentinelFailover) Close() error {
return nil return nil
} }
func (c *sentinelFailover) closeSentinel() error {
firstErr := c.pubsub.Close()
c.pubsub = nil
err := c.sentinel.Close()
if err != nil && firstErr == nil {
firstErr = err
}
c.sentinel = nil
return firstErr
}
func (c *sentinelFailover) Pool() *pool.ConnPool { func (c *sentinelFailover) Pool() *pool.ConnPool {
c.poolOnce.Do(func() { c.poolOnce.Do(func() {
opt := *c.opt opt := *c.opt
@ -331,14 +344,28 @@ func (c *sentinelFailover) MasterAddr() (string, error) {
} }
func (c *sentinelFailover) masterAddr() (string, error) { func (c *sentinelFailover) masterAddr() (string, error) {
addr := c.getMasterAddr() c.mu.RLock()
if addr != "" { sentinel := c.sentinel
return addr, nil c.mu.RUnlock()
if sentinel != nil {
addr := c.getMasterAddr(sentinel)
if addr != "" {
return addr, nil
}
} }
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
if c.sentinel != nil {
addr := c.getMasterAddr(c.sentinel)
if addr != "" {
return addr, nil
}
_ = c.closeSentinel()
}
for i, sentinelAddr := range c.sentinelAddrs { for i, sentinelAddr := range c.sentinelAddrs {
sentinel := NewSentinelClient(&Options{ sentinel := NewSentinelClient(&Options{
Addr: sentinelAddr, Addr: sentinelAddr,
@ -378,27 +405,13 @@ func (c *sentinelFailover) masterAddr() (string, error) {
return "", errors.New("redis: all sentinels are unreachable") return "", errors.New("redis: all sentinels are unreachable")
} }
func (c *sentinelFailover) getMasterAddr() string { func (c *sentinelFailover) getMasterAddr(sentinel *SentinelClient) string {
c.mu.RLock()
sentinel := c.sentinel
c.mu.RUnlock()
if sentinel == nil {
return ""
}
addr, err := sentinel.GetMasterAddrByName(c.masterName).Result() addr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
if err != nil { if err != nil {
internal.Logger.Printf("sentinel: GetMasterAddrByName name=%q failed: %s", internal.Logger.Printf("sentinel: GetMasterAddrByName name=%q failed: %s",
c.masterName, err) c.masterName, err)
c.mu.Lock()
if c.sentinel == sentinel {
_ = c.closeSentinel()
}
c.mu.Unlock()
return "" return ""
} }
return net.JoinHostPort(addr[0], addr[1]) return net.JoinHostPort(addr[0], addr[1])
} }
@ -413,6 +426,10 @@ func (c *sentinelFailover) switchMaster(addr string) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
if c._masterAddr == addr {
return
}
internal.Logger.Printf("sentinel: new master=%q addr=%q", internal.Logger.Printf("sentinel: new master=%q addr=%q",
c.masterName, addr) c.masterName, addr)
_ = c.Pool().Filter(func(cn *pool.Conn) bool { _ = c.Pool().Filter(func(cn *pool.Conn) bool {
@ -422,28 +439,18 @@ func (c *sentinelFailover) switchMaster(addr string) {
} }
func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) { func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) {
c.discoverSentinels(sentinel) if c.sentinel != nil {
panic("not reached")
}
c.sentinel = sentinel c.sentinel = sentinel
c.discoverSentinels()
c.pubsub = sentinel.Subscribe("+switch-master") c.pubsub = sentinel.Subscribe("+switch-master")
go c.listen(c.pubsub) go c.listen(c.pubsub)
} }
func (c *sentinelFailover) closeSentinel() error { func (c *sentinelFailover) discoverSentinels() {
firstErr := c.pubsub.Close() sentinels, err := c.sentinel.Sentinels(c.masterName).Result()
c.pubsub = nil
err := c.sentinel.Close()
if err != nil && firstErr == nil {
firstErr = err
}
c.sentinel = nil
return firstErr
}
func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
sentinels, err := sentinel.Sentinels(c.masterName).Result()
if err != nil { if err != nil {
internal.Logger.Printf("sentinel: Sentinels master=%q failed: %s", c.masterName, err) internal.Logger.Printf("sentinel: Sentinels master=%q failed: %s", c.masterName, err)
return return