From d00fb6ead9013c740bfa793d61d424f3ff3c6f4f Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Fri, 1 May 2015 09:33:47 +0300 Subject: [PATCH 1/3] Implement Close and fix reaper goroutine leak. --- cluster.go | 45 +++++++++++++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/cluster.go b/cluster.go index b8adedc7..2c6a45ce 100644 --- a/cluster.go +++ b/cluster.go @@ -13,10 +13,11 @@ type ClusterClient struct { addrs []string slots [][]string - slotsMx sync.RWMutex // protects slots & addrs cache + slotsMx sync.RWMutex // Protects slots and addrs. clients map[string]*Client - clientsMx sync.RWMutex + closed bool + clientsMx sync.RWMutex // Protects clients and closed. opt *ClusterOptions @@ -39,16 +40,24 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { return client } -// Close closes the cluster client. +// Close closes the cluster client, releasing any open resources. +// +// It is rare to Close a Client, as the Client is meant to be +// long-lived and shared between many goroutines. func (c *ClusterClient) Close() error { - // TODO: close should make client unusable - c.setSlots(nil) + c.clientsMx.Lock() + + if c.closed { + return nil + } + c.closed = true c.resetClients() + c.setSlots(nil) + + c.clientsMx.Unlock() return nil } -// ------------------------------------------------------------------------ - // getClient returns a Client for a given address. func (c *ClusterClient) getClient(addr string) (*Client, error) { if addr == "" { @@ -64,6 +73,10 @@ func (c *ClusterClient) getClient(addr string) (*Client, error) { c.clientsMx.RUnlock() c.clientsMx.Lock() + if c.closed { + return nil, errClosed + } + client, ok = c.clients[addr] if !ok { opt := c.opt.clientOptions() @@ -83,7 +96,7 @@ func (c *ClusterClient) slotAddrs(slot int) []string { return addrs } -// randomClient returns a Client for the first live node. +// randomClient returns a Client for the first pingable node. func (c *ClusterClient) randomClient() (client *Client, err error) { for i := 0; i < 10; i++ { n := rand.Intn(len(c.addrs)) @@ -165,14 +178,12 @@ func (c *ClusterClient) process(cmd Cmder) { // Closes all clients and returns last error if there are any. func (c *ClusterClient) resetClients() (err error) { - c.clientsMx.Lock() for addr, client := range c.clients { if e := client.Close(); e != nil { err = e } delete(c.clients, addr) } - c.clientsMx.Unlock() return err } @@ -231,14 +242,24 @@ func (c *ClusterClient) scheduleReload() { // reaper closes idle connections to the cluster. func (c *ClusterClient) reaper(ticker *time.Ticker) { for _ = range ticker.C { + c.clientsMx.RLock() + + if c.closed { + c.clientsMx.RUnlock() + return + } + for _, client := range c.clients { pool := client.connPool - // pool.First removes idle connections from the pool for us. So - // just put returned connection back. + // pool.First removes idle connections from the pool and + // returns first non-idle connection. So just put returned + // connection back. if cn := pool.First(); cn != nil { pool.Put(cn) } } + + c.clientsMx.RUnlock() } } From cc0ee1001924ebcbc2297d6820cb5fe2c35b1bc1 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Fri, 1 May 2015 11:01:01 +0300 Subject: [PATCH 2/3] Fix deadlock. Stop time.Ticker. Add a test that closed client is not deadlocked. --- cluster.go | 12 +++++++----- cluster_client_test.go | 1 + 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/cluster.go b/cluster.go index 2c6a45ce..f7a9f217 100644 --- a/cluster.go +++ b/cluster.go @@ -36,7 +36,7 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { } client.commandable.process = client.process client.reloadIfDue() - go client.reaper(time.NewTicker(5 * time.Minute)) + go client.reaper() return client } @@ -45,6 +45,7 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { // It is rare to Close a Client, as the Client is meant to be // long-lived and shared between many goroutines. func (c *ClusterClient) Close() error { + defer c.clientsMx.Unlock() c.clientsMx.Lock() if c.closed { @@ -53,8 +54,6 @@ func (c *ClusterClient) Close() error { c.closed = true c.resetClients() c.setSlots(nil) - - c.clientsMx.Unlock() return nil } @@ -74,6 +73,7 @@ func (c *ClusterClient) getClient(addr string) (*Client, error) { c.clientsMx.Lock() if c.closed { + c.clientsMx.Unlock() return nil, errClosed } @@ -240,13 +240,15 @@ func (c *ClusterClient) scheduleReload() { } // reaper closes idle connections to the cluster. -func (c *ClusterClient) reaper(ticker *time.Ticker) { +func (c *ClusterClient) reaper() { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() for _ = range ticker.C { c.clientsMx.RLock() if c.closed { c.clientsMx.RUnlock() - return + break } for _, client := range c.clients { diff --git a/cluster_client_test.go b/cluster_client_test.go index 84a996d4..3f233682 100644 --- a/cluster_client_test.go +++ b/cluster_client_test.go @@ -83,6 +83,7 @@ var _ = Describe("ClusterClient", func() { Expect(subject.slots[8191]).To(BeEmpty()) Expect(subject.slots[8192]).To(BeEmpty()) Expect(subject.slots[16383]).To(BeEmpty()) + Expect(subject.Ping().Err().Error()).To(Equal("redis: client is closed")) }) It("should check if reload is due", func() { From 7da995891e064badfda7da2a75c2a36fdb8ceb56 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Fri, 1 May 2015 13:24:24 +0300 Subject: [PATCH 3/3] Lower ticker duration. --- cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster.go b/cluster.go index f7a9f217..f5fbaf7d 100644 --- a/cluster.go +++ b/cluster.go @@ -241,7 +241,7 @@ func (c *ClusterClient) scheduleReload() { // reaper closes idle connections to the cluster. func (c *ClusterClient) reaper() { - ticker := time.NewTicker(5 * time.Minute) + ticker := time.NewTicker(time.Minute) defer ticker.Stop() for _ = range ticker.C { c.clientsMx.RLock()