From 4afa84c2c4692cec3b787e39fc629fe02b830846 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Tue, 8 Oct 2019 12:43:00 +0300 Subject: [PATCH] Fix sentinel leak --- sentinel.go | 75 +++++++++++++++++++++++++++++------------------------ 1 file changed, 41 insertions(+), 34 deletions(-) diff --git a/sentinel.go b/sentinel.go index b81e6b7..0983e2c 100644 --- a/sentinel.go +++ b/sentinel.go @@ -301,6 +301,19 @@ func (c *sentinelFailover) Close() error { return nil } +func (c *sentinelFailover) closeSentinel() error { + firstErr := c.pubsub.Close() + c.pubsub = nil + + err := c.sentinel.Close() + if err != nil && firstErr == nil { + firstErr = err + } + c.sentinel = nil + + return firstErr +} + func (c *sentinelFailover) Pool() *pool.ConnPool { c.poolOnce.Do(func() { opt := *c.opt @@ -331,14 +344,28 @@ func (c *sentinelFailover) MasterAddr() (string, error) { } func (c *sentinelFailover) masterAddr() (string, error) { - addr := c.getMasterAddr() - if addr != "" { - return addr, nil + c.mu.RLock() + sentinel := c.sentinel + c.mu.RUnlock() + + if sentinel != nil { + addr := c.getMasterAddr(sentinel) + if addr != "" { + return addr, nil + } } c.mu.Lock() defer c.mu.Unlock() + if c.sentinel != nil { + addr := c.getMasterAddr(c.sentinel) + if addr != "" { + return addr, nil + } + _ = c.closeSentinel() + } + for i, sentinelAddr := range c.sentinelAddrs { sentinel := NewSentinelClient(&Options{ Addr: sentinelAddr, @@ -378,27 +405,13 @@ func (c *sentinelFailover) masterAddr() (string, error) { return "", errors.New("redis: all sentinels are unreachable") } -func (c *sentinelFailover) getMasterAddr() string { - c.mu.RLock() - sentinel := c.sentinel - c.mu.RUnlock() - - if sentinel == nil { - return "" - } - +func (c *sentinelFailover) getMasterAddr(sentinel *SentinelClient) string { addr, err := sentinel.GetMasterAddrByName(c.masterName).Result() if err != nil { internal.Logger.Printf("sentinel: GetMasterAddrByName name=%q failed: %s", c.masterName, err) - c.mu.Lock() - if c.sentinel == sentinel { - _ = c.closeSentinel() - } - c.mu.Unlock() return "" } - return net.JoinHostPort(addr[0], addr[1]) } @@ -413,6 +426,10 @@ func (c *sentinelFailover) switchMaster(addr string) { c.mu.Lock() defer c.mu.Unlock() + if c._masterAddr == addr { + return + } + internal.Logger.Printf("sentinel: new master=%q addr=%q", c.masterName, addr) _ = c.Pool().Filter(func(cn *pool.Conn) bool { @@ -422,28 +439,18 @@ func (c *sentinelFailover) switchMaster(addr string) { } func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) { - c.discoverSentinels(sentinel) + if c.sentinel != nil { + panic("not reached") + } c.sentinel = sentinel + c.discoverSentinels() c.pubsub = sentinel.Subscribe("+switch-master") go c.listen(c.pubsub) } -func (c *sentinelFailover) closeSentinel() error { - firstErr := c.pubsub.Close() - c.pubsub = nil - - err := c.sentinel.Close() - if err != nil && firstErr == nil { - firstErr = err - } - c.sentinel = nil - - return firstErr -} - -func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { - sentinels, err := sentinel.Sentinels(c.masterName).Result() +func (c *sentinelFailover) discoverSentinels() { + sentinels, err := c.sentinel.Sentinels(c.masterName).Result() if err != nil { internal.Logger.Printf("sentinel: Sentinels master=%q failed: %s", c.masterName, err) return