From 2511a1791d98b39ca017bec9cdc72e09cf0c067f Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Tue, 7 Apr 2015 12:30:06 +0300 Subject: [PATCH] Use only master node address. --- cluster.go | 56 ++++++++++++++++++++++++------------------------------ 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/cluster.go b/cluster.go index 6fbc774..8790e12 100644 --- a/cluster.go +++ b/cluster.go @@ -26,11 +26,11 @@ type ClusterClient struct { commandable addrs []string - slots [][]string + slots []string slotsMx sync.RWMutex // protects slots & addrs cache clients map[string]*Client - clientsMx sync.RWMutex // protects clients + clientsMx sync.RWMutex opt *ClusterOptions @@ -106,12 +106,11 @@ func (c *ClusterClient) process(cmd Cmder) { slot := hashSlot(cmd.clusterKey()) c.slotsMx.RLock() - defer c.slotsMx.RUnlock() + masterAddr := c.slots[slot] + c.slotsMx.RUnlock() - addrs := c.slots[slot] - if len(addrs) > 0 { - // First address is master. - client = c.getClient(addrs[0]) + if masterAddr != "" { + client = c.getClient(masterAddr) } else { var err error client, err = c.randomClient() @@ -121,9 +120,6 @@ func (c *ClusterClient) process(cmd Cmder) { } } - // Index in the addrs slice pointing to the next replica. - replicaIndex := 1 - for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ { if ask { pipe := client.Pipeline() @@ -141,20 +137,11 @@ func (c *ClusterClient) process(cmd Cmder) { return } - // On network errors try another node. + // On network errors try random node. if isNetworkError(err) { - if replicaIndex < len(addrs) { - // Try next available replica. - client = c.getClient(addrs[replicaIndex]) - replicaIndex++ - cmd.reset() - continue - } else { - // Otherwise try random node. - client, err = c.randomClient() - if err != nil { - return - } + client, err = c.randomClient() + if err != nil { + return } cmd.reset() continue @@ -197,16 +184,23 @@ func (c *ClusterClient) resetClients() (err error) { func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) { c.slotsMx.Lock() - c.slots = make([][]string, hashSlots) - for _, info := range slots { - for i := info.Start; i <= info.End; i++ { - c.slots[i] = info.Addrs - } - c.addrs = append(c.addrs, info.Addrs...) - } - c.addrs = removeDuplicates(c.addrs) + c.addrs = c.addrs[:0] + c.slots = make([]string, hashSlots) c.resetClients() + seen := make(map[string]struct{}) + for _, info := range slots { + masterAddr := info.Addrs[0] + for slot := info.Start; slot <= info.End; slot++ { + c.slots[slot] = masterAddr + } + + if _, ok := seen[masterAddr]; !ok { + c.addrs = append(c.addrs, masterAddr) + seen[masterAddr] = struct{}{} + } + } + c.slotsMx.Unlock() }