diff --git a/controller/aof.go b/controller/aof.go index f17917a7..f094d394 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -124,7 +124,7 @@ func (c *Controller) loadAOF() error { } nn += 1 + len(ns) + int(n) + 2 } - if _, _, err := c.command(&msg, nil); err != nil { + if _, _, err := c.command(&msg, nil, nil); err != nil { if commandErrIsFatal(err) { return err } diff --git a/controller/client.go b/controller/client.go index 561b70bd..716c11c7 100644 --- a/controller/client.go +++ b/controller/client.go @@ -1,10 +1,15 @@ package controller import ( + "errors" + "fmt" "net" + "sort" + "strings" "time" "github.com/tidwall/resp" + "github.com/tidwall/tile38/controller/server" ) // Conn represents a simple resp connection. @@ -42,3 +47,186 @@ func (conn *Conn) Do(commandName string, args ...interface{}) (val resp.Value, e val, _, err = conn.rd.ReadValue() return val, err } + +type byID []*clientConn + +func (arr byID) Len() int { + return len(arr) +} +func (arr byID) Less(a, b int) bool { + return arr[a].id < arr[b].id +} +func (arr byID) Swap(a, b int) { + arr[a], arr[b] = arr[b], arr[a] +} +func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string, error) { + start := time.Now() + if len(msg.Values) == 1 { + return "", errInvalidNumberOfArguments + } + switch strings.ToLower(msg.Values[1].String()) { + default: + return "", errors.New("Syntax error, try CLIENT " + + "(LIST | KILL | GETNAME | SETNAME)") + case "list": + if len(msg.Values) != 2 { + return "", errInvalidNumberOfArguments + } + var list []*clientConn + for _, cc := range c.conns { + list = append(list, cc) + } + sort.Sort(byID(list)) + now := time.Now() + var buf []byte + for _, cc := range list { + buf = append(buf, + fmt.Sprintf("id=%d addr=%s name=%s age=%d idle=%d\n", + cc.id, cc.conn.RemoteAddr().String(), cc.name, + now.Sub(cc.opened)/time.Second, + now.Sub(cc.last)/time.Second, + )..., + ) + } + switch msg.OutputType { + case server.JSON: + return `{"ok":true,"list":` + jsonString(string(buf)) + `,"elapsed":"` + time.Now().Sub(start).String() + "\"}", nil + case server.RESP: + data, err := resp.BytesValue(buf).MarshalRESP() + if err != nil { + return "", err + } + return string(data), nil + } + return "", nil + case "getname": + if len(msg.Values) != 2 { + return "", errInvalidNumberOfArguments + } + name := "" + if cc, ok := c.conns[conn]; ok { + name = cc.name + } + switch msg.OutputType { + case server.JSON: + return `{"ok":true,"name":` + jsonString(name) + `,"elapsed":"` + time.Now().Sub(start).String() + "\"}", nil + case server.RESP: + data, err := resp.StringValue(name).MarshalRESP() + if err != nil { + return "", err + } + return string(data), nil + } + case "setname": + if len(msg.Values) != 3 { + return "", errInvalidNumberOfArguments + } + name := msg.Values[2].String() + for i := 0; i < len(name); i++ { + if name[i] < '!' || name[i] > '~' { + return "", errors.New("Client names cannot contain spaces, newlines or special characters.") + } + } + if cc, ok := c.conns[conn]; ok { + cc.name = name + } + switch msg.OutputType { + case server.JSON: + return `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}", nil + case server.RESP: + return "+OK\r\n", nil + } + case "kill": + if len(msg.Values) < 3 { + return "", errInvalidNumberOfArguments + } + var useAddr bool + var addr string + var useID bool + var id string + for i := 2; i < len(msg.Values); i++ { + arg := msg.Values[i].String() + if strings.Contains(arg, ":") { + addr = arg + useAddr = true + break + } + switch strings.ToLower(arg) { + default: + return "", errors.New("No such client") + case "addr": + i++ + if i == len(msg.Values) { + return "", errors.New("syntax error") + } + addr = msg.Values[i].String() + useAddr = true + case "id": + i++ + if i == len(msg.Values) { + return "", errors.New("syntax error") + } + id = msg.Values[i].String() + useID = true + } + } + var cclose *clientConn + for _, cc := range c.conns { + if useID && fmt.Sprintf("%d", cc.id) == id { + cclose = cc + break + } else if useAddr && cc.conn.RemoteAddr().String() == addr { + cclose = cc + break + } + } + if cclose == nil { + return "", errors.New("No such client") + } + + var res string + switch msg.OutputType { + case server.JSON: + res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" + case server.RESP: + res = "+OK\r\n" + } + + if cclose.conn == conn { + // closing self, return response now + cclose.conn.Write([]byte(res)) + } + cclose.conn.Close() + return res, nil + } + return "", errors.New("invalid output type") +} + +/* +func (c *Controller) cmdClientList(msg *server.Message) (string, error) { + + var ok bool + var key string + if vs, key, ok = tokenval(vs); !ok || key == "" { + return "", errInvalidNumberOfArguments + } + + col := c.getCol(key) + if col == nil { + if msg.OutputType == server.RESP { + return "+none\r\n", nil + } + return "", errKeyNotFound + } + + typ := "hash" + + switch msg.OutputType { + case server.JSON: + return `{"ok":true,"type":` + string(typ) + `,"elapsed":"` + time.Now().Sub(start).String() + "\"}", nil + case server.RESP: + return "+" + typ + "\r\n", nil + } + return "", nil +} +*/ diff --git a/controller/controller.go b/controller/controller.go index 709c9499..50074d90 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -57,6 +57,14 @@ func (col *collectionT) Less(item btree.Item, ctx interface{}) bool { return col.Key < item.(*collectionT).Key } +type clientConn struct { + id uint64 + name string + opened time.Time + last time.Time + conn *server.Conn +} + // Controller is a tile38 controller type Controller struct { mu sync.RWMutex @@ -82,7 +90,7 @@ type Controller struct { hookcols map[string]map[string]*Hook // col key aofconnM map[net.Conn]bool expires map[string]map[string]time.Time - conns map[*server.Conn]bool + conns map[*server.Conn]*clientConn started time.Time http bool @@ -121,7 +129,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http aofconnM: make(map[net.Conn]bool), expires: make(map[string]map[string]time.Time), started: time.Now(), - conns: make(map[*server.Conn]bool), + conns: make(map[*server.Conn]*clientConn), epc: endpoint.NewEndpointManager(), http: http, } @@ -190,6 +198,9 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http }() handler := func(conn *server.Conn, msg *server.Message, rd *server.AnyReaderWriter, w io.Writer, websocket bool) error { c.mu.Lock() + if cc, ok := c.conns[conn]; ok { + cc.last = time.Now() + } c.statsTotalCommands++ c.mu.Unlock() err := c.handleInputCommand(conn, msg, w) @@ -215,9 +226,15 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http c.mu.RUnlock() return is } + var clientId uint64 opened := func(conn *server.Conn) { c.mu.Lock() - c.conns[conn] = true + clientId++ + c.conns[conn] = &clientConn{ + id: clientId, + opened: time.Now(), + conn: conn, + } c.statsTotalConns++ c.mu.Unlock() } @@ -479,9 +496,12 @@ func (c *Controller) handleInputCommand(conn *server.Conn, msg *server.Message, case "aofshrink": c.mu.RLock() defer c.mu.RUnlock() + case "client": + c.mu.Lock() + defer c.mu.Unlock() } - res, d, err := c.command(msg, w) + res, d, err := c.command(msg, w, conn) if err != nil { if err.Error() == "going live" { return err @@ -522,7 +542,11 @@ func (c *Controller) reset() { c.cols = btree.New(16, 0) } -func (c *Controller) command(msg *server.Message, w io.Writer) (res string, d commandDetailsT, err error) { +func (c *Controller) command( + msg *server.Message, w io.Writer, conn *server.Conn, +) ( + res string, d commandDetailsT, err error, +) { switch msg.Command { default: err = fmt.Errorf("unknown command '%s'", msg.Values[0]) @@ -624,8 +648,10 @@ func (c *Controller) command(msg *server.Message, w io.Writer) (res string, d co msg.Values[1] = resp.StringValue(command) msg.Values = msg.Values[1:] msg.Command = strings.ToLower(command) - return c.command(msg, w) + return c.command(msg, w, conn) } + case "client": + res, err = c.cmdClient(msg, conn) } return } diff --git a/controller/dev.go b/controller/dev.go index c19a860e..7d44e41a 100644 --- a/controller/dev.go +++ b/controller/dev.go @@ -84,7 +84,7 @@ func (c *Controller) cmdMassInsert(msg *server.Message) (res string, err error) nmsg.Values = values nmsg.Command = strings.ToLower(values[0].String()) var d commandDetailsT - _, d, err = c.command(nmsg, nil) + _, d, err = c.command(nmsg, nil, nil) if err != nil { return err } diff --git a/controller/follow.go b/controller/follow.go index 3f151574..85629cfc 100644 --- a/controller/follow.go +++ b/controller/follow.go @@ -126,7 +126,7 @@ func (c *Controller) followHandleCommand(values []resp.Value, followc uint64, w Command: strings.ToLower(values[0].String()), Values: values, } - _, d, err := c.command(msg, nil) + _, d, err := c.command(msg, nil, nil) if err != nil { if commandErrIsFatal(err) { return c.aofsz, err