tile38/internal/server/client.go

213 lines
5.4 KiB
Go
Raw Normal View History

package server
import (
2019-01-15 01:29:29 +03:00
"encoding/json"
"fmt"
"io"
"sort"
"strings"
"sync"
"time"
"github.com/tidwall/resp"
)
// Client is an remote connection into to Tile38
type Client struct {
2019-04-26 21:50:49 +03:00
id int // unique id
replPort int // the known replication port for follower connections
replAddr string // the known replication addr for follower connections
2019-04-26 21:50:49 +03:00
authd bool // client has been authenticated
outputType Type // Null, JSON, or RESP
remoteAddr string // original remote address
in InputStream // input stream
pr PipelineReader // command reader
out []byte // output write buffer
goLiveErr error // error type used for going line
goLiveMsg *Message // last message for go live
mu sync.Mutex // guard
conn io.ReadWriteCloser // out-of-loop connection.
name string // optional defined name
opened time.Time // when the client was created/opened, unix nano
last time.Time // last client request/response, unix nano
2022-09-24 23:41:36 +03:00
closer io.Closer // used to close the connection
}
// Write appends b to the client's output buffer. Nothing is sent here; the
// bytes are only queued in client.out. It always reports len(b) bytes
// written and a nil error.
func (client *Client) Write(b []byte) (n int, err error) {
	n = len(b)
	client.out = append(client.out, b...)
	return n, nil
}
2022-09-24 16:22:58 +03:00
// CLIENT (LIST | KILL | GETNAME | SETNAME)
2022-09-25 00:28:47 +03:00
func (s *Server) cmdCLIENT(msg *Message, client *Client) (resp.Value, error) {
start := time.Now()
2022-09-25 00:28:47 +03:00
args := msg.Args
2022-09-24 23:41:36 +03:00
if len(args) == 1 {
return retrerr(errInvalidNumberOfArguments)
}
2022-09-24 23:41:36 +03:00
switch strings.ToLower(args[1]) {
case "list":
2022-09-24 23:41:36 +03:00
if len(args) != 2 {
return retrerr(errInvalidNumberOfArguments)
}
var list []*Client
s.connsmu.RLock()
for _, cc := range s.conns {
list = append(list, cc)
}
s.connsmu.RUnlock()
2022-09-24 23:41:36 +03:00
sort.Slice(list, func(i, j int) bool {
return list[i].id < list[j].id
})
now := time.Now()
var buf []byte
for _, client := range list {
client.mu.Lock()
buf = append(buf,
fmt.Sprintf("id=%d addr=%s name=%s age=%d idle=%d\n",
client.id,
client.remoteAddr,
client.name,
now.Sub(client.opened)/time.Second,
now.Sub(client.last)/time.Second,
)...,
)
client.mu.Unlock()
}
2022-09-25 00:28:47 +03:00
if msg.OutputType == JSON {
2019-01-15 01:29:29 +03:00
// Create a map of all key/value info fields
var cmap []map[string]interface{}
clients := strings.Split(string(buf), "\n")
for _, client := range clients {
client = strings.TrimSpace(client)
m := make(map[string]interface{})
var hasFields bool
for _, kv := range strings.Split(client, " ") {
kv = strings.TrimSpace(kv)
if split := strings.SplitN(kv, "=", 2); len(split) == 2 {
hasFields = true
m[split[0]] = tryParseType(split[1])
}
}
if hasFields {
cmap = append(cmap, m)
2019-01-15 01:29:29 +03:00
}
}
2022-09-24 16:22:58 +03:00
data, _ := json.Marshal(cmap)
return resp.StringValue(`{"ok":true,"list":` + string(data) +
`,"elapsed":"` + time.Since(start).String() + "\"}"), nil
}
2022-09-24 16:22:58 +03:00
return resp.BytesValue(buf), nil
case "getname":
2022-09-24 23:41:36 +03:00
if len(args) != 2 {
return retrerr(errInvalidNumberOfArguments)
}
2022-09-24 16:22:58 +03:00
client.mu.Lock()
name := client.name
client.mu.Unlock()
2022-09-25 00:28:47 +03:00
if msg.OutputType == JSON {
2022-09-24 16:22:58 +03:00
return resp.StringValue(`{"ok":true,"name":` + jsonString(name) +
`,"elapsed":"` + time.Since(start).String() + "\"}"), nil
}
2022-09-24 16:22:58 +03:00
return resp.StringValue(name), nil
case "setname":
2022-09-24 23:41:36 +03:00
if len(args) != 3 {
return retrerr(errInvalidNumberOfArguments)
}
2022-09-25 00:28:47 +03:00
name := msg.Args[2]
for i := 0; i < len(name); i++ {
if name[i] < '!' || name[i] > '~' {
2022-09-24 23:41:36 +03:00
return retrerr(clientErrorf(
"Client names cannot contain spaces, newlines or special characters.",
2022-09-24 23:41:36 +03:00
))
}
}
client.mu.Lock()
client.name = name
client.mu.Unlock()
2022-09-25 00:28:47 +03:00
if msg.OutputType == JSON {
2022-09-24 23:41:36 +03:00
return resp.StringValue(`{"ok":true,"elapsed":"` +
time.Since(start).String() + "\"}"), nil
}
2022-09-24 23:41:36 +03:00
return resp.SimpleStringValue("OK"), nil
case "kill":
2022-09-24 23:41:36 +03:00
if len(args) < 3 {
return retrerr(errInvalidNumberOfArguments)
}
var useAddr bool
var addr string
var useID bool
var id string
2022-09-24 23:41:36 +03:00
for i := 2; i < len(args); i++ {
if useAddr || useID {
return retrerr(errInvalidNumberOfArguments)
}
arg := args[i]
if strings.Contains(arg, ":") {
addr = arg
useAddr = true
2022-09-24 23:41:36 +03:00
} else {
switch strings.ToLower(arg) {
case "addr":
i++
if i == len(args) {
return retrerr(errInvalidNumberOfArguments)
}
addr = args[i]
useAddr = true
case "id":
i++
if i == len(args) {
return retrerr(errInvalidNumberOfArguments)
}
id = args[i]
useID = true
default:
return retrerr(clientErrorf("No such client"))
}
}
}
2022-09-24 23:41:36 +03:00
var closing []io.Closer
s.connsmu.RLock()
for _, cc := range s.conns {
if useID && fmt.Sprintf("%d", cc.id) == id {
2022-09-24 23:41:36 +03:00
if cc.closer != nil {
closing = append(closing, cc.closer)
}
} else if useAddr {
if cc.remoteAddr == addr {
if cc.closer != nil {
closing = append(closing, cc.closer)
}
}
}
}
s.connsmu.RUnlock()
2022-09-24 23:41:36 +03:00
if len(closing) == 0 {
return retrerr(clientErrorf("No such client"))
}
2022-09-24 23:41:36 +03:00
// go func() {
// close the connections behind the scene
for _, closer := range closing {
closer.Close()
}
2022-09-24 23:41:36 +03:00
// }()
2022-09-25 00:28:47 +03:00
if msg.OutputType == JSON {
2022-09-24 23:41:36 +03:00
return resp.StringValue(`{"ok":true,"elapsed":"` +
time.Since(start).String() + "\"}"), nil
}
2022-09-24 23:41:36 +03:00
return resp.SimpleStringValue("OK"), nil
default:
return retrerr(clientErrorf(
"Syntax error, try CLIENT (LIST | KILL | GETNAME | SETNAME)",
))
}
}