mirror of https://github.com/tidwall/tile38.git
parent
9093926135
commit
d819db5f8b
|
@ -180,6 +180,7 @@ func main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
connDial()
|
connDial()
|
||||||
|
monitor := false
|
||||||
livemode := false
|
livemode := false
|
||||||
aof := false
|
aof := false
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -359,6 +360,10 @@ func main() {
|
||||||
if jsonOK(msg) {
|
if jsonOK(msg) {
|
||||||
output = "json"
|
output = "json"
|
||||||
}
|
}
|
||||||
|
case "monitor":
|
||||||
|
monitor = true
|
||||||
|
livemode = true
|
||||||
|
output = "resp"
|
||||||
}
|
}
|
||||||
if output == "resp" &&
|
if output == "resp" &&
|
||||||
(strings.HasPrefix(string(msg), "*3\r\n$10\r\npsubscribe\r\n") ||
|
(strings.HasPrefix(string(msg), "*3\r\n$10\r\npsubscribe\r\n") ||
|
||||||
|
@ -382,7 +387,7 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
mustOutput := true
|
mustOutput := true
|
||||||
if oneCommand == "" && output == "json" && !jsonOK(msg) {
|
if !monitor && oneCommand == "" && output == "json" && !jsonOK(msg) {
|
||||||
var cerr connError
|
var cerr connError
|
||||||
if err := json.Unmarshal(msg, &cerr); err == nil {
|
if err := json.Unmarshal(msg, &cerr); err == nil {
|
||||||
fmt.Fprintln(os.Stderr, "(error) "+cerr.Err)
|
fmt.Fprintln(os.Stderr, "(error) "+cerr.Err)
|
||||||
|
|
|
@ -87,6 +87,8 @@ func (server *Server) goLive(
|
||||||
return server.liveAOF(s.pos, conn, rd, msg)
|
return server.liveAOF(s.pos, conn, rd, msg)
|
||||||
case liveSubscriptionSwitches:
|
case liveSubscriptionSwitches:
|
||||||
return server.liveSubscription(conn, rd, msg, websocket)
|
return server.liveSubscription(conn, rd, msg, websocket)
|
||||||
|
case liveMonitorSwitches:
|
||||||
|
return server.liveMonitor(conn, rd, msg)
|
||||||
case liveFenceSwitches:
|
case liveFenceSwitches:
|
||||||
// fallthrough
|
// fallthrough
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,104 @@
|
||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/tidwall/resp"
|
||||||
|
)
|
||||||
|
|
||||||
|
type liveMonitorSwitches struct {
|
||||||
|
// no fields. everything is managed through the Message
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sub liveMonitorSwitches) Error() string {
|
||||||
|
return goingLive
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) cmdMonitor(msg *Message) (resp.Value, error) {
|
||||||
|
if len(msg.Args) != 1 {
|
||||||
|
return resp.Value{}, errInvalidNumberOfArguments
|
||||||
|
}
|
||||||
|
return NOMessage, liveMonitorSwitches{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) liveMonitor(conn net.Conn, rd *PipelineReader, msg *Message) error {
|
||||||
|
s.monconnsMu.Lock()
|
||||||
|
s.monconns[conn] = true
|
||||||
|
s.monconnsMu.Unlock()
|
||||||
|
defer func() {
|
||||||
|
s.monconnsMu.Lock()
|
||||||
|
delete(s.monconns, conn)
|
||||||
|
s.monconnsMu.Unlock()
|
||||||
|
conn.Close()
|
||||||
|
}()
|
||||||
|
s.monconnsMu.Lock()
|
||||||
|
conn.Write([]byte("+OK\r\n"))
|
||||||
|
s.monconnsMu.Unlock()
|
||||||
|
for {
|
||||||
|
msgs, err := rd.ReadMessages()
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, msg := range msgs {
|
||||||
|
if len(msg.Args) == 1 && strings.ToLower(msg.Args[0]) == "quit" {
|
||||||
|
s.monconnsMu.Lock()
|
||||||
|
conn.Write([]byte("+OK\r\n"))
|
||||||
|
s.monconnsMu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// send messages to live MONITOR clients
|
||||||
|
func (s *Server) sendMonitor(err error, msg *Message, c *Client, lua bool) {
|
||||||
|
s.monconnsMu.RLock()
|
||||||
|
n := len(s.monconns)
|
||||||
|
s.monconnsMu.RUnlock()
|
||||||
|
if n == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if (c == nil && !lua) ||
|
||||||
|
(err != nil && (err == errInvalidNumberOfArguments ||
|
||||||
|
strings.HasPrefix(err.Error(), "unknown command "))) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// accept all commands except for these:
|
||||||
|
switch strings.ToLower(msg.Command()) {
|
||||||
|
case "config", "config set", "config get", "config rewrite",
|
||||||
|
"auth", "follow", "slaveof", "replconf",
|
||||||
|
"aof", "aofmd5", "client",
|
||||||
|
"monitor":
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var line []byte
|
||||||
|
for i, arg := range msg.Args {
|
||||||
|
if i > 0 {
|
||||||
|
line = append(line, ' ')
|
||||||
|
}
|
||||||
|
line = append(line, strconv.Quote(arg)...)
|
||||||
|
}
|
||||||
|
tstr := fmt.Sprintf("%.6f", float64(time.Now().UnixNano())/1e9)
|
||||||
|
var addr string
|
||||||
|
if lua {
|
||||||
|
addr = "lua"
|
||||||
|
} else {
|
||||||
|
addr = c.remoteAddr
|
||||||
|
}
|
||||||
|
s.monconnsMu.Lock()
|
||||||
|
for conn := range s.monconns {
|
||||||
|
fmt.Fprintf(conn, "+%s [0 %s] %s\r\n", tstr, addr, line)
|
||||||
|
}
|
||||||
|
s.monconnsMu.Unlock()
|
||||||
|
}
|
|
@ -642,6 +642,7 @@ func (s *Server) commandInScript(msg *Message) (
|
||||||
case "server":
|
case "server":
|
||||||
res, err = s.cmdServer(msg)
|
res, err = s.cmdServer(msg)
|
||||||
}
|
}
|
||||||
|
s.sendMonitor(err, msg, nil, true)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -123,6 +123,9 @@ type Server struct {
|
||||||
|
|
||||||
pubsub *pubsub
|
pubsub *pubsub
|
||||||
hookex expire.List
|
hookex expire.List
|
||||||
|
|
||||||
|
monconnsMu sync.RWMutex
|
||||||
|
monconns map[net.Conn]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Serve starts a new tile38 server
|
// Serve starts a new tile38 server
|
||||||
|
@ -152,6 +155,7 @@ func Serve(host string, port int, dir string, http bool) error {
|
||||||
conns: make(map[int]*Client),
|
conns: make(map[int]*Client),
|
||||||
http: http,
|
http: http,
|
||||||
pubsub: newPubsub(),
|
pubsub: newPubsub(),
|
||||||
|
monconns: make(map[net.Conn]bool),
|
||||||
}
|
}
|
||||||
|
|
||||||
server.hookex.Expired = func(item expire.Item) {
|
server.hookex.Expired = func(item expire.Item) {
|
||||||
|
@ -746,6 +750,7 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
|
||||||
}
|
}
|
||||||
return writeOutput("+PONG\r\n")
|
return writeOutput("+PONG\r\n")
|
||||||
}
|
}
|
||||||
|
server.sendMonitor(nil, msg, client, false)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -866,6 +871,8 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
|
||||||
// No locking for scripts, otherwise writes cannot happen within scripts
|
// No locking for scripts, otherwise writes cannot happen within scripts
|
||||||
case "subscribe", "psubscribe", "publish":
|
case "subscribe", "psubscribe", "publish":
|
||||||
// No locking for pubsub
|
// No locking for pubsub
|
||||||
|
case "montior":
|
||||||
|
// No locking for monitor
|
||||||
}
|
}
|
||||||
res, d, err := func() (res resp.Value, d commandDetails, err error) {
|
res, d, err := func() (res resp.Value, d commandDetails, err error) {
|
||||||
if msg.Deadline != nil {
|
if msg.Deadline != nil {
|
||||||
|
@ -1088,7 +1095,10 @@ func (server *Server) command(msg *Message, client *Client) (
|
||||||
res, err = server.cmdPublish(msg)
|
res, err = server.cmdPublish(msg)
|
||||||
case "test":
|
case "test":
|
||||||
res, err = server.cmdTest(msg)
|
res, err = server.cmdTest(msg)
|
||||||
|
case "monitor":
|
||||||
|
res, err = server.cmdMonitor(msg)
|
||||||
}
|
}
|
||||||
|
server.sendMonitor(err, msg, client, false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue