forked from mirror/redis
Use only master node address.
This commit is contained in:
parent
94a31f499f
commit
2511a1791d
50
cluster.go
50
cluster.go
|
@ -26,11 +26,11 @@ type ClusterClient struct {
|
||||||
commandable
|
commandable
|
||||||
|
|
||||||
addrs []string
|
addrs []string
|
||||||
slots [][]string
|
slots []string
|
||||||
slotsMx sync.RWMutex // protects slots & addrs cache
|
slotsMx sync.RWMutex // protects slots & addrs cache
|
||||||
|
|
||||||
clients map[string]*Client
|
clients map[string]*Client
|
||||||
clientsMx sync.RWMutex // protects clients
|
clientsMx sync.RWMutex
|
||||||
|
|
||||||
opt *ClusterOptions
|
opt *ClusterOptions
|
||||||
|
|
||||||
|
@ -106,12 +106,11 @@ func (c *ClusterClient) process(cmd Cmder) {
|
||||||
slot := hashSlot(cmd.clusterKey())
|
slot := hashSlot(cmd.clusterKey())
|
||||||
|
|
||||||
c.slotsMx.RLock()
|
c.slotsMx.RLock()
|
||||||
defer c.slotsMx.RUnlock()
|
masterAddr := c.slots[slot]
|
||||||
|
c.slotsMx.RUnlock()
|
||||||
|
|
||||||
addrs := c.slots[slot]
|
if masterAddr != "" {
|
||||||
if len(addrs) > 0 {
|
client = c.getClient(masterAddr)
|
||||||
// First address is master.
|
|
||||||
client = c.getClient(addrs[0])
|
|
||||||
} else {
|
} else {
|
||||||
var err error
|
var err error
|
||||||
client, err = c.randomClient()
|
client, err = c.randomClient()
|
||||||
|
@ -121,9 +120,6 @@ func (c *ClusterClient) process(cmd Cmder) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Index in the addrs slice pointing to the next replica.
|
|
||||||
replicaIndex := 1
|
|
||||||
|
|
||||||
for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ {
|
for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ {
|
||||||
if ask {
|
if ask {
|
||||||
pipe := client.Pipeline()
|
pipe := client.Pipeline()
|
||||||
|
@ -141,21 +137,12 @@ func (c *ClusterClient) process(cmd Cmder) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// On network errors try another node.
|
// On network errors try random node.
|
||||||
if isNetworkError(err) {
|
if isNetworkError(err) {
|
||||||
if replicaIndex < len(addrs) {
|
|
||||||
// Try next available replica.
|
|
||||||
client = c.getClient(addrs[replicaIndex])
|
|
||||||
replicaIndex++
|
|
||||||
cmd.reset()
|
|
||||||
continue
|
|
||||||
} else {
|
|
||||||
// Otherwise try random node.
|
|
||||||
client, err = c.randomClient()
|
client, err = c.randomClient()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
|
||||||
cmd.reset()
|
cmd.reset()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -197,16 +184,23 @@ func (c *ClusterClient) resetClients() (err error) {
|
||||||
func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) {
|
func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) {
|
||||||
c.slotsMx.Lock()
|
c.slotsMx.Lock()
|
||||||
|
|
||||||
c.slots = make([][]string, hashSlots)
|
c.addrs = c.addrs[:0]
|
||||||
for _, info := range slots {
|
c.slots = make([]string, hashSlots)
|
||||||
for i := info.Start; i <= info.End; i++ {
|
|
||||||
c.slots[i] = info.Addrs
|
|
||||||
}
|
|
||||||
c.addrs = append(c.addrs, info.Addrs...)
|
|
||||||
}
|
|
||||||
c.addrs = removeDuplicates(c.addrs)
|
|
||||||
c.resetClients()
|
c.resetClients()
|
||||||
|
|
||||||
|
seen := make(map[string]struct{})
|
||||||
|
for _, info := range slots {
|
||||||
|
masterAddr := info.Addrs[0]
|
||||||
|
for slot := info.Start; slot <= info.End; slot++ {
|
||||||
|
c.slots[slot] = masterAddr
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := seen[masterAddr]; !ok {
|
||||||
|
c.addrs = append(c.addrs, masterAddr)
|
||||||
|
seen[masterAddr] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
c.slotsMx.Unlock()
|
c.slotsMx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue