diff --git a/cluster.go b/cluster.go index beedea5..eb7127a 100644 --- a/cluster.go +++ b/cluster.go @@ -14,13 +14,15 @@ import ( type ClusterClient struct { commandable - addrs map[string]struct{} - slots [][]string - conns map[string]*Client - opt *ClusterOptions + addrs map[string]struct{} + slots [][]string + slotsMx sync.RWMutex // protects slots & addrs cache + + conns map[string]*Client + connsMx sync.Mutex // protects conns + + opt *ClusterOptions - // Protect addrs, slots and conns cache - cachemx sync.RWMutex _reload uint32 } @@ -45,8 +47,8 @@ func NewClusterClient(opt *ClusterOptions) (*ClusterClient, error) { // Close closes the cluster connection func (c *ClusterClient) Close() error { - c.cachemx.Lock() - defer c.cachemx.Unlock() + c.slotsMx.Lock() + defer c.slotsMx.Unlock() return c.reset() } @@ -63,6 +65,7 @@ func (c *ClusterClient) getMasterAddrBySlot(hashSlot int) string { // Returns a node's client for a given address func (c *ClusterClient) getNodeClientByAddr(addr string) *Client { + c.connsMx.Lock() client, ok := c.conns[addr] if !ok { opt := c.opt.clientOptions() @@ -70,6 +73,7 @@ func (c *ClusterClient) getNodeClientByAddr(addr string) *Client { client = NewTCPClient(opt) c.conns[addr] = client } + c.connsMx.Unlock() return client } @@ -81,8 +85,8 @@ func (c *ClusterClient) process(cmd Cmder) { hashSlot := hashSlot(cmd.clusterKey()) - c.cachemx.RLock() - defer c.cachemx.RUnlock() + c.slotsMx.RLock() + defer c.slotsMx.RUnlock() tried := make(map[string]struct{}, len(c.addrs)) addr := c.getMasterAddrBySlot(hashSlot) @@ -146,8 +150,8 @@ func (c *ClusterClient) reloadIfDue() (err error) { var infos []ClusterSlotInfo - c.cachemx.Lock() - defer c.cachemx.Unlock() + c.slotsMx.Lock() + defer c.slotsMx.Unlock() // Try known addresses in random order (map interation order is random in Go) // http://redis.io/topics/cluster-spec#clients-first-connection-and-handling-of-redirections @@ -166,12 +170,14 @@ func (c *ClusterClient) reloadIfDue() (err error) { // Closes all connections and flushes slots cache func (c *ClusterClient) reset() (err error) { + c.connsMx.Lock() for addr, client := range c.conns { if e := client.Close(); e != nil { err = e } delete(c.conns, addr) } + c.connsMx.Unlock() c.slots = make([][]string, hashSlots) return }