Merge pull request #404 from tidwall/Adding-more-replication-data-to-INFO-response

Adding more replication data to INFO response
This commit is contained in:
Josh Baker 2019-01-24 11:53:13 -07:00 committed by GitHub
commit bfa35d5db9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 74 additions and 6 deletions

View File

@ -17,6 +17,7 @@ import (
// Client is an remote connection into to Tile38
type Client struct {
id int // unique id
replPort int // the known replication port for follower connections
authd bool // client has been authenticated
outputType Type // Null, JSON, or RESP
remoteAddr string // original remote address

View File

@ -96,6 +96,41 @@ func (c *Server) cmdFollow(msg *Message) (res resp.Value, err error) {
return OKMessage(msg, start), nil
}
// cmdReplConf is a command handler that sets replication configuration info
func (c *Server) cmdReplConf(msg *Message, client *Client) (res resp.Value, err error) {
start := time.Now()
vs := msg.Args[1:]
var ok bool
var cmd, val string
// Parse the message
if vs, cmd, ok = tokenval(vs); !ok || cmd == "" {
return NOMessage, errInvalidNumberOfArguments
}
if vs, val, ok = tokenval(vs); !ok || val == "" {
return NOMessage, errInvalidNumberOfArguments
}
// Switch on the command received
switch cmd {
case "listening-port":
// Parse the port as an integer
port, err := strconv.Atoi(val)
if err != nil {
return NOMessage, errInvalidArgument(val)
}
// Apply the replication port to the client and return
for _, c := range c.conns {
if c.remoteAddr == client.remoteAddr {
c.replPort = port
return OKMessage(msg, start), nil
}
}
}
return NOMessage, fmt.Errorf("cannot find follower")
}
func doServer(conn *RESPConn) (map[string]string, error) {
v, err := conn.Do("server")
if err != nil {
@ -191,7 +226,22 @@ func (c *Server) followStep(host string, port int, followc int) error {
return err
}
v, err := conn.Do("aof", pos)
// Send the replication port to the leader
v, err := conn.Do("replconf", "listening-port", c.port)
if err != nil {
return err
}
if v.Error() != nil {
return v.Error()
}
if v.String() != "OK" {
return errors.New("invalid response to replconf request")
}
if core.ShowDebugMessages {
log.Debug("follow:", addr, ":replconf")
}
v, err = conn.Do("aof", pos)
if err != nil {
return err
}

View File

@ -970,7 +970,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
if server.config.followHost() != "" && !server.fcuponce {
return writeErr("catching up to leader")
}
case "follow", "readonly", "config":
case "follow", "slaveof", "replconf", "readonly", "config":
// system operations
// does not write to aof, but requires a write lock.
server.mu.Lock()
@ -1081,7 +1081,6 @@ func (server *Server) command(msg *Message, client *Client) (
res, d, err = server.cmdRename(msg, false)
case "renamenx":
res, d, err = server.cmdRename(msg, true)
case "sethook":
res, d, err = server.cmdSetHook(msg, false)
case "delhook":
@ -1090,7 +1089,6 @@ func (server *Server) command(msg *Message, client *Client) (
res, d, err = server.cmdPDelHook(msg, false)
case "hooks":
res, err = server.cmdHooks(msg, false)
case "setchan":
res, d, err = server.cmdSetHook(msg, true)
case "delchan":
@ -1099,7 +1097,6 @@ func (server *Server) command(msg *Message, client *Client) (
res, d, err = server.cmdPDelHook(msg, true)
case "chans":
res, err = server.cmdHooks(msg, true)
case "expire":
res, d, err = server.cmdExpire(msg)
case "persist":
@ -1124,8 +1121,10 @@ func (server *Server) command(msg *Message, client *Client) (
return
}
res, err = server.cmdSleep(msg)
case "follow":
case "follow", "slaveof":
res, err = server.cmdFollow(msg)
case "replconf":
res, err = server.cmdReplConf(msg, client)
case "readonly":
res, err = server.cmdReadOnly(msg)
case "stats":

View File

@ -352,9 +352,27 @@ func (c *Server) writeInfoStats(w *bytes.Buffer) {
fmt.Fprintf(w, "total_commands_processed:%d\r\n", c.statsTotalCommands.get()) // Total number of commands processed by the server
fmt.Fprintf(w, "expired_keys:%d\r\n", c.statsExpired.get()) // Total number of key expiration events
}
// writeInfoReplication writes all replication data to the 'info' response
func (c *Server) writeInfoReplication(w *bytes.Buffer) {
if c.config.followHost() != "" {
fmt.Fprintf(w, "role:slave\r\n")
fmt.Fprintf(w, "master_host:%s\r\n", c.config.followHost())
fmt.Fprintf(w, "master_port:%v\r\n", c.config.followPort())
} else {
fmt.Fprintf(w, "role:master\r\n")
var i int
for _, cc := range c.conns {
if cc.replPort != 0 {
fmt.Fprintf(w, "slave%v:ip=%s,port=%v,state=online\r\n", i,
strings.Split(cc.remoteAddr, ":")[0], cc.replPort)
i++
}
}
}
fmt.Fprintf(w, "connected_slaves:%d\r\n", len(c.aofconnM)) // Number of connected slaves
}
func (c *Server) writeInfoCluster(w *bytes.Buffer) {
fmt.Fprintf(w, "cluster_enabled:0\r\n")
}