mirror of https://github.com/go-redis/redis.git
Merge pull request #839 from go-redis/fix/cluster-loopback-fix
Fix cluster loopback handling. Fixes #589
This commit is contained in:
commit
ef3e0e9827
72
cluster.go
72
cluster.go
|
@ -438,13 +438,15 @@ func newClusterState(
|
||||||
createdAt: time.Now(),
|
createdAt: time.Now(),
|
||||||
}
|
}
|
||||||
|
|
||||||
isLoopbackOrigin := isLoopbackAddr(origin)
|
originHost, _, _ := net.SplitHostPort(origin)
|
||||||
|
isLoopbackOrigin := isLoopback(originHost)
|
||||||
|
|
||||||
for _, slot := range slots {
|
for _, slot := range slots {
|
||||||
var nodes []*clusterNode
|
var nodes []*clusterNode
|
||||||
for i, slotNode := range slot.Nodes {
|
for i, slotNode := range slot.Nodes {
|
||||||
addr := slotNode.Addr
|
addr := slotNode.Addr
|
||||||
if !isLoopbackOrigin && useOriginAddr(origin, addr) {
|
if !isLoopbackOrigin {
|
||||||
addr = origin
|
addr = replaceLoopbackHost(addr, originHost)
|
||||||
}
|
}
|
||||||
|
|
||||||
node, err := c.nodes.GetOrCreate(addr)
|
node, err := c.nodes.GetOrCreate(addr)
|
||||||
|
@ -478,6 +480,33 @@ func newClusterState(
|
||||||
return &c, nil
|
return &c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func replaceLoopbackHost(nodeAddr, originHost string) string {
|
||||||
|
nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
|
||||||
|
if err != nil {
|
||||||
|
return nodeAddr
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeIP := net.ParseIP(nodeHost)
|
||||||
|
if nodeIP == nil {
|
||||||
|
return nodeAddr
|
||||||
|
}
|
||||||
|
|
||||||
|
if !nodeIP.IsLoopback() {
|
||||||
|
return nodeAddr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use origin host which is not loopback and node port.
|
||||||
|
return net.JoinHostPort(originHost, nodePort)
|
||||||
|
}
|
||||||
|
|
||||||
|
func isLoopback(host string) bool {
|
||||||
|
ip := net.ParseIP(host)
|
||||||
|
if ip == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return ip.IsLoopback()
|
||||||
|
}
|
||||||
|
|
||||||
func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
|
func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
|
||||||
nodes := c.slotNodes(slot)
|
nodes := c.slotNodes(slot)
|
||||||
if len(nodes) > 0 {
|
if len(nodes) > 0 {
|
||||||
|
@ -1565,43 +1594,6 @@ func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
|
||||||
return pubsub
|
return pubsub
|
||||||
}
|
}
|
||||||
|
|
||||||
func useOriginAddr(originAddr, nodeAddr string) bool {
|
|
||||||
nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
nodeIP := net.ParseIP(nodeHost)
|
|
||||||
if nodeIP == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if !nodeIP.IsLoopback() {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
_, originPort, err := net.SplitHostPort(originAddr)
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return nodePort == originPort
|
|
||||||
}
|
|
||||||
|
|
||||||
func isLoopbackAddr(addr string) bool {
|
|
||||||
host, _, err := net.SplitHostPort(addr)
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
ip := net.ParseIP(host)
|
|
||||||
if ip == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return ip.IsLoopback()
|
|
||||||
}
|
|
||||||
|
|
||||||
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
|
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
|
||||||
for _, n := range nodes {
|
for _, n := range nodes {
|
||||||
if n == node {
|
if n == node {
|
||||||
|
|
Loading…
Reference in New Issue