To properly support kubernetes & Sentinel. replica_announce properties should be supported #692

This commit is contained in:
Chris Rice 2023-06-28 14:59:28 -07:00
parent 0144ca6883
commit f11aedb948
4 changed files with 93 additions and 2 deletions

View File

@ -16,6 +16,7 @@ import (
type Client struct { type Client struct {
id int // unique id id int // unique id
replPort int // the known replication port for follower connections replPort int // the known replication port for follower connections
replAddr string // the known replication addr for follower connections
authd bool // client has been authenticated authd bool // client has been authenticated
outputType Type // Null, JSON, or RESP outputType Type // Null, JSON, or RESP
remoteAddr string // original remote address remoteAddr string // original remote address

View File

@ -34,9 +34,11 @@ const (
AutoGC = "autogc" AutoGC = "autogc"
KeepAlive = "keepalive" KeepAlive = "keepalive"
LogConfig = "logconfig" LogConfig = "logconfig"
AnnounceIP = "replica_announce_ip"
AnnouncePort = "replica_announce_port"
) )
var validProperties = []string{RequirePass, LeaderAuth, ProtectedMode, MaxMemory, AutoGC, KeepAlive, LogConfig, ReplicaPriority} var validProperties = []string{RequirePass, LeaderAuth, ProtectedMode, MaxMemory, AutoGC, KeepAlive, LogConfig, ReplicaPriority, AnnouncePort, AnnounceIP}
// Config is a tile38 config // Config is a tile38 config
type Config struct { type Config struct {
@ -66,6 +68,10 @@ type Config struct {
_keepAlive int64 _keepAlive int64
_logConfigP interface{} _logConfigP interface{}
_logConfig string _logConfig string
_announceIPP string
_announceIP string
_announcePortP string
_announcePort int64
} }
func loadConfig(path string) (*Config, error) { func loadConfig(path string) (*Config, error) {
@ -94,6 +100,8 @@ func loadConfig(path string) (*Config, error) {
_autoGCP: gjson.Get(json, AutoGC).String(), _autoGCP: gjson.Get(json, AutoGC).String(),
_keepAliveP: gjson.Get(json, KeepAlive).String(), _keepAliveP: gjson.Get(json, KeepAlive).String(),
_logConfig: gjson.Get(json, LogConfig).String(), _logConfig: gjson.Get(json, LogConfig).String(),
_announceIPP: gjson.Get(json, AnnounceIP).String(),
_announcePortP: gjson.Get(json, AnnouncePort).String(),
} }
if config._serverID == "" { if config._serverID == "" {
@ -131,6 +139,12 @@ func loadConfig(path string) (*Config, error) {
if err := config.setProperty(LogConfig, config._logConfig, true); err != nil { if err := config.setProperty(LogConfig, config._logConfig, true); err != nil {
return nil, err return nil, err
} }
if err := config.setProperty(AnnounceIP, config._announceIPP, true); err != nil {
return nil, err
}
if err := config.setProperty(AnnouncePort, config._announcePortP, true); err != nil {
return nil, err
}
config.write(false) config.write(false)
return config, nil return config, nil
} }
@ -162,6 +176,14 @@ func (config *Config) write(writeProperties bool) {
if config._logConfig != "" { if config._logConfig != "" {
config._logConfigP = config._logConfig config._logConfigP = config._logConfig
} }
if config._announceIP != "" {
config._announceIPP = config._announceIP
}
if config._announcePort == 0 {
config._announcePortP = ""
} else {
config._announcePortP = strconv.FormatUint(uint64(config._announcePort), 10)
}
} }
m := make(map[string]interface{}) m := make(map[string]interface{})
@ -211,6 +233,12 @@ func (config *Config) write(writeProperties bool) {
m[LogConfig] = lcfg m[LogConfig] = lcfg
} }
} }
if config._announceIPP != "" {
m[AutoGC] = config._announceIPP
}
if config._announcePortP != "" {
m[AutoGC] = config._announcePortP
}
data, err := json.MarshalIndent(m, "", "\t") data, err := json.MarshalIndent(m, "", "\t")
if err != nil { if err != nil {
panic(err) panic(err)
@ -332,6 +360,18 @@ func (config *Config) setProperty(name, value string, fromLoad bool) error {
} else { } else {
config._replicaPriority = replicaPriority config._replicaPriority = replicaPriority
} }
case AnnounceIP:
config._announceIP = value
case AnnouncePort:
if value == "" {
config._announcePort = 0
} else {
announcePort, err := strconv.ParseUint(value, 10, 64)
if err != nil {
invalid = true
}
config._announcePort = int64(announcePort)
}
} }
if invalid { if invalid {
@ -377,6 +417,10 @@ func (config *Config) getProperty(name string) string {
} else { } else {
return strconv.FormatUint(uint64(config._replicaPriority), 10) return strconv.FormatUint(uint64(config._replicaPriority), 10)
} }
case AnnounceIP:
return config._announceIP
case AnnouncePort:
return strconv.FormatUint(uint64(config._announcePort), 10)
} }
} }
@ -509,6 +553,18 @@ func (config *Config) keepAlive() int64 {
config.mu.RUnlock() config.mu.RUnlock()
return v return v
} }
func (config *Config) announceIP() string {
config.mu.RLock()
v := config._announceIP
config.mu.RUnlock()
return v
}
func (config *Config) announcePort() int {
config.mu.RLock()
v := config._announcePort
config.mu.RUnlock()
return int(v)
}
func (config *Config) setFollowHost(v string) { func (config *Config) setFollowHost(v string) {
config.mu.Lock() config.mu.Lock()
config._followHost = v config._followHost = v

View File

@ -130,6 +130,18 @@ func (s *Server) cmdReplConf(msg *Message, client *Client) (res resp.Value, err
return OKMessage(msg, start), nil return OKMessage(msg, start), nil
} }
} }
case "ip-address":
// Apply the replication ip to the client and return
s.connsmu.RLock()
defer s.connsmu.RUnlock()
for _, c := range s.conns {
if c.remoteAddr == client.remoteAddr {
c.mu.Lock()
c.replAddr = val
c.mu.Unlock()
return OKMessage(msg, start), nil
}
}
} }
return NOMessage, fmt.Errorf("cannot find follower") return NOMessage, fmt.Errorf("cannot find follower")
} }
@ -231,7 +243,11 @@ func (s *Server) followStep(host string, port int, followc int) error {
} }
// Send the replication port to the leader // Send the replication port to the leader
v, err := conn.Do("replconf", "listening-port", s.port) p := s.config.announcePort()
if p == 0 {
p = s.port
}
v, err := conn.Do("replconf", "listening-port", p)
if err != nil { if err != nil {
return err return err
} }
@ -241,6 +257,21 @@ func (s *Server) followStep(host string, port int, followc int) error {
if v.String() != "OK" { if v.String() != "OK" {
return errors.New("invalid response to replconf request") return errors.New("invalid response to replconf request")
} }
// Send the replication ip to the leader
ip := s.config.announceIP()
if ip != "" {
v, err := conn.Do("replconf", "ip-address", ip)
if err != nil {
return err
}
if v.Error() != nil {
return v.Error()
}
if v.String() != "OK" {
return errors.New("invalid response to replconf request")
}
}
if s.opts.ShowDebugMessages { if s.opts.ShowDebugMessages {
log.Debug("follow:", addr, ":replconf") log.Debug("follow:", addr, ":replconf")
} }

View File

@ -430,6 +430,9 @@ func (s *Server) writeInfoStats(w *bytes.Buffer) {
func replicaIPAndPort(cc *Client) (ip string, port int) { func replicaIPAndPort(cc *Client) (ip string, port int) {
ip = cc.remoteAddr ip = cc.remoteAddr
if cc.replAddr != "" {
ip = cc.replAddr
}
i := strings.LastIndex(ip, ":") i := strings.LastIndex(ip, ":")
if i != -1 { if i != -1 {
ip = ip[:i] ip = ip[:i]