Merge pull request #693 from Kilowhisky/feature/replica_announce

Feature/replica announce
This commit is contained in:
Josh Baker 2023-06-30 07:11:03 -07:00 committed by GitHub
commit d7a766c6d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 94 additions and 2 deletions

View File

@ -16,6 +16,7 @@ import (
type Client struct {
id int // unique id
replPort int // the known replication port for follower connections
replAddr string // the known replication addr for follower connections
authd bool // client has been authenticated
outputType Type // Null, JSON, or RESP
remoteAddr string // original remote address

View File

@ -34,9 +34,11 @@ const (
AutoGC = "autogc"
KeepAlive = "keepalive"
LogConfig = "logconfig"
AnnounceIP = "replica_announce_ip"
AnnouncePort = "replica_announce_port"
)
var validProperties = []string{RequirePass, LeaderAuth, ProtectedMode, MaxMemory, AutoGC, KeepAlive, LogConfig, ReplicaPriority}
var validProperties = []string{RequirePass, LeaderAuth, ProtectedMode, MaxMemory, AutoGC, KeepAlive, LogConfig, ReplicaPriority, AnnouncePort, AnnounceIP}
// Config is a tile38 config
type Config struct {
@ -66,6 +68,10 @@ type Config struct {
_keepAlive int64
_logConfigP interface{}
_logConfig string
_announceIPP string
_announceIP string
_announcePortP string
_announcePort int64
}
func loadConfig(path string) (*Config, error) {
@ -94,6 +100,8 @@ func loadConfig(path string) (*Config, error) {
_autoGCP: gjson.Get(json, AutoGC).String(),
_keepAliveP: gjson.Get(json, KeepAlive).String(),
_logConfig: gjson.Get(json, LogConfig).String(),
_announceIPP: gjson.Get(json, AnnounceIP).String(),
_announcePortP: gjson.Get(json, AnnouncePort).String(),
}
if config._serverID == "" {
@ -131,6 +139,12 @@ func loadConfig(path string) (*Config, error) {
if err := config.setProperty(LogConfig, config._logConfig, true); err != nil {
return nil, err
}
if err := config.setProperty(AnnounceIP, config._announceIPP, true); err != nil {
return nil, err
}
if err := config.setProperty(AnnouncePort, config._announcePortP, true); err != nil {
return nil, err
}
config.write(false)
return config, nil
}
@ -162,6 +176,14 @@ func (config *Config) write(writeProperties bool) {
if config._logConfig != "" {
config._logConfigP = config._logConfig
}
if config._announceIP != "" {
config._announceIPP = config._announceIP
}
if config._announcePort == 0 {
config._announcePortP = ""
} else {
config._announcePortP = strconv.FormatUint(uint64(config._announcePort), 10)
}
}
m := make(map[string]interface{})
@ -211,6 +233,12 @@ func (config *Config) write(writeProperties bool) {
m[LogConfig] = lcfg
}
}
if config._announceIPP != "" {
m[AnnounceIP] = config._announceIPP
}
if config._announcePortP != "" {
m[AnnouncePort] = config._announcePortP
}
data, err := json.MarshalIndent(m, "", "\t")
if err != nil {
panic(err)
@ -332,6 +360,19 @@ func (config *Config) setProperty(name, value string, fromLoad bool) error {
} else {
config._replicaPriority = replicaPriority
}
case AnnounceIP:
config._announceIP = value
case AnnouncePort:
if value == "" {
config._announcePort = 0
} else {
announcePort, err := strconv.ParseUint(value, 10, 64)
if err != nil {
invalid = true
} else {
config._announcePort = int64(announcePort)
}
}
}
if invalid {
@ -377,6 +418,10 @@ func (config *Config) getProperty(name string) string {
} else {
return strconv.FormatUint(uint64(config._replicaPriority), 10)
}
case AnnounceIP:
return config._announceIP
case AnnouncePort:
return strconv.FormatUint(uint64(config._announcePort), 10)
}
}
@ -509,6 +554,18 @@ func (config *Config) keepAlive() int64 {
config.mu.RUnlock()
return v
}
func (config *Config) announceIP() string {
config.mu.RLock()
v := config._announceIP
config.mu.RUnlock()
return v
}
func (config *Config) announcePort() int {
config.mu.RLock()
v := config._announcePort
config.mu.RUnlock()
return int(v)
}
func (config *Config) setFollowHost(v string) {
config.mu.Lock()
config._followHost = v

View File

@ -130,6 +130,18 @@ func (s *Server) cmdReplConf(msg *Message, client *Client) (res resp.Value, err
return OKMessage(msg, start), nil
}
}
case "ip-address":
// Apply the replication ip to the client and return
s.connsmu.RLock()
defer s.connsmu.RUnlock()
for _, c := range s.conns {
if c.remoteAddr == client.remoteAddr {
c.mu.Lock()
c.replAddr = val
c.mu.Unlock()
return OKMessage(msg, start), nil
}
}
}
return NOMessage, fmt.Errorf("cannot find follower")
}
@ -231,7 +243,11 @@ func (s *Server) followStep(host string, port int, followc int) error {
}
// Send the replication port to the leader
v, err := conn.Do("replconf", "listening-port", s.port)
p := s.config.announcePort()
if p == 0 {
p = s.port
}
v, err := conn.Do("replconf", "listening-port", p)
if err != nil {
return err
}
@ -241,6 +257,21 @@ func (s *Server) followStep(host string, port int, followc int) error {
if v.String() != "OK" {
return errors.New("invalid response to replconf request")
}
// Send the replication ip to the leader
ip := s.config.announceIP()
if ip != "" {
v, err := conn.Do("replconf", "ip-address", ip)
if err != nil {
return err
}
if v.Error() != nil {
return v.Error()
}
if v.String() != "OK" {
return errors.New("invalid response to replconf request")
}
}
if s.opts.ShowDebugMessages {
log.Debug("follow:", addr, ":replconf")
}

View File

@ -430,6 +430,9 @@ func (s *Server) writeInfoStats(w *bytes.Buffer) {
func replicaIPAndPort(cc *Client) (ip string, port int) {
ip = cc.remoteAddr
if cc.replAddr != "" {
ip = cc.replAddr
}
i := strings.LastIndex(ip, ":")
if i != -1 {
ip = ip[:i]