From d00fb6ead9013c740bfa793d61d424f3ff3c6f4f Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Fri, 1 May 2015 09:33:47 +0300 Subject: [PATCH] Implement Close and fix reaper goroutine leak. --- cluster.go | 45 +++++++++++++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/cluster.go b/cluster.go index b8adedc7..2c6a45ce 100644 --- a/cluster.go +++ b/cluster.go @@ -13,10 +13,11 @@ type ClusterClient struct { addrs []string slots [][]string - slotsMx sync.RWMutex // protects slots & addrs cache + slotsMx sync.RWMutex // Protects slots and addrs. clients map[string]*Client - clientsMx sync.RWMutex + closed bool + clientsMx sync.RWMutex // Protects clients and closed. opt *ClusterOptions @@ -39,16 +40,24 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { return client } -// Close closes the cluster client. +// Close closes the cluster client, releasing any open resources. +// +// It is rare to Close a Client, as the Client is meant to be +// long-lived and shared between many goroutines. func (c *ClusterClient) Close() error { - // TODO: close should make client unusable - c.setSlots(nil) + c.clientsMx.Lock() + + if c.closed { + return nil + } + c.closed = true c.resetClients() + c.setSlots(nil) + + c.clientsMx.Unlock() return nil } -// ------------------------------------------------------------------------ - // getClient returns a Client for a given address. func (c *ClusterClient) getClient(addr string) (*Client, error) { if addr == "" { @@ -64,6 +73,10 @@ func (c *ClusterClient) getClient(addr string) (*Client, error) { c.clientsMx.RUnlock() c.clientsMx.Lock() + if c.closed { + return nil, errClosed + } + client, ok = c.clients[addr] if !ok { opt := c.opt.clientOptions() @@ -83,7 +96,7 @@ func (c *ClusterClient) slotAddrs(slot int) []string { return addrs } -// randomClient returns a Client for the first live node. +// randomClient returns a Client for the first pingable node. func (c *ClusterClient) randomClient() (client *Client, err error) { for i := 0; i < 10; i++ { n := rand.Intn(len(c.addrs)) @@ -165,14 +178,12 @@ func (c *ClusterClient) process(cmd Cmder) { // Closes all clients and returns last error if there are any. func (c *ClusterClient) resetClients() (err error) { - c.clientsMx.Lock() for addr, client := range c.clients { if e := client.Close(); e != nil { err = e } delete(c.clients, addr) } - c.clientsMx.Unlock() return err } @@ -231,14 +242,24 @@ func (c *ClusterClient) scheduleReload() { // reaper closes idle connections to the cluster. func (c *ClusterClient) reaper(ticker *time.Ticker) { for _ = range ticker.C { + c.clientsMx.RLock() + + if c.closed { + c.clientsMx.RUnlock() + return + } + for _, client := range c.clients { pool := client.connPool - // pool.First removes idle connections from the pool for us. So - // just put returned connection back. + // pool.First removes idle connections from the pool and + // returns first non-idle connection. So just put returned + // connection back. if cn := pool.First(); cn != nil { pool.Put(cn) } } + + c.clientsMx.RUnlock() } }