tile38/internal/server/stats.go

591 lines
17 KiB
Go
Raw Permalink Normal View History

package server
2016-03-05 02:08:16 +03:00
import (
2016-08-26 22:54:19 +03:00
"bytes"
2016-03-05 02:08:16 +03:00
"encoding/json"
"errors"
2016-03-05 02:08:16 +03:00
"fmt"
2016-12-23 00:52:37 +03:00
"os"
2016-03-05 02:08:16 +03:00
"runtime"
2016-03-29 01:50:18 +03:00
"sort"
2019-01-15 01:29:29 +03:00
"strconv"
2016-08-26 22:54:19 +03:00
"strings"
2019-09-04 02:35:42 +03:00
"sync"
2016-03-05 02:08:16 +03:00
"time"
2022-07-11 18:39:40 +03:00
"github.com/tidwall/buntdb"
2016-03-29 01:50:18 +03:00
"github.com/tidwall/resp"
"github.com/tidwall/tile38/core"
"github.com/tidwall/tile38/internal/collection"
2016-03-05 02:08:16 +03:00
)
2019-09-04 02:35:42 +03:00
var memStats runtime.MemStats
var memStatsMu sync.Mutex
var memStatsBG bool
// ReadMemStats returns the latest memstats. It provides an instant response.
func readMemStats() runtime.MemStats {
memStatsMu.Lock()
if !memStatsBG {
runtime.ReadMemStats(&memStats)
go func() {
2019-09-04 22:47:30 +03:00
var ms runtime.MemStats
2019-09-04 02:35:42 +03:00
for {
2019-09-04 22:47:30 +03:00
runtime.ReadMemStats(&ms)
2019-09-04 02:35:42 +03:00
memStatsMu.Lock()
2019-09-04 22:47:30 +03:00
memStats = ms
2019-09-04 02:35:42 +03:00
memStatsMu.Unlock()
time.Sleep(time.Second / 5)
}
}()
memStatsBG = true
}
ms := memStats
memStatsMu.Unlock()
return ms
}
2022-09-24 01:29:46 +03:00
// STATS key [key...]
func (s *Server) cmdSTATS(msg *Message) (resp.Value, error) {
2016-03-05 02:08:16 +03:00
start := time.Now()
2017-10-05 18:20:40 +03:00
2022-09-24 01:29:46 +03:00
// >> Args
args := msg.Args
if len(args) < 2 {
return retrerr(errInvalidNumberOfArguments)
2016-03-05 02:08:16 +03:00
}
2022-09-24 01:29:46 +03:00
// >> Operation
2016-03-29 02:11:29 +03:00
var vals []resp.Value
2022-09-24 01:29:46 +03:00
var ms = []map[string]interface{}{}
for i := 1; i < len(args); i++ {
key := args[i]
col, _ := s.cols.Get(key)
2016-03-05 02:08:16 +03:00
if col != nil {
m := make(map[string]interface{})
m["num_points"] = col.PointCount()
2016-03-05 02:08:16 +03:00
m["in_memory_size"] = col.TotalWeight()
2016-07-13 07:59:36 +03:00
m["num_objects"] = col.Count()
m["num_strings"] = col.StringCount()
2016-03-29 02:11:29 +03:00
switch msg.OutputType {
case JSON:
2016-03-29 02:11:29 +03:00
ms = append(ms, m)
case RESP:
2016-03-29 02:11:29 +03:00
vals = append(vals, resp.ArrayValue(respValuesSimpleMap(m)))
}
2016-03-05 02:08:16 +03:00
} else {
2016-03-29 02:11:29 +03:00
switch msg.OutputType {
case JSON:
2016-03-29 02:11:29 +03:00
ms = append(ms, nil)
case RESP:
2016-03-29 02:11:29 +03:00
vals = append(vals, resp.NullValue())
}
2016-03-05 02:08:16 +03:00
}
}
2016-03-29 03:38:21 +03:00
2022-09-24 01:29:46 +03:00
// >> Response
if msg.OutputType == JSON {
data, _ := json.Marshal(ms)
return resp.StringValue(`{"ok":true,"stats":` + string(data) +
`,"elapsed":"` + time.Since(start).String() + "\"}"), nil
2016-03-05 02:08:16 +03:00
}
2022-09-24 01:29:46 +03:00
return resp.ArrayValue(vals), nil
2016-03-05 02:08:16 +03:00
}
2022-09-24 03:34:09 +03:00
// HEALTHZ
func (s *Server) cmdHEALTHZ(msg *Message) (resp.Value, error) {
start := time.Now()
2022-09-24 03:34:09 +03:00
// >> Args
args := msg.Args
if len(args) != 1 {
return retrerr(errInvalidNumberOfArguments)
}
// >> Operation
2021-05-26 02:42:26 +03:00
if s.config.followHost() != "" {
m := make(map[string]interface{})
s.basicStats(m)
2021-06-09 20:55:07 +03:00
if fmt.Sprintf("%v", m["caught_up"]) != "true" {
2022-09-24 03:34:09 +03:00
return retrerr(errors.New("not caught up"))
2021-05-26 02:42:26 +03:00
}
}
2022-09-24 03:34:09 +03:00
// >> Response
if msg.OutputType == JSON {
return resp.StringValue(`{"ok":true,"elapsed":"` +
time.Since(start).String() + "\"}"), nil
}
2022-09-24 03:34:09 +03:00
return resp.SimpleStringValue("OK"), nil
}
2022-09-24 03:54:49 +03:00
// SERVER [ext]
func (s *Server) cmdSERVER(msg *Message) (resp.Value, error) {
2016-03-05 02:08:16 +03:00
start := time.Now()
2022-09-24 03:54:49 +03:00
// >> Args
args := msg.Args
var ext bool
for i := 1; i < len(args); i++ {
switch strings.ToLower(args[i]) {
case "ext":
ext = true
default:
return retrerr(errInvalidArgument(args[i]))
}
}
// >> Operation
m := make(map[string]interface{})
2017-10-05 18:20:40 +03:00
2022-09-24 03:54:49 +03:00
if ext {
s.extStats(m)
} else {
s.basicStats(m)
2016-03-05 02:08:16 +03:00
}
2022-09-24 03:54:49 +03:00
// >> Response
if msg.OutputType == JSON {
data, _ := json.Marshal(m)
return resp.StringValue(`{"ok":true,"stats":` + string(data) +
`,"elapsed":"` + time.Since(start).String() + "\"}"), nil
}
2022-09-24 03:54:49 +03:00
return resp.ArrayValue(respValuesSimpleMap(m)), nil
}
// basicStats populates the passed map with basic system/go/tile38 statistics
func (s *Server) basicStats(m map[string]interface{}) {
m["id"] = s.config.serverID()
if s.config.followHost() != "" {
m["following"] = fmt.Sprintf("%s:%d", s.config.followHost(),
s.config.followPort())
m["caught_up"] = s.fcup
m["caught_up_once"] = s.fcuponce
2016-03-05 02:08:16 +03:00
}
m["http_transport"] = s.http
2016-12-23 00:52:37 +03:00
m["pid"] = os.Getpid()
m["aof_size"] = s.aofsz
m["num_collections"] = s.cols.Len()
m["num_hooks"] = s.hooks.Len()
2016-03-05 02:08:16 +03:00
sz := 0
s.cols.Scan(func(key string, col *collection.Collection) bool {
2016-03-05 02:08:16 +03:00
sz += col.TotalWeight()
return true
})
m["in_memory_size"] = sz
points := 0
objects := 0
2022-07-11 18:39:40 +03:00
nstrings := 0
s.cols.Scan(func(key string, col *collection.Collection) bool {
2016-03-05 02:08:16 +03:00
points += col.PointCount()
2016-07-13 07:59:36 +03:00
objects += col.Count()
2022-07-11 18:39:40 +03:00
nstrings += col.StringCount()
2016-03-05 02:08:16 +03:00
return true
})
m["num_points"] = points
m["num_objects"] = objects
2022-07-11 18:39:40 +03:00
m["num_strings"] = nstrings
2019-09-04 02:35:42 +03:00
mem := readMemStats()
2016-03-05 02:08:16 +03:00
avgsz := 0
if points != 0 {
avgsz = int(mem.HeapAlloc) / points
}
2016-10-25 01:58:30 +03:00
m["mem_alloc"] = mem.Alloc
2016-03-05 02:08:16 +03:00
m["heap_size"] = mem.HeapAlloc
2016-12-23 00:52:37 +03:00
m["heap_released"] = mem.HeapReleased
m["max_heap_size"] = s.config.maxMemory()
2016-03-05 02:08:16 +03:00
m["avg_item_size"] = avgsz
m["version"] = core.Version
2016-03-05 02:08:16 +03:00
m["pointer_size"] = (32 << uintptr(uint64(^uintptr(0))>>63)) / 8
m["read_only"] = s.config.readOnly()
2018-03-10 03:24:23 +03:00
m["cpus"] = runtime.NumCPU()
n, _ := runtime.ThreadCreateProfile(nil)
m["threads"] = float64(n)
2022-07-11 18:39:40 +03:00
var nevents int
s.qdb.View(func(tx *buntdb.Tx) error {
// All entries in the buntdb log are events, except for one, which
// is "hook:idx".
nevents, _ = tx.Len()
nevents -= 1 // Ignore the "hook:idx"
if nevents < 0 {
nevents = 0
}
return nil
})
m["pending_events"] = nevents
}
2016-03-05 02:08:16 +03:00
// extStats populates the passed map with extended system/go/tile38 statistics
func (s *Server) extStats(m map[string]interface{}) {
n, _ := runtime.ThreadCreateProfile(nil)
2019-09-04 02:35:42 +03:00
mem := readMemStats()
// Go/Memory Stats
// Number of goroutines that currently exist
m["go_goroutines"] = runtime.NumGoroutine()
// Number of OS threads created
m["go_threads"] = float64(n)
// A summary of the GC invocation durations
m["go_version"] = runtime.Version()
// Number of bytes allocated and still in use
m["alloc_bytes"] = mem.Alloc
// Total number of bytes allocated, even if freed
m["alloc_bytes_total"] = mem.TotalAlloc
// Number of CPUS available on the system
m["sys_cpus"] = runtime.NumCPU()
// Number of bytes obtained from system
m["sys_bytes"] = mem.Sys
// Total number of pointer lookups
m["lookups_total"] = mem.Lookups
// Total number of mallocs
m["mallocs_total"] = mem.Mallocs
// Total number of frees
m["frees_total"] = mem.Frees
// Number of heap bytes allocated and still in use
m["heap_alloc_bytes"] = mem.HeapAlloc
// Number of heap bytes obtained from system
m["heap_sys_bytes"] = mem.HeapSys
// Number of heap bytes waiting to be used
m["heap_idle_bytes"] = mem.HeapIdle
// Number of heap bytes that are in use
m["heap_inuse_bytes"] = mem.HeapInuse
// Number of heap bytes released to OS
m["heap_released_bytes"] = mem.HeapReleased
// Number of allocated objects
m["heap_objects"] = mem.HeapObjects
// Number of bytes in use by the stack allocator
m["stack_inuse_bytes"] = mem.StackInuse
// Number of bytes obtained from system for stack allocator
m["stack_sys_bytes"] = mem.StackSys
// Number of bytes in use by mspan structures
m["mspan_inuse_bytes"] = mem.MSpanInuse
// Number of bytes used for mspan structures obtained from system
m["mspan_sys_bytes"] = mem.MSpanSys
// Number of bytes in use by mcache structures
m["mcache_inuse_bytes"] = mem.MCacheInuse
// Number of bytes used for mcache structures obtained from system
m["mcache_sys_bytes"] = mem.MCacheSys
// Number of bytes used by the profiling bucket hash table
m["buck_hash_sys_bytes"] = mem.BuckHashSys
// Number of bytes used for garbage collection system metadata
m["gc_sys_bytes"] = mem.GCSys
// Number of bytes used for other system allocations
m["other_sys_bytes"] = mem.OtherSys
// Number of heap bytes when next garbage collection will take place
m["next_gc_bytes"] = mem.NextGC
// Number of seconds since 1970 of last garbage collection
m["last_gc_time_seconds"] = float64(mem.LastGC) / 1e9
// The fraction of this program's available CPU time used by the GC since
// the program started
m["gc_cpu_fraction"] = mem.GCCPUFraction
// Tile38 Stats
// ID of the server
m["tile38_id"] = s.config.serverID()
// The process ID of the server
m["tile38_pid"] = os.Getpid()
// Version of Tile38 running
m["tile38_version"] = core.Version
// Maximum heap size allowed
m["tile38_max_heap_size"] = s.config.maxMemory()
// Type of instance running
if s.config.followHost() == "" {
m["tile38_type"] = "leader"
} else {
m["tile38_type"] = "follower"
2016-03-05 02:08:16 +03:00
}
// Whether or not the server is read-only
m["tile38_read_only"] = s.config.readOnly()
// Size of pointer
m["tile38_pointer_size"] = (32 << uintptr(uint64(^uintptr(0))>>63)) / 8
// Uptime of the Tile38 server in seconds
m["tile38_uptime_in_seconds"] = time.Since(s.started).Seconds()
// Number of currently connected Tile38 clients
s.connsmu.RLock()
m["tile38_connected_clients"] = len(s.conns)
s.connsmu.RUnlock()
// Whether or not a cluster is enabled
m["tile38_cluster_enabled"] = false
// Whether or not the Tile38 AOF is enabled
m["tile38_aof_enabled"] = s.opts.AppendOnly
// Whether or not an AOF shrink is currently in progress
m["tile38_aof_rewrite_in_progress"] = s.shrinking
// Length of time the last AOF shrink took
m["tile38_aof_last_rewrite_time_sec"] = s.lastShrinkDuration.Load() / int64(time.Second)
// Duration of the on-going AOF rewrite operation if any
var currentShrinkStart time.Time
if currentShrinkStart.IsZero() {
m["tile38_aof_current_rewrite_time_sec"] = 0
} else {
m["tile38_aof_current_rewrite_time_sec"] = time.Since(currentShrinkStart).Seconds()
}
// Total size of the AOF in bytes
m["tile38_aof_size"] = s.aofsz
// Whether or no the HTTP transport is being served
m["tile38_http_transport"] = s.http
// Number of connections accepted by the server
m["tile38_total_connections_received"] = s.statsTotalConns.Load()
// Number of commands processed by the server
m["tile38_total_commands_processed"] = s.statsTotalCommands.Load()
2019-03-14 21:23:23 +03:00
// Number of webhook messages sent by server
m["tile38_total_messages_sent"] = s.statsTotalMsgsSent.Load()
// Number of key expiration events
m["tile38_expired_keys"] = s.statsExpired.Load()
// Number of connected slaves
m["tile38_connected_slaves"] = len(s.aofconnM)
points := 0
objects := 0
strings := 0
s.cols.Scan(func(key string, col *collection.Collection) bool {
points += col.PointCount()
objects += col.Count()
strings += col.StringCount()
return true
})
// Number of points in the database
m["tile38_num_points"] = points
// Number of objects in the database
m["tile38_num_objects"] = objects
// Number of string in the database
m["tile38_num_strings"] = strings
// Number of collections in the database
m["tile38_num_collections"] = s.cols.Len()
// Number of hooks in the database
m["tile38_num_hooks"] = s.hooks.Len()
// Number of hook groups in the database
m["tile38_num_hook_groups"] = s.groupHooks.Len()
// Number of object groups in the database
m["tile38_num_object_groups"] = s.groupObjects.Len()
avgsz := 0
if points != 0 {
avgsz = int(mem.HeapAlloc) / points
}
// Average point size in bytes
m["tile38_avg_point_size"] = avgsz
sz := 0
s.cols.Scan(func(key string, col *collection.Collection) bool {
sz += col.TotalWeight()
return true
})
// Total in memory size of all collections
m["tile38_in_memory_size"] = sz
2016-03-05 02:08:16 +03:00
}
2016-08-26 23:42:52 +03:00
func (s *Server) writeInfoServer(w *bytes.Buffer) {
2016-08-26 23:42:52 +03:00
fmt.Fprintf(w, "tile38_version:%s\r\n", core.Version)
fmt.Fprintf(w, "redis_version:%s\r\n", core.Version) // Version of the Redis server
fmt.Fprintf(w, "uptime_in_seconds:%d\r\n", int(time.Since(s.started).Seconds())) // Number of seconds since Redis server start
2016-08-26 22:54:19 +03:00
}
func (s *Server) writeInfoClients(w *bytes.Buffer) {
s.connsmu.RLock()
fmt.Fprintf(w, "connected_clients:%d\r\n", len(s.conns)) // Number of client connections (excluding connections from slaves)
s.connsmu.RUnlock()
2016-08-26 22:54:19 +03:00
}
func (s *Server) writeInfoMemory(w *bytes.Buffer) {
2019-09-04 02:35:42 +03:00
mem := readMemStats()
2016-08-26 22:54:19 +03:00
fmt.Fprintf(w, "used_memory:%d\r\n", mem.Alloc) // total number of bytes allocated by Redis using its allocator (either standard libc, jemalloc, or an alternative allocator such as tcmalloc
}
func boolInt(t bool) int {
if t {
return 1
}
return 0
}
func (s *Server) writeInfoPersistence(w *bytes.Buffer) {
fmt.Fprintf(w, "aof_enabled:%d\r\n", boolInt(s.opts.AppendOnly))
fmt.Fprintf(w, "aof_rewrite_in_progress:%d\r\n", boolInt(s.shrinking)) // Flag indicating a AOF rewrite operation is on-going
fmt.Fprintf(w, "aof_last_rewrite_time_sec:%d\r\n", s.lastShrinkDuration.Load()/int64(time.Second)) // Duration of the last AOF rewrite operation in seconds
2018-11-23 11:39:04 +03:00
var currentShrinkStart time.Time // c.currentShrinkStart.get()
2017-09-30 18:00:29 +03:00
if currentShrinkStart.IsZero() {
2016-08-26 22:54:19 +03:00
fmt.Fprintf(w, "aof_current_rewrite_time_sec:0\r\n") // Duration of the on-going AOF rewrite operation if any
} else {
fmt.Fprintf(w, "aof_current_rewrite_time_sec:%d\r\n", time.Since(currentShrinkStart)/time.Second) // Duration of the on-going AOF rewrite operation if any
2016-08-26 22:54:19 +03:00
}
}
func (s *Server) writeInfoStats(w *bytes.Buffer) {
fmt.Fprintf(w, "total_connections_received:%d\r\n", s.statsTotalConns.Load()) // Total number of connections accepted by the server
fmt.Fprintf(w, "total_commands_processed:%d\r\n", s.statsTotalCommands.Load()) // Total number of commands processed by the server
fmt.Fprintf(w, "total_messages_sent:%d\r\n", s.statsTotalMsgsSent.Load()) // Total number of commands processed by the server
fmt.Fprintf(w, "expired_keys:%d\r\n", s.statsExpired.Load()) // Total number of key expiration events
2016-08-26 22:54:19 +03:00
}
// writeInfoReplication writes all replication data to the 'info' response
func (s *Server) writeInfoReplication(w *bytes.Buffer) {
if s.config.followHost() != "" {
fmt.Fprintf(w, "role:slave\r\n")
fmt.Fprintf(w, "master_host:%s\r\n", s.config.followHost())
fmt.Fprintf(w, "master_port:%v\r\n", s.config.followPort())
if s.config.replicaPriority() >= 0 {
fmt.Fprintf(w, "slave_priority:%v\r\n", s.config.replicaPriority())
}
} else {
fmt.Fprintf(w, "role:master\r\n")
var i int
s.connsmu.RLock()
for _, cc := range s.conns {
if cc.replPort != 0 {
fmt.Fprintf(w, "slave%v:ip=%s,port=%v,state=online\r\n", i,
strings.Split(cc.remoteAddr, ":")[0], cc.replPort)
i++
}
}
s.connsmu.RUnlock()
}
fmt.Fprintf(w, "connected_slaves:%d\r\n", len(s.aofconnM)) // Number of connected slaves
2016-08-26 22:54:19 +03:00
}
func (s *Server) writeInfoCluster(w *bytes.Buffer) {
2016-08-26 22:54:19 +03:00
fmt.Fprintf(w, "cluster_enabled:0\r\n")
}
2022-09-25 00:28:47 +03:00
// INFO [section ...]
func (s *Server) cmdINFO(msg *Message) (res resp.Value, err error) {
2016-08-26 22:54:19 +03:00
start := time.Now()
2017-10-05 18:20:40 +03:00
2022-09-25 00:28:47 +03:00
// >> Args
args := msg.Args
msects := make(map[string]bool)
allsects := []string{
"server", "clients", "memory", "persistence", "stats",
"replication", "cpu", "cluster", "keyspace",
}
if len(args) == 1 {
for _, s := range allsects {
msects[s] = true
}
}
for i := 1; i < len(args); i++ {
section := strings.ToLower(args[i])
2016-08-26 22:54:19 +03:00
switch section {
2022-09-25 00:28:47 +03:00
case "all", "default":
for _, s := range allsects {
msects[s] = true
}
2016-08-26 22:54:19 +03:00
default:
2022-09-25 00:28:47 +03:00
for _, s := range allsects {
if s == section {
msects[section] = true
}
}
}
}
// >> Operation
var sects []string
for _, s := range allsects {
if msects[s] {
sects = append(sects, s)
2016-08-26 22:54:19 +03:00
}
}
2016-03-29 02:11:29 +03:00
2016-08-26 22:54:19 +03:00
w := &bytes.Buffer{}
2022-09-25 00:28:47 +03:00
for i, section := range sects {
2016-08-26 22:54:19 +03:00
if i > 0 {
w.WriteString("\r\n")
}
switch strings.ToLower(section) {
default:
continue
case "server":
w.WriteString("# Server\r\n")
s.writeInfoServer(w)
2016-08-26 22:54:19 +03:00
case "clients":
w.WriteString("# Clients\r\n")
s.writeInfoClients(w)
2016-08-26 22:54:19 +03:00
case "memory":
w.WriteString("# Memory\r\n")
s.writeInfoMemory(w)
2016-08-26 22:54:19 +03:00
case "persistence":
w.WriteString("# Persistence\r\n")
s.writeInfoPersistence(w)
2016-08-26 22:54:19 +03:00
case "stats":
w.WriteString("# Stats\r\n")
s.writeInfoStats(w)
2016-08-26 22:54:19 +03:00
case "replication":
w.WriteString("# Replication\r\n")
s.writeInfoReplication(w)
2016-08-26 22:54:19 +03:00
case "cpu":
w.WriteString("# CPU\r\n")
s.writeInfoCPU(w)
2016-08-26 22:54:19 +03:00
case "cluster":
w.WriteString("# Cluster\r\n")
s.writeInfoCluster(w)
2016-08-26 22:54:19 +03:00
}
}
2022-09-25 00:28:47 +03:00
// >> Response
if msg.OutputType == JSON {
2019-01-15 01:29:29 +03:00
// Create a map of all key/value info fields
m := make(map[string]interface{})
for _, kv := range strings.Split(w.String(), "\r\n") {
kv = strings.TrimSpace(kv)
if !strings.HasPrefix(kv, "#") {
if split := strings.SplitN(kv, ":", 2); len(split) == 2 {
m[split[0]] = tryParseType(split[1])
}
}
}
// Marshal the map and use the output in the JSON response
2022-09-25 00:28:47 +03:00
data, _ := json.Marshal(m)
return resp.StringValue(`{"ok":true,"info":` + string(data) +
`,"elapsed":"` + time.Since(start).String() + "\"}"), nil
2016-08-26 22:54:19 +03:00
}
2022-09-25 00:28:47 +03:00
return resp.BytesValue(w.Bytes()), nil
2016-08-26 22:54:19 +03:00
}
2019-01-15 01:29:29 +03:00
// tryParseType attempts to parse the passed string as an integer, float64 and
// a bool returning any successful parsed values. It returns the passed string
// if all tries fail
func tryParseType(str string) interface{} {
if v, err := strconv.ParseInt(str, 10, 64); err == nil {
return v
}
if v, err := strconv.ParseFloat(str, 64); err == nil {
return v
}
if v, err := strconv.ParseBool(str); err == nil {
return v
}
return str
}
2016-03-29 02:11:29 +03:00
func respValuesSimpleMap(m map[string]interface{}) []resp.Value {
var keys []string
2016-04-03 05:16:36 +03:00
for key := range m {
2016-03-29 02:11:29 +03:00
keys = append(keys, key)
}
sort.Strings(keys)
var vals []resp.Value
for _, key := range keys {
val := m[key]
vals = append(vals, resp.StringValue(key))
vals = append(vals, resp.StringValue(fmt.Sprintf("%v", val)))
}
return vals
}