diff --git a/cluster.go b/cluster.go index b8adedc7..f5fbaf7d 100644 --- a/cluster.go +++ b/cluster.go @@ -13,10 +13,11 @@ type ClusterClient struct { addrs []string slots [][]string - slotsMx sync.RWMutex // protects slots & addrs cache + slotsMx sync.RWMutex // Protects slots and addrs. clients map[string]*Client - clientsMx sync.RWMutex + closed bool + clientsMx sync.RWMutex // Protects clients and closed. opt *ClusterOptions @@ -35,20 +36,27 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { } client.commandable.process = client.process client.reloadIfDue() - go client.reaper(time.NewTicker(5 * time.Minute)) + go client.reaper() return client } -// Close closes the cluster client. +// Close closes the cluster client, releasing any open resources. +// +// It is rare to Close a Client, as the Client is meant to be +// long-lived and shared between many goroutines. func (c *ClusterClient) Close() error { - // TODO: close should make client unusable - c.setSlots(nil) + defer c.clientsMx.Unlock() + c.clientsMx.Lock() + + if c.closed { + return nil + } + c.closed = true c.resetClients() + c.setSlots(nil) return nil } -// ------------------------------------------------------------------------ - // getClient returns a Client for a given address. func (c *ClusterClient) getClient(addr string) (*Client, error) { if addr == "" { @@ -64,6 +72,11 @@ func (c *ClusterClient) getClient(addr string) (*Client, error) { c.clientsMx.RUnlock() c.clientsMx.Lock() + if c.closed { + c.clientsMx.Unlock() + return nil, errClosed + } + client, ok = c.clients[addr] if !ok { opt := c.opt.clientOptions() @@ -83,7 +96,7 @@ func (c *ClusterClient) slotAddrs(slot int) []string { return addrs } -// randomClient returns a Client for the first live node. +// randomClient returns a Client for the first pingable node. func (c *ClusterClient) randomClient() (client *Client, err error) { for i := 0; i < 10; i++ { n := rand.Intn(len(c.addrs)) @@ -165,14 +178,12 @@ func (c *ClusterClient) process(cmd Cmder) { // Closes all clients and returns last error if there are any. func (c *ClusterClient) resetClients() (err error) { - c.clientsMx.Lock() for addr, client := range c.clients { if e := client.Close(); e != nil { err = e } delete(c.clients, addr) } - c.clientsMx.Unlock() return err } @@ -229,16 +240,28 @@ func (c *ClusterClient) scheduleReload() { } // reaper closes idle connections to the cluster. -func (c *ClusterClient) reaper(ticker *time.Ticker) { +func (c *ClusterClient) reaper() { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() for _ = range ticker.C { + c.clientsMx.RLock() + + if c.closed { + c.clientsMx.RUnlock() + break + } + for _, client := range c.clients { pool := client.connPool - // pool.First removes idle connections from the pool for us. So - // just put returned connection back. + // pool.First removes idle connections from the pool and + // returns first non-idle connection. So just put returned + // connection back. if cn := pool.First(); cn != nil { pool.Put(cn) } } + + c.clientsMx.RUnlock() } } diff --git a/cluster_client_test.go b/cluster_client_test.go index 84a996d4..3f233682 100644 --- a/cluster_client_test.go +++ b/cluster_client_test.go @@ -83,6 +83,7 @@ var _ = Describe("ClusterClient", func() { Expect(subject.slots[8191]).To(BeEmpty()) Expect(subject.slots[8192]).To(BeEmpty()) Expect(subject.slots[16383]).To(BeEmpty()) + Expect(subject.Ping().Err().Error()).To(Equal("redis: client is closed")) }) It("should check if reload is due", func() {