Fix cluster loopback handling. Fixes #589

This commit is contained in:
Vladimir Mihailenco 2018-08-14 15:56:43 +03:00
parent 8ebf0b7750
commit c1c2753ae8
1 changed files with 32 additions and 40 deletions

View File

@ -437,13 +437,15 @@ func newClusterState(
createdAt: time.Now(), createdAt: time.Now(),
} }
isLoopbackOrigin := isLoopbackAddr(origin) originHost, _, _ := net.SplitHostPort(origin)
isLoopbackOrigin := isLoopback(originHost)
for _, slot := range slots { for _, slot := range slots {
var nodes []*clusterNode var nodes []*clusterNode
for i, slotNode := range slot.Nodes { for i, slotNode := range slot.Nodes {
addr := slotNode.Addr addr := slotNode.Addr
if !isLoopbackOrigin && useOriginAddr(origin, addr) { if !isLoopbackOrigin {
addr = origin addr = replaceLoopbackHost(addr, originHost)
} }
node, err := c.nodes.GetOrCreate(addr) node, err := c.nodes.GetOrCreate(addr)
@ -477,6 +479,33 @@ func newClusterState(
return &c, nil return &c, nil
} }
func replaceLoopbackHost(nodeAddr, originHost string) string {
nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
if err != nil {
return nodeAddr
}
nodeIP := net.ParseIP(nodeHost)
if nodeIP == nil {
return nodeAddr
}
if !nodeIP.IsLoopback() {
return nodeAddr
}
// Use origin host which is not loopback and node port.
return net.JoinHostPort(originHost, nodePort)
}
func isLoopback(host string) bool {
ip := net.ParseIP(host)
if ip == nil {
return true
}
return ip.IsLoopback()
}
func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) { func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
nodes := c.slotNodes(slot) nodes := c.slotNodes(slot)
if len(nodes) > 0 { if len(nodes) > 0 {
@ -1564,43 +1593,6 @@ func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
return pubsub return pubsub
} }
func useOriginAddr(originAddr, nodeAddr string) bool {
nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
if err != nil {
return false
}
nodeIP := net.ParseIP(nodeHost)
if nodeIP == nil {
return false
}
if !nodeIP.IsLoopback() {
return false
}
_, originPort, err := net.SplitHostPort(originAddr)
if err != nil {
return false
}
return nodePort == originPort
}
func isLoopbackAddr(addr string) bool {
host, _, err := net.SplitHostPort(addr)
if err != nil {
return false
}
ip := net.ParseIP(host)
if ip == nil {
return false
}
return ip.IsLoopback()
}
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
for _, n := range nodes { for _, n := range nodes {
if n == node { if n == node {