From f5091d4be5b54af54a2a1214054131452a32c52e Mon Sep 17 00:00:00 2001 From: Dimitrij Denissenko Date: Mon, 30 Mar 2015 15:10:53 +0100 Subject: [PATCH] Better race protection --- cluster.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/cluster.go b/cluster.go index beedea5..3bf5f82 100644 --- a/cluster.go +++ b/cluster.go @@ -16,11 +16,13 @@ type ClusterClient struct { addrs map[string]struct{} slots [][]string - conns map[string]*Client - opt *ClusterOptions + sLock sync.RWMutex // protects slots & addrs cache + + conns map[string]*Client + cLock sync.Mutex // protects conns + + opt *ClusterOptions - // Protect addrs, slots and conns cache - cachemx sync.RWMutex _reload uint32 } @@ -45,8 +47,8 @@ func NewClusterClient(opt *ClusterOptions) (*ClusterClient, error) { // Close closes the cluster connection func (c *ClusterClient) Close() error { - c.cachemx.Lock() - defer c.cachemx.Unlock() + c.sLock.Lock() + defer c.sLock.Unlock() return c.reset() } @@ -63,6 +65,7 @@ func (c *ClusterClient) getMasterAddrBySlot(hashSlot int) string { // Returns a node's client for a given address func (c *ClusterClient) getNodeClientByAddr(addr string) *Client { + c.cLock.Lock() client, ok := c.conns[addr] if !ok { opt := c.opt.clientOptions() @@ -70,6 +73,7 @@ func (c *ClusterClient) getNodeClientByAddr(addr string) *Client { client = NewTCPClient(opt) c.conns[addr] = client } + c.cLock.Unlock() return client } @@ -81,8 +85,8 @@ func (c *ClusterClient) process(cmd Cmder) { hashSlot := hashSlot(cmd.clusterKey()) - c.cachemx.RLock() - defer c.cachemx.RUnlock() + c.sLock.RLock() + defer c.sLock.RUnlock() tried := make(map[string]struct{}, len(c.addrs)) addr := c.getMasterAddrBySlot(hashSlot) @@ -146,8 +150,8 @@ func (c *ClusterClient) reloadIfDue() (err error) { var infos []ClusterSlotInfo - c.cachemx.Lock() - defer c.cachemx.Unlock() + c.sLock.Lock() + defer c.sLock.Unlock() // Try known addresses in random order (map interation order is random in Go) // http://redis.io/topics/cluster-spec#clients-first-connection-and-handling-of-redirections @@ -166,12 +170,14 @@ func (c *ClusterClient) reloadIfDue() (err error) { // Closes all connections and flushes slots cache func (c *ClusterClient) reset() (err error) { + c.cLock.Lock() for addr, client := range c.conns { if e := client.Close(); e != nil { err = e } delete(c.conns, addr) } + c.cLock.Unlock() c.slots = make([][]string, hashSlots) return }