diff --git a/cmd/tile38-cli/main.go b/cmd/tile38-cli/main.go index 2823547e..7b4dc95e 100644 --- a/cmd/tile38-cli/main.go +++ b/cmd/tile38-cli/main.go @@ -180,6 +180,7 @@ func main() { } } connDial() + monitor := false livemode := false aof := false defer func() { @@ -359,6 +360,10 @@ func main() { if jsonOK(msg) { output = "json" } + case "monitor": + monitor = true + livemode = true + output = "resp" } if output == "resp" && (strings.HasPrefix(string(msg), "*3\r\n$10\r\npsubscribe\r\n") || @@ -382,7 +387,7 @@ func main() { } mustOutput := true - if oneCommand == "" && output == "json" && !jsonOK(msg) { + if !monitor && oneCommand == "" && output == "json" && !jsonOK(msg) { var cerr connError if err := json.Unmarshal(msg, &cerr); err == nil { fmt.Fprintln(os.Stderr, "(error) "+cerr.Err) diff --git a/internal/server/live.go b/internal/server/live.go index ab1d6410..a49bb4a2 100644 --- a/internal/server/live.go +++ b/internal/server/live.go @@ -87,6 +87,8 @@ func (server *Server) goLive( return server.liveAOF(s.pos, conn, rd, msg) case liveSubscriptionSwitches: return server.liveSubscription(conn, rd, msg, websocket) + case liveMonitorSwitches: + return server.liveMonitor(conn, rd, msg) case liveFenceSwitches: // fallthrough } diff --git a/internal/server/monitor.go b/internal/server/monitor.go new file mode 100644 index 00000000..ba24e8a5 --- /dev/null +++ b/internal/server/monitor.go @@ -0,0 +1,104 @@ +package server + +import ( + "fmt" + "io" + "net" + "strconv" + "strings" + "time" + + "github.com/tidwall/resp" +) + +type liveMonitorSwitches struct { + // no fields. everything is managed through the Message +} + +func (sub liveMonitorSwitches) Error() string { + return goingLive +} + +func (s *Server) cmdMonitor(msg *Message) (resp.Value, error) { + if len(msg.Args) != 1 { + return resp.Value{}, errInvalidNumberOfArguments + } + return NOMessage, liveMonitorSwitches{} +} + +func (s *Server) liveMonitor(conn net.Conn, rd *PipelineReader, msg *Message) error { + s.monconnsMu.Lock() + s.monconns[conn] = true + s.monconnsMu.Unlock() + defer func() { + s.monconnsMu.Lock() + delete(s.monconns, conn) + s.monconnsMu.Unlock() + conn.Close() + }() + s.monconnsMu.Lock() + conn.Write([]byte("+OK\r\n")) + s.monconnsMu.Unlock() + for { + msgs, err := rd.ReadMessages() + if err != nil { + if err == io.EOF { + return nil + } + return err + } + for _, msg := range msgs { + if len(msg.Args) == 1 && strings.ToLower(msg.Args[0]) == "quit" { + s.monconnsMu.Lock() + conn.Write([]byte("+OK\r\n")) + s.monconnsMu.Unlock() + return nil + } + } + return nil + } +} + +// send messages to live MONITOR clients +func (s *Server) sendMonitor(err error, msg *Message, c *Client, lua bool) { + s.monconnsMu.RLock() + n := len(s.monconns) + s.monconnsMu.RUnlock() + if n == 0 { + return + } + if (c == nil && !lua) || + (err != nil && (err == errInvalidNumberOfArguments || + strings.HasPrefix(err.Error(), "unknown command "))) { + return + } + + // accept all commands except for these: + switch strings.ToLower(msg.Command()) { + case "config", "config set", "config get", "config rewrite", + "auth", "follow", "slaveof", "replconf", + "aof", "aofmd5", "client", + "monitor": + return + } + + var line []byte + for i, arg := range msg.Args { + if i > 0 { + line = append(line, ' ') + } + line = append(line, strconv.Quote(arg)...) + } + tstr := fmt.Sprintf("%.6f", float64(time.Now().UnixNano())/1e9) + var addr string + if lua { + addr = "lua" + } else { + addr = c.remoteAddr + } + s.monconnsMu.Lock() + for conn := range s.monconns { + fmt.Fprintf(conn, "+%s [0 %s] %s\r\n", tstr, addr, line) + } + s.monconnsMu.Unlock() +} diff --git a/internal/server/scripts.go b/internal/server/scripts.go index 3cb949b3..b0bccd5a 100644 --- a/internal/server/scripts.go +++ b/internal/server/scripts.go @@ -642,6 +642,7 @@ func (s *Server) commandInScript(msg *Message) ( case "server": res, err = s.cmdServer(msg) } + s.sendMonitor(err, msg, nil, true) return } diff --git a/internal/server/server.go b/internal/server/server.go index 957f9497..cd29b1a8 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -123,6 +123,9 @@ type Server struct { pubsub *pubsub hookex expire.List + + monconnsMu sync.RWMutex + monconns map[net.Conn]bool } // Serve starts a new tile38 server @@ -152,6 +155,7 @@ func Serve(host string, port int, dir string, http bool) error { conns: make(map[int]*Client), http: http, pubsub: newPubsub(), + monconns: make(map[net.Conn]bool), } server.hookex.Expired = func(item expire.Item) { @@ -746,6 +750,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { } return writeOutput("+PONG\r\n") } + server.sendMonitor(nil, msg, client, false) return nil } @@ -866,6 +871,8 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { // No locking for scripts, otherwise writes cannot happen within scripts case "subscribe", "psubscribe", "publish": // No locking for pubsub + case "montior": + // No locking for monitor } res, d, err := func() (res resp.Value, d commandDetails, err error) { if msg.Deadline != nil { @@ -1088,7 +1095,10 @@ func (server *Server) command(msg *Message, client *Client) ( res, err = server.cmdPublish(msg) case "test": res, err = server.cmdTest(msg) + case "monitor": + res, err = server.cmdMonitor(msg) } + server.sendMonitor(err, msg, client, false) return }