Added CLIENT command

CLIENT LIST
CLIENT SETNAME name
CLIENT GETNAME
CLIENT KILL [ip:port] [ID client-id] [ADDR ip:port]

The CLIENT LIST command returns

  One client connection per line (separated by LF)
  Each line is composed of a succession of property=value
  fields separated by a space character.

  id: an unique 64-bit client ID
  addr: address/port of the clien
  age: total duration of the connection in seconds
  idle: idle time of the connection in seconds
  name: the name of the client

Suggested by @UriHendler, closes #139
This commit is contained in:
Josh Baker 2017-01-30 11:35:42 -07:00
parent 4d36e359ff
commit 49e1fcce7a
5 changed files with 223 additions and 9 deletions

View File

@ -124,7 +124,7 @@ func (c *Controller) loadAOF() error {
} }
nn += 1 + len(ns) + int(n) + 2 nn += 1 + len(ns) + int(n) + 2
} }
if _, _, err := c.command(&msg, nil); err != nil { if _, _, err := c.command(&msg, nil, nil); err != nil {
if commandErrIsFatal(err) { if commandErrIsFatal(err) {
return err return err
} }

View File

@ -1,10 +1,15 @@
package controller package controller
import ( import (
"errors"
"fmt"
"net" "net"
"sort"
"strings"
"time" "time"
"github.com/tidwall/resp" "github.com/tidwall/resp"
"github.com/tidwall/tile38/controller/server"
) )
// Conn represents a simple resp connection. // Conn represents a simple resp connection.
@ -42,3 +47,186 @@ func (conn *Conn) Do(commandName string, args ...interface{}) (val resp.Value, e
val, _, err = conn.rd.ReadValue() val, _, err = conn.rd.ReadValue()
return val, err return val, err
} }
type byID []*clientConn
func (arr byID) Len() int {
return len(arr)
}
func (arr byID) Less(a, b int) bool {
return arr[a].id < arr[b].id
}
func (arr byID) Swap(a, b int) {
arr[a], arr[b] = arr[b], arr[a]
}
func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string, error) {
start := time.Now()
if len(msg.Values) == 1 {
return "", errInvalidNumberOfArguments
}
switch strings.ToLower(msg.Values[1].String()) {
default:
return "", errors.New("Syntax error, try CLIENT " +
"(LIST | KILL | GETNAME | SETNAME)")
case "list":
if len(msg.Values) != 2 {
return "", errInvalidNumberOfArguments
}
var list []*clientConn
for _, cc := range c.conns {
list = append(list, cc)
}
sort.Sort(byID(list))
now := time.Now()
var buf []byte
for _, cc := range list {
buf = append(buf,
fmt.Sprintf("id=%d addr=%s name=%s age=%d idle=%d\n",
cc.id, cc.conn.RemoteAddr().String(), cc.name,
now.Sub(cc.opened)/time.Second,
now.Sub(cc.last)/time.Second,
)...,
)
}
switch msg.OutputType {
case server.JSON:
return `{"ok":true,"list":` + jsonString(string(buf)) + `,"elapsed":"` + time.Now().Sub(start).String() + "\"}", nil
case server.RESP:
data, err := resp.BytesValue(buf).MarshalRESP()
if err != nil {
return "", err
}
return string(data), nil
}
return "", nil
case "getname":
if len(msg.Values) != 2 {
return "", errInvalidNumberOfArguments
}
name := ""
if cc, ok := c.conns[conn]; ok {
name = cc.name
}
switch msg.OutputType {
case server.JSON:
return `{"ok":true,"name":` + jsonString(name) + `,"elapsed":"` + time.Now().Sub(start).String() + "\"}", nil
case server.RESP:
data, err := resp.StringValue(name).MarshalRESP()
if err != nil {
return "", err
}
return string(data), nil
}
case "setname":
if len(msg.Values) != 3 {
return "", errInvalidNumberOfArguments
}
name := msg.Values[2].String()
for i := 0; i < len(name); i++ {
if name[i] < '!' || name[i] > '~' {
return "", errors.New("Client names cannot contain spaces, newlines or special characters.")
}
}
if cc, ok := c.conns[conn]; ok {
cc.name = name
}
switch msg.OutputType {
case server.JSON:
return `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}", nil
case server.RESP:
return "+OK\r\n", nil
}
case "kill":
if len(msg.Values) < 3 {
return "", errInvalidNumberOfArguments
}
var useAddr bool
var addr string
var useID bool
var id string
for i := 2; i < len(msg.Values); i++ {
arg := msg.Values[i].String()
if strings.Contains(arg, ":") {
addr = arg
useAddr = true
break
}
switch strings.ToLower(arg) {
default:
return "", errors.New("No such client")
case "addr":
i++
if i == len(msg.Values) {
return "", errors.New("syntax error")
}
addr = msg.Values[i].String()
useAddr = true
case "id":
i++
if i == len(msg.Values) {
return "", errors.New("syntax error")
}
id = msg.Values[i].String()
useID = true
}
}
var cclose *clientConn
for _, cc := range c.conns {
if useID && fmt.Sprintf("%d", cc.id) == id {
cclose = cc
break
} else if useAddr && cc.conn.RemoteAddr().String() == addr {
cclose = cc
break
}
}
if cclose == nil {
return "", errors.New("No such client")
}
var res string
switch msg.OutputType {
case server.JSON:
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
case server.RESP:
res = "+OK\r\n"
}
if cclose.conn == conn {
// closing self, return response now
cclose.conn.Write([]byte(res))
}
cclose.conn.Close()
return res, nil
}
return "", errors.New("invalid output type")
}
/*
func (c *Controller) cmdClientList(msg *server.Message) (string, error) {
var ok bool
var key string
if vs, key, ok = tokenval(vs); !ok || key == "" {
return "", errInvalidNumberOfArguments
}
col := c.getCol(key)
if col == nil {
if msg.OutputType == server.RESP {
return "+none\r\n", nil
}
return "", errKeyNotFound
}
typ := "hash"
switch msg.OutputType {
case server.JSON:
return `{"ok":true,"type":` + string(typ) + `,"elapsed":"` + time.Now().Sub(start).String() + "\"}", nil
case server.RESP:
return "+" + typ + "\r\n", nil
}
return "", nil
}
*/

View File

@ -57,6 +57,14 @@ func (col *collectionT) Less(item btree.Item, ctx interface{}) bool {
return col.Key < item.(*collectionT).Key return col.Key < item.(*collectionT).Key
} }
type clientConn struct {
id uint64
name string
opened time.Time
last time.Time
conn *server.Conn
}
// Controller is a tile38 controller // Controller is a tile38 controller
type Controller struct { type Controller struct {
mu sync.RWMutex mu sync.RWMutex
@ -82,7 +90,7 @@ type Controller struct {
hookcols map[string]map[string]*Hook // col key hookcols map[string]map[string]*Hook // col key
aofconnM map[net.Conn]bool aofconnM map[net.Conn]bool
expires map[string]map[string]time.Time expires map[string]map[string]time.Time
conns map[*server.Conn]bool conns map[*server.Conn]*clientConn
started time.Time started time.Time
http bool http bool
@ -121,7 +129,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
aofconnM: make(map[net.Conn]bool), aofconnM: make(map[net.Conn]bool),
expires: make(map[string]map[string]time.Time), expires: make(map[string]map[string]time.Time),
started: time.Now(), started: time.Now(),
conns: make(map[*server.Conn]bool), conns: make(map[*server.Conn]*clientConn),
epc: endpoint.NewEndpointManager(), epc: endpoint.NewEndpointManager(),
http: http, http: http,
} }
@ -190,6 +198,9 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
}() }()
handler := func(conn *server.Conn, msg *server.Message, rd *server.AnyReaderWriter, w io.Writer, websocket bool) error { handler := func(conn *server.Conn, msg *server.Message, rd *server.AnyReaderWriter, w io.Writer, websocket bool) error {
c.mu.Lock() c.mu.Lock()
if cc, ok := c.conns[conn]; ok {
cc.last = time.Now()
}
c.statsTotalCommands++ c.statsTotalCommands++
c.mu.Unlock() c.mu.Unlock()
err := c.handleInputCommand(conn, msg, w) err := c.handleInputCommand(conn, msg, w)
@ -215,9 +226,15 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
c.mu.RUnlock() c.mu.RUnlock()
return is return is
} }
var clientId uint64
opened := func(conn *server.Conn) { opened := func(conn *server.Conn) {
c.mu.Lock() c.mu.Lock()
c.conns[conn] = true clientId++
c.conns[conn] = &clientConn{
id: clientId,
opened: time.Now(),
conn: conn,
}
c.statsTotalConns++ c.statsTotalConns++
c.mu.Unlock() c.mu.Unlock()
} }
@ -479,9 +496,12 @@ func (c *Controller) handleInputCommand(conn *server.Conn, msg *server.Message,
case "aofshrink": case "aofshrink":
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
case "client":
c.mu.Lock()
defer c.mu.Unlock()
} }
res, d, err := c.command(msg, w) res, d, err := c.command(msg, w, conn)
if err != nil { if err != nil {
if err.Error() == "going live" { if err.Error() == "going live" {
return err return err
@ -522,7 +542,11 @@ func (c *Controller) reset() {
c.cols = btree.New(16, 0) c.cols = btree.New(16, 0)
} }
func (c *Controller) command(msg *server.Message, w io.Writer) (res string, d commandDetailsT, err error) { func (c *Controller) command(
msg *server.Message, w io.Writer, conn *server.Conn,
) (
res string, d commandDetailsT, err error,
) {
switch msg.Command { switch msg.Command {
default: default:
err = fmt.Errorf("unknown command '%s'", msg.Values[0]) err = fmt.Errorf("unknown command '%s'", msg.Values[0])
@ -624,8 +648,10 @@ func (c *Controller) command(msg *server.Message, w io.Writer) (res string, d co
msg.Values[1] = resp.StringValue(command) msg.Values[1] = resp.StringValue(command)
msg.Values = msg.Values[1:] msg.Values = msg.Values[1:]
msg.Command = strings.ToLower(command) msg.Command = strings.ToLower(command)
return c.command(msg, w) return c.command(msg, w, conn)
} }
case "client":
res, err = c.cmdClient(msg, conn)
} }
return return
} }

View File

@ -84,7 +84,7 @@ func (c *Controller) cmdMassInsert(msg *server.Message) (res string, err error)
nmsg.Values = values nmsg.Values = values
nmsg.Command = strings.ToLower(values[0].String()) nmsg.Command = strings.ToLower(values[0].String())
var d commandDetailsT var d commandDetailsT
_, d, err = c.command(nmsg, nil) _, d, err = c.command(nmsg, nil, nil)
if err != nil { if err != nil {
return err return err
} }

View File

@ -126,7 +126,7 @@ func (c *Controller) followHandleCommand(values []resp.Value, followc uint64, w
Command: strings.ToLower(values[0].String()), Command: strings.ToLower(values[0].String()),
Values: values, Values: values,
} }
_, d, err := c.command(msg, nil) _, d, err := c.command(msg, nil, nil)
if err != nil { if err != nil {
if commandErrIsFatal(err) { if commandErrIsFatal(err) {
return c.aofsz, err return c.aofsz, err