forked from mirror/redis
Merge pull request #80 from go-redis/fix/improbably-race-condition
Better race protection
This commit is contained in:
commit
f0d65dece3
30
cluster.go
30
cluster.go
|
@ -14,13 +14,15 @@ import (
|
||||||
type ClusterClient struct {
|
type ClusterClient struct {
|
||||||
commandable
|
commandable
|
||||||
|
|
||||||
addrs map[string]struct{}
|
addrs map[string]struct{}
|
||||||
slots [][]string
|
slots [][]string
|
||||||
conns map[string]*Client
|
slotsMx sync.RWMutex // protects slots & addrs cache
|
||||||
opt *ClusterOptions
|
|
||||||
|
conns map[string]*Client
|
||||||
|
connsMx sync.Mutex // protects conns
|
||||||
|
|
||||||
|
opt *ClusterOptions
|
||||||
|
|
||||||
// Protect addrs, slots and conns cache
|
|
||||||
cachemx sync.RWMutex
|
|
||||||
_reload uint32
|
_reload uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,8 +47,8 @@ func NewClusterClient(opt *ClusterOptions) (*ClusterClient, error) {
|
||||||
|
|
||||||
// Close closes the cluster connection
|
// Close closes the cluster connection
|
||||||
func (c *ClusterClient) Close() error {
|
func (c *ClusterClient) Close() error {
|
||||||
c.cachemx.Lock()
|
c.slotsMx.Lock()
|
||||||
defer c.cachemx.Unlock()
|
defer c.slotsMx.Unlock()
|
||||||
|
|
||||||
return c.reset()
|
return c.reset()
|
||||||
}
|
}
|
||||||
|
@ -63,6 +65,7 @@ func (c *ClusterClient) getMasterAddrBySlot(hashSlot int) string {
|
||||||
|
|
||||||
// Returns a node's client for a given address
|
// Returns a node's client for a given address
|
||||||
func (c *ClusterClient) getNodeClientByAddr(addr string) *Client {
|
func (c *ClusterClient) getNodeClientByAddr(addr string) *Client {
|
||||||
|
c.connsMx.Lock()
|
||||||
client, ok := c.conns[addr]
|
client, ok := c.conns[addr]
|
||||||
if !ok {
|
if !ok {
|
||||||
opt := c.opt.clientOptions()
|
opt := c.opt.clientOptions()
|
||||||
|
@ -70,6 +73,7 @@ func (c *ClusterClient) getNodeClientByAddr(addr string) *Client {
|
||||||
client = NewTCPClient(opt)
|
client = NewTCPClient(opt)
|
||||||
c.conns[addr] = client
|
c.conns[addr] = client
|
||||||
}
|
}
|
||||||
|
c.connsMx.Unlock()
|
||||||
return client
|
return client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,8 +85,8 @@ func (c *ClusterClient) process(cmd Cmder) {
|
||||||
|
|
||||||
hashSlot := hashSlot(cmd.clusterKey())
|
hashSlot := hashSlot(cmd.clusterKey())
|
||||||
|
|
||||||
c.cachemx.RLock()
|
c.slotsMx.RLock()
|
||||||
defer c.cachemx.RUnlock()
|
defer c.slotsMx.RUnlock()
|
||||||
|
|
||||||
tried := make(map[string]struct{}, len(c.addrs))
|
tried := make(map[string]struct{}, len(c.addrs))
|
||||||
addr := c.getMasterAddrBySlot(hashSlot)
|
addr := c.getMasterAddrBySlot(hashSlot)
|
||||||
|
@ -146,8 +150,8 @@ func (c *ClusterClient) reloadIfDue() (err error) {
|
||||||
|
|
||||||
var infos []ClusterSlotInfo
|
var infos []ClusterSlotInfo
|
||||||
|
|
||||||
c.cachemx.Lock()
|
c.slotsMx.Lock()
|
||||||
defer c.cachemx.Unlock()
|
defer c.slotsMx.Unlock()
|
||||||
|
|
||||||
// Try known addresses in random order (map interation order is random in Go)
|
// Try known addresses in random order (map interation order is random in Go)
|
||||||
// http://redis.io/topics/cluster-spec#clients-first-connection-and-handling-of-redirections
|
// http://redis.io/topics/cluster-spec#clients-first-connection-and-handling-of-redirections
|
||||||
|
@ -166,12 +170,14 @@ func (c *ClusterClient) reloadIfDue() (err error) {
|
||||||
|
|
||||||
// Closes all connections and flushes slots cache
|
// Closes all connections and flushes slots cache
|
||||||
func (c *ClusterClient) reset() (err error) {
|
func (c *ClusterClient) reset() (err error) {
|
||||||
|
c.connsMx.Lock()
|
||||||
for addr, client := range c.conns {
|
for addr, client := range c.conns {
|
||||||
if e := client.Close(); e != nil {
|
if e := client.Close(); e != nil {
|
||||||
err = e
|
err = e
|
||||||
}
|
}
|
||||||
delete(c.conns, addr)
|
delete(c.conns, addr)
|
||||||
}
|
}
|
||||||
|
c.connsMx.Unlock()
|
||||||
c.slots = make([][]string, hashSlots)
|
c.slots = make([][]string, hashSlots)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue