From c81883a629502ba9742c72680779528f9344f440 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Fri, 26 Aug 2016 12:54:19 -0700 Subject: [PATCH] info command --- controller/aofshrink.go | 5 +- controller/controller.go | 31 ++++++++- controller/expire.go | 3 + controller/server/server.go | 8 ++- controller/stats.go | 133 ++++++++++++++++++++++++++++++++++++ 5 files changed, 176 insertions(+), 4 deletions(-) diff --git a/controller/aofshrink.go b/controller/aofshrink.go index 04f56a7e..0fa38100 100644 --- a/controller/aofshrink.go +++ b/controller/aofshrink.go @@ -47,17 +47,20 @@ func (c *Controller) aofshrink() { }() var ferr error // stores the final error c.shrinking = true + c.currentShrinkStart = start endpos := int64(c.aofsz) // 1) Log the aofsize at start. Locked c.mu.Unlock() defer func() { c.mu.Lock() defer c.mu.Unlock() c.shrinking = false + c.lastShrinkDuration = time.Now().Sub(start) + c.currentShrinkStart = time.Time{} defer func() { if ferr != nil { log.Errorf("aof shrink failed: %s\n", ferr.Error()) } else { - log.Printf("aof shrink completed in %s", time.Now().Sub(start)) + log.Printf("aof shrink completed in %s", c.lastShrinkDuration) } }() if ferr != nil { diff --git a/controller/controller.go b/controller/controller.go index 3778244e..02e54304 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -70,6 +70,15 @@ type Controller struct { hookcols map[string]map[string]*Hook // col key aofconnM map[net.Conn]bool expires map[string]map[string]time.Time + conns map[*server.Conn]bool + started time.Time + + statsTotalConns int + statsTotalCommands int + statsExpired int + + lastShrinkDuration time.Duration + currentShrinkStart time.Time stopBackgroundExpiring bool stopWatchingMemory bool @@ -92,6 +101,8 @@ func ListenAndServe(host string, port int, dir string) error { hookcols: make(map[string]map[string]*Hook), aofconnM: make(map[net.Conn]bool), expires: make(map[string]map[string]time.Time), + started: time.Now(), + conns: make(map[*server.Conn]bool), } if err := os.MkdirAll(dir, 0700); err != nil { return err @@ -130,6 +141,9 @@ func ListenAndServe(host string, port int, dir string) error { c.mu.Unlock() }() handler := func(conn *server.Conn, msg *server.Message, rd *server.AnyReaderWriter, w io.Writer, websocket bool) error { + c.mu.Lock() + c.statsTotalCommands++ + c.mu.Unlock() err := c.handleInputCommand(conn, msg, w) if err != nil { if err.Error() == "going live" { @@ -153,7 +167,18 @@ func ListenAndServe(host string, port int, dir string) error { c.mu.RUnlock() return is } - return server.ListenAndServe(host, port, protected, handler) + opened := func(conn *server.Conn) { + c.mu.Lock() + c.conns[conn] = true + c.statsTotalConns++ + c.mu.Unlock() + } + closed := func(conn *server.Conn) { + c.mu.Lock() + delete(c.conns, conn) + c.mu.Unlock() + } + return server.ListenAndServe(host, port, protected, handler, opened, closed) } func (c *Controller) watchMemory() { @@ -338,7 +363,7 @@ func (c *Controller) handleInputCommand(conn *server.Conn, msg *server.Message, if c.config.ReadOnly { return writeErr(errors.New("read only")) } - case "get", "keys", "scan", "nearby", "within", "intersects", "hooks", "search", "ttl", "bounds": + case "get", "keys", "scan", "nearby", "within", "intersects", "hooks", "search", "ttl", "bounds", "server", "info": // read operations c.mu.RLock() defer c.mu.RUnlock() @@ -439,6 +464,8 @@ func (c *Controller) command(msg *server.Message, w io.Writer) (res string, d co res, err = c.cmdStats(msg) case "server": res, err = c.cmdServer(msg) + case "info": + res, err = c.cmdInfo(msg) case "scan": res, err = c.cmdScan(msg) case "nearby": diff --git a/controller/expire.go b/controller/expire.go index d1fb3f63..bfa6bad4 100644 --- a/controller/expire.go +++ b/controller/expire.go @@ -68,6 +68,9 @@ func (c *Controller) backgroundExpiring() { for id, at := range m { if now.After(at) { // issue a DEL command + c.mu.Lock() + c.statsExpired++ + c.mu.Unlock() msg := &server.Message{} msg.Values = resp.MultiBulkValue("del", key, id).Array() msg.Command = "del" diff --git a/controller/server/server.go b/controller/server/server.go index e4d82117..48e93621 100644 --- a/controller/server/server.go +++ b/controller/server/server.go @@ -52,6 +52,8 @@ func ListenAndServe( host string, port int, protected func() bool, handler func(conn *Conn, msg *Message, rd *AnyReaderWriter, w io.Writer, websocket bool) error, + opened func(conn *Conn), + closed func(conn *Conn), ) error { ln, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) if err != nil { @@ -64,7 +66,7 @@ func ListenAndServe( log.Error(err) continue } - go handleConn(&Conn{Conn: conn}, protected, handler) + go handleConn(&Conn{Conn: conn}, protected, handler, opened, closed) } } @@ -79,7 +81,11 @@ func handleConn( conn *Conn, protected func() bool, handler func(conn *Conn, msg *Message, rd *AnyReaderWriter, w io.Writer, websocket bool) error, + opened func(conn *Conn), + closed func(conn *Conn), ) { + opened(conn) + defer closed(conn) addr := conn.RemoteAddr().String() if core.ShowDebugMessages { log.Debugf("opened connection: %s", addr) diff --git a/controller/stats.go b/controller/stats.go index c2d8f33c..c69af69c 100644 --- a/controller/stats.go +++ b/controller/stats.go @@ -1,15 +1,19 @@ package controller import ( + "bytes" "encoding/json" "fmt" "runtime" "sort" + "strings" + "syscall" "time" "github.com/tidwall/btree" "github.com/tidwall/resp" "github.com/tidwall/tile38/controller/server" + "github.com/tidwall/tile38/core" ) func (c *Controller) cmdStats(msg *server.Message) (res string, err error) { @@ -127,7 +131,136 @@ func (c *Controller) cmdServer(msg *server.Message) (res string, err error) { return res, nil } +func (c *Controller) writeInfoServer(w *bytes.Buffer) { + fmt.Fprintf(w, "redis_version:%s\r\n", core.Version) //Version of the Redis server + fmt.Fprintf(w, "uptime_in_seconds:%d\r\n", time.Now().Sub(c.started)/time.Second) //Number of seconds since Redis server start +} +func (c *Controller) writeInfoClients(w *bytes.Buffer) { + fmt.Fprintf(w, "connected_clients:%d\r\n", len(c.conns)) // Number of client connections (excluding connections from slaves) +} +func (c *Controller) writeInfoMemory(w *bytes.Buffer) { + c.mu.RUnlock() + runtime.GC() + c.mu.RLock() + var mem runtime.MemStats + runtime.ReadMemStats(&mem) + fmt.Fprintf(w, "used_memory:%d\r\n", mem.Alloc) // total number of bytes allocated by Redis using its allocator (either standard libc, jemalloc, or an alternative allocator such as tcmalloc +} +func boolInt(t bool) int { + if t { + return 1 + } + return 0 +} +func (c *Controller) writeInfoPersistence(w *bytes.Buffer) { + fmt.Fprintf(w, "aof_enabled:1\r\n") + fmt.Fprintf(w, "aof_rewrite_in_progress:%d\r\n", boolInt(c.shrinking)) // Flag indicating a AOF rewrite operation is on-going + fmt.Fprintf(w, "aof_last_rewrite_time_sec:%d\r\n", c.lastShrinkDuration/time.Second) // Duration of the last AOF rewrite operation in seconds + if c.currentShrinkStart.IsZero() { + fmt.Fprintf(w, "aof_current_rewrite_time_sec:0\r\n") // Duration of the on-going AOF rewrite operation if any + } else { + fmt.Fprintf(w, "aof_current_rewrite_time_sec:%d\r\n", time.Now().Sub(c.currentShrinkStart)/time.Second) // Duration of the on-going AOF rewrite operation if any + } +} +func (c *Controller) writeInfoStats(w *bytes.Buffer) { + fmt.Fprintf(w, "total_connections_received:%d\r\n", c.statsTotalConns) // Total number of connections accepted by the server + fmt.Fprintf(w, "total_commands_processed:%d\r\n", c.statsTotalCommands) // Total number of commands processed by the server + fmt.Fprintf(w, "expired_keys:%d\r\n", c.statsExpired) // Total number of key expiration events +} +func (c *Controller) writeInfoReplication(w *bytes.Buffer) { + fmt.Fprintf(w, "connected_slaves:%d\r\n", len(c.aofconnM)) // Number of connected slaves +} +func (c *Controller) writeInfoCPU(w *bytes.Buffer) { + var selfRu syscall.Rusage + var cRu syscall.Rusage + syscall.Getrusage(syscall.RUSAGE_SELF, &selfRu) + syscall.Getrusage(syscall.RUSAGE_CHILDREN, &cRu) + fmt.Fprintf(w, + "used_cpu_sys:%.2f\r\n"+ + "used_cpu_user:%.2f\r\n"+ + "used_cpu_sys_children:%.2f\r\n"+ + "used_cpu_user_children:%.2f\r\n", + float64(selfRu.Stime.Sec)+float64(selfRu.Stime.Usec/1000000), + float64(selfRu.Utime.Sec)+float64(selfRu.Utime.Usec/1000000), + float64(cRu.Stime.Sec)+float64(cRu.Stime.Usec/1000000), + float64(cRu.Utime.Sec)+float64(cRu.Utime.Usec/1000000), + ) +} +func (c *Controller) writeInfoCluster(w *bytes.Buffer) { + fmt.Fprintf(w, "cluster_enabled:0\r\n") +} + +func (c *Controller) cmdInfo(msg *server.Message) (res string, err error) { + start := time.Now() + sections := []string{"server", "clients", "memory", "persistence", "stats", "replication", "cpu", "cluster", "keyspace"} + switch len(msg.Values) { + default: + return "", errInvalidNumberOfArguments + case 1: + case 2: + section := strings.ToLower(msg.Values[1].String()) + switch section { + default: + sections = []string{section} + case "all": + sections = []string{"server", "clients", "memory", "persistence", "stats", "replication", "cpu", "commandstats", "cluster", "keyspace"} + case "default": + } + } + + w := &bytes.Buffer{} + for i, section := range sections { + if i > 0 { + w.WriteString("\r\n") + } + switch strings.ToLower(section) { + default: + continue + case "server": + w.WriteString("# Server\r\n") + c.writeInfoServer(w) + case "clients": + w.WriteString("# Clients\r\n") + c.writeInfoClients(w) + case "memory": + w.WriteString("# Memory\r\n") + c.writeInfoMemory(w) + case "persistence": + w.WriteString("# Persistence\r\n") + c.writeInfoPersistence(w) + case "stats": + w.WriteString("# Stats\r\n") + c.writeInfoStats(w) + case "replication": + w.WriteString("# Replication\r\n") + c.writeInfoReplication(w) + case "cpu": + w.WriteString("# CPU\r\n") + c.writeInfoCPU(w) + case "cluster": + w.WriteString("# Cluster\r\n") + c.writeInfoCluster(w) + } + } + + switch msg.OutputType { + case server.JSON: + data, err := json.Marshal(w.String()) + if err != nil { + return "", err + } + res = `{"ok":true,"stats":` + string(data) + `,"elapsed":"` + time.Now().Sub(start).String() + "\"}" + case server.RESP: + data, err := resp.StringValue(w.String()).MarshalRESP() + if err != nil { + return "", err + } + res = string(data) + } + + return res, nil +} func respValuesSimpleMap(m map[string]interface{}) []resp.Value { var keys []string for key := range m {