diff --git a/internal/server/client.go b/internal/server/client.go index 87221feb..dbfaa737 100644 --- a/internal/server/client.go +++ b/internal/server/client.go @@ -17,6 +17,7 @@ import ( // Client is an remote connection into to Tile38 type Client struct { id int // unique id + replPort int // the known replication port for follower connections authd bool // client has been authenticated outputType Type // Null, JSON, or RESP remoteAddr string // original remote address diff --git a/internal/server/follow.go b/internal/server/follow.go index 69878c52..42d27dc5 100644 --- a/internal/server/follow.go +++ b/internal/server/follow.go @@ -96,6 +96,41 @@ func (c *Server) cmdFollow(msg *Message) (res resp.Value, err error) { return OKMessage(msg, start), nil } +// cmdReplConf is a command handler that sets replication configuration info +func (c *Server) cmdReplConf(msg *Message, client *Client) (res resp.Value, err error) { + start := time.Now() + vs := msg.Args[1:] + var ok bool + var cmd, val string + + // Parse the message + if vs, cmd, ok = tokenval(vs); !ok || cmd == "" { + return NOMessage, errInvalidNumberOfArguments + } + if vs, val, ok = tokenval(vs); !ok || val == "" { + return NOMessage, errInvalidNumberOfArguments + } + + // Switch on the command received + switch cmd { + case "listening-port": + // Parse the port as an integer + port, err := strconv.Atoi(val) + if err != nil { + return NOMessage, errInvalidArgument(val) + } + + // Apply the replication port to the client and return + for _, c := range c.conns { + if c.remoteAddr == client.remoteAddr { + c.replPort = port + return OKMessage(msg, start), nil + } + } + } + return NOMessage, fmt.Errorf("cannot find follower") +} + func doServer(conn *RESPConn) (map[string]string, error) { v, err := conn.Do("server") if err != nil { @@ -191,7 +226,22 @@ func (c *Server) followStep(host string, port int, followc int) error { return err } - v, err := conn.Do("aof", pos) + // Send the replication port to the leader + v, err := conn.Do("replconf", "listening-port", c.port) + if err != nil { + return err + } + if v.Error() != nil { + return v.Error() + } + if v.String() != "OK" { + return errors.New("invalid response to replconf request") + } + if core.ShowDebugMessages { + log.Debug("follow:", addr, ":replconf") + } + + v, err = conn.Do("aof", pos) if err != nil { return err } diff --git a/internal/server/server.go b/internal/server/server.go index dae1afa0..69471948 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -970,7 +970,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { if server.config.followHost() != "" && !server.fcuponce { return writeErr("catching up to leader") } - case "follow", "readonly", "config": + case "follow", "slaveof", "replconf", "readonly", "config": // system operations // does not write to aof, but requires a write lock. server.mu.Lock() @@ -1081,7 +1081,6 @@ func (server *Server) command(msg *Message, client *Client) ( res, d, err = server.cmdRename(msg, false) case "renamenx": res, d, err = server.cmdRename(msg, true) - case "sethook": res, d, err = server.cmdSetHook(msg, false) case "delhook": @@ -1090,7 +1089,6 @@ func (server *Server) command(msg *Message, client *Client) ( res, d, err = server.cmdPDelHook(msg, false) case "hooks": res, err = server.cmdHooks(msg, false) - case "setchan": res, d, err = server.cmdSetHook(msg, true) case "delchan": @@ -1099,7 +1097,6 @@ func (server *Server) command(msg *Message, client *Client) ( res, d, err = server.cmdPDelHook(msg, true) case "chans": res, err = server.cmdHooks(msg, true) - case "expire": res, d, err = server.cmdExpire(msg) case "persist": @@ -1124,8 +1121,10 @@ func (server *Server) command(msg *Message, client *Client) ( return } res, err = server.cmdSleep(msg) - case "follow": + case "follow", "slaveof": res, err = server.cmdFollow(msg) + case "replconf": + res, err = server.cmdReplConf(msg, client) case "readonly": res, err = server.cmdReadOnly(msg) case "stats": diff --git a/internal/server/stats.go b/internal/server/stats.go index aab2cc45..c4db4880 100644 --- a/internal/server/stats.go +++ b/internal/server/stats.go @@ -352,9 +352,27 @@ func (c *Server) writeInfoStats(w *bytes.Buffer) { fmt.Fprintf(w, "total_commands_processed:%d\r\n", c.statsTotalCommands.get()) // Total number of commands processed by the server fmt.Fprintf(w, "expired_keys:%d\r\n", c.statsExpired.get()) // Total number of key expiration events } + +// writeInfoReplication writes all replication data to the 'info' response func (c *Server) writeInfoReplication(w *bytes.Buffer) { + if c.config.followHost() != "" { + fmt.Fprintf(w, "role:slave\r\n") + fmt.Fprintf(w, "master_host:%s\r\n", c.config.followHost()) + fmt.Fprintf(w, "master_port:%v\r\n", c.config.followPort()) + } else { + fmt.Fprintf(w, "role:master\r\n") + var i int + for _, cc := range c.conns { + if cc.replPort != 0 { + fmt.Fprintf(w, "slave%v:ip=%s,port=%v,state=online\r\n", i, + strings.Split(cc.remoteAddr, ":")[0], cc.replPort) + i++ + } + } + } fmt.Fprintf(w, "connected_slaves:%d\r\n", len(c.aofconnM)) // Number of connected slaves } + func (c *Server) writeInfoCluster(w *bytes.Buffer) { fmt.Fprintf(w, "cluster_enabled:0\r\n") }