mirror of https://github.com/tidwall/tile38.git
Adding more replication data to INFO response
This commit is contained in:
parent
f638904c14
commit
79c902efbf
|
@ -17,6 +17,7 @@ import (
|
|||
// Client is an remote connection into to Tile38
|
||||
type Client struct {
|
||||
id int // unique id
|
||||
replPort int // the known replication port for follower connections
|
||||
authd bool // client has been authenticated
|
||||
outputType Type // Null, JSON, or RESP
|
||||
remoteAddr string // original remote address
|
||||
|
|
|
@ -96,6 +96,41 @@ func (c *Server) cmdFollow(msg *Message) (res resp.Value, err error) {
|
|||
return OKMessage(msg, start), nil
|
||||
}
|
||||
|
||||
// cmdReplConf is a command handler that sets replication configuration info
|
||||
func (c *Server) cmdReplConf(msg *Message, client *Client) (res resp.Value, err error) {
|
||||
start := time.Now()
|
||||
vs := msg.Args[1:]
|
||||
var ok bool
|
||||
var cmd, val string
|
||||
|
||||
// Parse the message
|
||||
if vs, cmd, ok = tokenval(vs); !ok || cmd == "" {
|
||||
return NOMessage, errInvalidNumberOfArguments
|
||||
}
|
||||
if vs, val, ok = tokenval(vs); !ok || val == "" {
|
||||
return NOMessage, errInvalidNumberOfArguments
|
||||
}
|
||||
|
||||
// Switch on the command received
|
||||
switch cmd {
|
||||
case "listening-port":
|
||||
// Parse the port as an integer
|
||||
port, err := strconv.Atoi(val)
|
||||
if err != nil {
|
||||
return NOMessage, errInvalidArgument(val)
|
||||
}
|
||||
|
||||
// Apply the replication port to the client and return
|
||||
for _, c := range c.conns {
|
||||
if c.remoteAddr == client.remoteAddr {
|
||||
c.replPort = port
|
||||
return OKMessage(msg, start), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return NOMessage, fmt.Errorf("cannot find follower")
|
||||
}
|
||||
|
||||
func doServer(conn *RESPConn) (map[string]string, error) {
|
||||
v, err := conn.Do("server")
|
||||
if err != nil {
|
||||
|
@ -191,7 +226,22 @@ func (c *Server) followStep(host string, port int, followc int) error {
|
|||
return err
|
||||
}
|
||||
|
||||
v, err := conn.Do("aof", pos)
|
||||
// Send the replication port to the leader
|
||||
v, err := conn.Do("replconf", "listening-port", c.port)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if v.Error() != nil {
|
||||
return v.Error()
|
||||
}
|
||||
if v.String() != "OK" {
|
||||
return errors.New("invalid response to replconf request")
|
||||
}
|
||||
if core.ShowDebugMessages {
|
||||
log.Debug("follow:", addr, ":replconf")
|
||||
}
|
||||
|
||||
v, err = conn.Do("aof", pos)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -970,7 +970,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
|
|||
if server.config.followHost() != "" && !server.fcuponce {
|
||||
return writeErr("catching up to leader")
|
||||
}
|
||||
case "follow", "readonly", "config":
|
||||
case "follow", "slaveof", "replconf", "readonly", "config":
|
||||
// system operations
|
||||
// does not write to aof, but requires a write lock.
|
||||
server.mu.Lock()
|
||||
|
@ -1081,7 +1081,6 @@ func (server *Server) command(msg *Message, client *Client) (
|
|||
res, d, err = server.cmdRename(msg, false)
|
||||
case "renamenx":
|
||||
res, d, err = server.cmdRename(msg, true)
|
||||
|
||||
case "sethook":
|
||||
res, d, err = server.cmdSetHook(msg, false)
|
||||
case "delhook":
|
||||
|
@ -1090,7 +1089,6 @@ func (server *Server) command(msg *Message, client *Client) (
|
|||
res, d, err = server.cmdPDelHook(msg, false)
|
||||
case "hooks":
|
||||
res, err = server.cmdHooks(msg, false)
|
||||
|
||||
case "setchan":
|
||||
res, d, err = server.cmdSetHook(msg, true)
|
||||
case "delchan":
|
||||
|
@ -1099,7 +1097,6 @@ func (server *Server) command(msg *Message, client *Client) (
|
|||
res, d, err = server.cmdPDelHook(msg, true)
|
||||
case "chans":
|
||||
res, err = server.cmdHooks(msg, true)
|
||||
|
||||
case "expire":
|
||||
res, d, err = server.cmdExpire(msg)
|
||||
case "persist":
|
||||
|
@ -1124,8 +1121,10 @@ func (server *Server) command(msg *Message, client *Client) (
|
|||
return
|
||||
}
|
||||
res, err = server.cmdSleep(msg)
|
||||
case "follow":
|
||||
case "follow", "slaveof":
|
||||
res, err = server.cmdFollow(msg)
|
||||
case "replconf":
|
||||
res, err = server.cmdReplConf(msg, client)
|
||||
case "readonly":
|
||||
res, err = server.cmdReadOnly(msg)
|
||||
case "stats":
|
||||
|
|
|
@ -352,9 +352,27 @@ func (c *Server) writeInfoStats(w *bytes.Buffer) {
|
|||
fmt.Fprintf(w, "total_commands_processed:%d\r\n", c.statsTotalCommands.get()) // Total number of commands processed by the server
|
||||
fmt.Fprintf(w, "expired_keys:%d\r\n", c.statsExpired.get()) // Total number of key expiration events
|
||||
}
|
||||
|
||||
// writeInfoReplication writes all replication data to the 'info' response
|
||||
func (c *Server) writeInfoReplication(w *bytes.Buffer) {
|
||||
if c.config.followHost() != "" {
|
||||
fmt.Fprintf(w, "role:slave\r\n")
|
||||
fmt.Fprintf(w, "master_host:%s\r\n", c.config.followHost())
|
||||
fmt.Fprintf(w, "master_port:%v\r\n", c.config.followPort())
|
||||
} else {
|
||||
fmt.Fprintf(w, "role:master\r\n")
|
||||
var i int
|
||||
for _, cc := range c.conns {
|
||||
if cc.replPort != 0 {
|
||||
fmt.Fprintf(w, "slave%v:ip=%s,port=%v,state=online\r\n", i,
|
||||
strings.Split(cc.remoteAddr, ":")[0], cc.replPort)
|
||||
i++
|
||||
}
|
||||
}
|
||||
}
|
||||
fmt.Fprintf(w, "connected_slaves:%d\r\n", len(c.aofconnM)) // Number of connected slaves
|
||||
}
|
||||
|
||||
func (c *Server) writeInfoCluster(w *bytes.Buffer) {
|
||||
fmt.Fprintf(w, "cluster_enabled:0\r\n")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue