Added MONITOR command

closes #571
This commit is contained in:
tidwall 2020-08-12 12:38:35 -07:00
parent 5f5c4d9f03
commit 3d7242d06c
5 changed files with 123 additions and 1 deletions

View File

@ -180,6 +180,7 @@ func main() {
}
}
connDial()
monitor := false
livemode := false
aof := false
defer func() {
@ -359,6 +360,10 @@ func main() {
if jsonOK(msg) {
output = "json"
}
case "monitor":
monitor = true
livemode = true
output = "resp"
}
if output == "resp" &&
(strings.HasPrefix(string(msg), "*3\r\n$10\r\npsubscribe\r\n") ||
@ -382,7 +387,7 @@ func main() {
}
mustOutput := true
if oneCommand == "" && output == "json" && !jsonOK(msg) {
if !monitor && oneCommand == "" && output == "json" && !jsonOK(msg) {
var cerr connError
if err := json.Unmarshal(msg, &cerr); err == nil {
fmt.Fprintln(os.Stderr, "(error) "+cerr.Err)

View File

@ -87,6 +87,8 @@ func (server *Server) goLive(
return server.liveAOF(s.pos, conn, rd, msg)
case liveSubscriptionSwitches:
return server.liveSubscription(conn, rd, msg, websocket)
case liveMonitorSwitches:
return server.liveMonitor(conn, rd, msg)
case liveFenceSwitches:
// fallthrough
}

104
internal/server/monitor.go Normal file
View File

@ -0,0 +1,104 @@
package server
import (
"fmt"
"io"
"net"
"strconv"
"strings"
"time"
"github.com/tidwall/resp"
)
type liveMonitorSwitches struct {
// no fields. everything is managed through the Message
}
func (sub liveMonitorSwitches) Error() string {
return goingLive
}
func (s *Server) cmdMonitor(msg *Message) (resp.Value, error) {
if len(msg.Args) != 1 {
return resp.Value{}, errInvalidNumberOfArguments
}
return NOMessage, liveMonitorSwitches{}
}
func (s *Server) liveMonitor(conn net.Conn, rd *PipelineReader, msg *Message) error {
s.monconnsMu.Lock()
s.monconns[conn] = true
s.monconnsMu.Unlock()
defer func() {
s.monconnsMu.Lock()
delete(s.monconns, conn)
s.monconnsMu.Unlock()
conn.Close()
}()
s.monconnsMu.Lock()
conn.Write([]byte("+OK\r\n"))
s.monconnsMu.Unlock()
for {
msgs, err := rd.ReadMessages()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
for _, msg := range msgs {
if len(msg.Args) == 1 && strings.ToLower(msg.Args[0]) == "quit" {
s.monconnsMu.Lock()
conn.Write([]byte("+OK\r\n"))
s.monconnsMu.Unlock()
return nil
}
}
return nil
}
}
// send messages to live MONITOR clients
func (s *Server) sendMonitor(err error, msg *Message, c *Client, lua bool) {
s.monconnsMu.RLock()
n := len(s.monconns)
s.monconnsMu.RUnlock()
if n == 0 {
return
}
if (c == nil && !lua) ||
(err != nil && (err == errInvalidNumberOfArguments ||
strings.HasPrefix(err.Error(), "unknown command "))) {
return
}
// accept all commands except for these:
switch strings.ToLower(msg.Command()) {
case "config", "config set", "config get", "config rewrite",
"auth", "follow", "slaveof", "replconf",
"aof", "aofmd5", "client",
"monitor":
return
}
var line []byte
for i, arg := range msg.Args {
if i > 0 {
line = append(line, ' ')
}
line = append(line, strconv.Quote(arg)...)
}
tstr := fmt.Sprintf("%.6f", float64(time.Now().UnixNano())/1e9)
var addr string
if lua {
addr = "lua"
} else {
addr = c.remoteAddr
}
s.monconnsMu.Lock()
for conn := range s.monconns {
fmt.Fprintf(conn, "+%s [0 %s] %s\r\n", tstr, addr, line)
}
s.monconnsMu.Unlock()
}

View File

@ -642,6 +642,7 @@ func (s *Server) commandInScript(msg *Message) (
case "server":
res, err = s.cmdServer(msg)
}
s.sendMonitor(err, msg, nil, true)
return
}

View File

@ -123,6 +123,9 @@ type Server struct {
pubsub *pubsub
hookex expire.List
monconnsMu sync.RWMutex
monconns map[net.Conn]bool
}
// Serve starts a new tile38 server
@ -152,6 +155,7 @@ func Serve(host string, port int, dir string, http bool) error {
conns: make(map[int]*Client),
http: http,
pubsub: newPubsub(),
monconns: make(map[net.Conn]bool),
}
server.hookex.Expired = func(item expire.Item) {
@ -746,6 +750,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
}
return writeOutput("+PONG\r\n")
}
server.sendMonitor(nil, msg, client, false)
return nil
}
@ -866,6 +871,8 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
// No locking for scripts, otherwise writes cannot happen within scripts
case "subscribe", "psubscribe", "publish":
// No locking for pubsub
case "montior":
// No locking for monitor
}
res, d, err := func() (res resp.Value, d commandDetails, err error) {
if msg.Deadline != nil {
@ -1088,7 +1095,10 @@ func (server *Server) command(msg *Message, client *Client) (
res, err = server.cmdPublish(msg)
case "test":
res, err = server.cmdTest(msg)
case "monitor":
res, err = server.cmdMonitor(msg)
}
server.sendMonitor(err, msg, client, false)
return
}