2018-10-29 01:49:45 +03:00
package server
2016-03-05 02:08:16 +03:00
import (
2016-08-26 22:54:19 +03:00
"bytes"
2016-03-05 02:08:16 +03:00
"encoding/json"
2021-05-26 02:36:49 +03:00
"errors"
2016-03-05 02:08:16 +03:00
"fmt"
2016-12-23 00:52:37 +03:00
"os"
2016-03-05 02:08:16 +03:00
"runtime"
2016-03-29 01:50:18 +03:00
"sort"
2019-01-15 01:29:29 +03:00
"strconv"
2016-08-26 22:54:19 +03:00
"strings"
2019-09-04 02:35:42 +03:00
"sync"
2016-03-05 02:08:16 +03:00
"time"
2022-07-11 18:39:40 +03:00
"github.com/tidwall/buntdb"
2016-03-29 01:50:18 +03:00
"github.com/tidwall/resp"
2018-10-11 00:25:40 +03:00
"github.com/tidwall/tile38/core"
2022-09-13 18:16:41 +03:00
"github.com/tidwall/tile38/internal/collection"
2016-03-05 02:08:16 +03:00
)
2019-09-04 02:35:42 +03:00
var memStats runtime . MemStats
var memStatsMu sync . Mutex
var memStatsBG bool
// ReadMemStats returns the latest memstats. It provides an instant response.
func readMemStats ( ) runtime . MemStats {
memStatsMu . Lock ( )
if ! memStatsBG {
runtime . ReadMemStats ( & memStats )
go func ( ) {
2019-09-04 22:47:30 +03:00
var ms runtime . MemStats
2019-09-04 02:35:42 +03:00
for {
2019-09-04 22:47:30 +03:00
runtime . ReadMemStats ( & ms )
2019-09-04 02:35:42 +03:00
memStatsMu . Lock ( )
2019-09-04 22:47:30 +03:00
memStats = ms
2019-09-04 02:35:42 +03:00
memStatsMu . Unlock ( )
time . Sleep ( time . Second / 5 )
}
} ( )
memStatsBG = true
}
ms := memStats
memStatsMu . Unlock ( )
return ms
}
2022-09-24 01:29:46 +03:00
// STATS key [key...]
func ( s * Server ) cmdSTATS ( msg * Message ) ( resp . Value , error ) {
2016-03-05 02:08:16 +03:00
start := time . Now ( )
Lua scripting feature. (#224)
* Start on lua scripting
* Implement evalsha, script load, script exists, and script flush
* Type conversions from lua to resp/json.
Refactor to make luastate and luascripts persistent in the controller.
* Change controller.command and all underlying commands to return resp.Value.
Serialize only during the ouput.
* First stab at tile38 call from lua
* Change tile38 into tile38.call in Lua
* Property return errors from scripts
* Minor refactoring. No locking on script run
* Cleanup/refactoring
* Create a pool of 5 lua states, allow for more as needed. Refactor.
* Use safe map for scripts. Add a limit for max number of lua states. Refactor.
* Refactor
* Refactor script commands into atomic, read-only, and non-atomic classes.
Proper locking for all three classes.
Add tests for scripts
* More tests for scripts
* Properly escape newlines in lua-produced errors
* Better test for readonly failure
* Correctly convert ok/err messages between lua and resp.
Add pcall, sha1hex, error_reply, status_reply functions to tile38 namespace in lua.
* Add pcall test. Change writeErr to work with string argument
* Make sure eval/evalsha never attempt to write AOF
* Add eval-set and eval-get to benchmarks
* Fix eval benchmark tests, add more
* Improve benchmarks
* Optimizations and refactoring.
* Add lua memtest
* Typo
* Add dependency
* golint fixes
* gofmt fixes
* Add scripting commands to the core/commands.json
* Use ARGV for args inside lua
2017-10-05 18:20:40 +03:00
2022-09-24 01:29:46 +03:00
// >> Args
args := msg . Args
if len ( args ) < 2 {
return retrerr ( errInvalidNumberOfArguments )
2016-03-05 02:08:16 +03:00
}
2022-09-24 01:29:46 +03:00
// >> Operation
2016-03-29 02:11:29 +03:00
var vals [ ] resp . Value
2022-09-24 01:29:46 +03:00
var ms = [ ] map [ string ] interface { } { }
for i := 1 ; i < len ( args ) ; i ++ {
key := args [ i ]
2022-09-13 18:16:41 +03:00
col , _ := s . cols . Get ( key )
2016-03-05 02:08:16 +03:00
if col != nil {
m := make ( map [ string ] interface { } )
2016-10-25 01:35:47 +03:00
m [ "num_points" ] = col . PointCount ( )
2016-03-05 02:08:16 +03:00
m [ "in_memory_size" ] = col . TotalWeight ( )
2016-07-13 07:59:36 +03:00
m [ "num_objects" ] = col . Count ( )
2016-10-25 01:35:47 +03:00
m [ "num_strings" ] = col . StringCount ( )
2016-03-29 02:11:29 +03:00
switch msg . OutputType {
2018-10-29 01:49:45 +03:00
case JSON :
2016-03-29 02:11:29 +03:00
ms = append ( ms , m )
2018-10-29 01:49:45 +03:00
case RESP :
2016-03-29 02:11:29 +03:00
vals = append ( vals , resp . ArrayValue ( respValuesSimpleMap ( m ) ) )
}
2016-03-05 02:08:16 +03:00
} else {
2016-03-29 02:11:29 +03:00
switch msg . OutputType {
2018-10-29 01:49:45 +03:00
case JSON :
2016-03-29 02:11:29 +03:00
ms = append ( ms , nil )
2018-10-29 01:49:45 +03:00
case RESP :
2016-03-29 02:11:29 +03:00
vals = append ( vals , resp . NullValue ( ) )
}
2016-03-05 02:08:16 +03:00
}
}
2016-03-29 03:38:21 +03:00
2022-09-24 01:29:46 +03:00
// >> Response
if msg . OutputType == JSON {
data , _ := json . Marshal ( ms )
return resp . StringValue ( ` { "ok":true,"stats": ` + string ( data ) +
` ,"elapsed":" ` + time . Since ( start ) . String ( ) + "\"}" ) , nil
2016-03-05 02:08:16 +03:00
}
2022-09-24 01:29:46 +03:00
return resp . ArrayValue ( vals ) , nil
2016-03-05 02:08:16 +03:00
}
2018-11-26 23:24:15 +03:00
2022-09-24 03:34:09 +03:00
// HEALTHZ
func ( s * Server ) cmdHEALTHZ ( msg * Message ) ( resp . Value , error ) {
2021-05-26 02:36:49 +03:00
start := time . Now ( )
2022-09-24 03:34:09 +03:00
// >> Args
args := msg . Args
if len ( args ) != 1 {
return retrerr ( errInvalidNumberOfArguments )
}
// >> Operation
2021-05-26 02:42:26 +03:00
if s . config . followHost ( ) != "" {
m := make ( map [ string ] interface { } )
s . basicStats ( m )
2021-06-09 20:55:07 +03:00
if fmt . Sprintf ( "%v" , m [ "caught_up" ] ) != "true" {
2022-09-24 03:34:09 +03:00
return retrerr ( errors . New ( "not caught up" ) )
2021-05-26 02:42:26 +03:00
}
2021-05-26 02:36:49 +03:00
}
2022-09-24 03:34:09 +03:00
// >> Response
if msg . OutputType == JSON {
return resp . StringValue ( ` { "ok":true,"elapsed":" ` +
time . Since ( start ) . String ( ) + "\"}" ) , nil
2021-05-26 02:36:49 +03:00
}
2022-09-24 03:34:09 +03:00
return resp . SimpleStringValue ( "OK" ) , nil
2021-05-26 02:36:49 +03:00
}
2022-09-24 03:54:49 +03:00
// SERVER [ext]
func ( s * Server ) cmdSERVER ( msg * Message ) ( resp . Value , error ) {
2016-03-05 02:08:16 +03:00
start := time . Now ( )
2022-09-24 03:54:49 +03:00
// >> Args
args := msg . Args
var ext bool
for i := 1 ; i < len ( args ) ; i ++ {
switch strings . ToLower ( args [ i ] ) {
case "ext" :
ext = true
default :
return retrerr ( errInvalidArgument ( args [ i ] ) )
}
}
// >> Operation
2018-11-26 23:24:15 +03:00
m := make ( map [ string ] interface { } )
Lua scripting feature. (#224)
* Start on lua scripting
* Implement evalsha, script load, script exists, and script flush
* Type conversions from lua to resp/json.
Refactor to make luastate and luascripts persistent in the controller.
* Change controller.command and all underlying commands to return resp.Value.
Serialize only during the ouput.
* First stab at tile38 call from lua
* Change tile38 into tile38.call in Lua
* Property return errors from scripts
* Minor refactoring. No locking on script run
* Cleanup/refactoring
* Create a pool of 5 lua states, allow for more as needed. Refactor.
* Use safe map for scripts. Add a limit for max number of lua states. Refactor.
* Refactor
* Refactor script commands into atomic, read-only, and non-atomic classes.
Proper locking for all three classes.
Add tests for scripts
* More tests for scripts
* Properly escape newlines in lua-produced errors
* Better test for readonly failure
* Correctly convert ok/err messages between lua and resp.
Add pcall, sha1hex, error_reply, status_reply functions to tile38 namespace in lua.
* Add pcall test. Change writeErr to work with string argument
* Make sure eval/evalsha never attempt to write AOF
* Add eval-set and eval-get to benchmarks
* Fix eval benchmark tests, add more
* Improve benchmarks
* Optimizations and refactoring.
* Add lua memtest
* Typo
* Add dependency
* golint fixes
* gofmt fixes
* Add scripting commands to the core/commands.json
* Use ARGV for args inside lua
2017-10-05 18:20:40 +03:00
2022-09-24 03:54:49 +03:00
if ext {
s . extStats ( m )
} else {
2019-10-30 20:17:59 +03:00
s . basicStats ( m )
2016-03-05 02:08:16 +03:00
}
2018-11-26 23:24:15 +03:00
2022-09-24 03:54:49 +03:00
// >> Response
if msg . OutputType == JSON {
data , _ := json . Marshal ( m )
return resp . StringValue ( ` { "ok":true,"stats": ` + string ( data ) +
` ,"elapsed":" ` + time . Since ( start ) . String ( ) + "\"}" ) , nil
2018-11-26 23:24:15 +03:00
}
2022-09-24 03:54:49 +03:00
return resp . ArrayValue ( respValuesSimpleMap ( m ) ) , nil
2018-11-26 23:24:15 +03:00
}
// basicStats populates the passed map with basic system/go/tile38 statistics
2019-10-30 20:17:59 +03:00
func ( s * Server ) basicStats ( m map [ string ] interface { } ) {
m [ "id" ] = s . config . serverID ( )
if s . config . followHost ( ) != "" {
m [ "following" ] = fmt . Sprintf ( "%s:%d" , s . config . followHost ( ) ,
s . config . followPort ( ) )
m [ "caught_up" ] = s . fcup
m [ "caught_up_once" ] = s . fcuponce
2016-03-05 02:08:16 +03:00
}
2019-10-30 20:17:59 +03:00
m [ "http_transport" ] = s . http
2016-12-23 00:52:37 +03:00
m [ "pid" ] = os . Getpid ( )
2019-10-30 20:17:59 +03:00
m [ "aof_size" ] = s . aofsz
m [ "num_collections" ] = s . cols . Len ( )
2021-09-13 20:02:36 +03:00
m [ "num_hooks" ] = s . hooks . Len ( )
2016-03-05 02:08:16 +03:00
sz := 0
2022-09-13 18:16:41 +03:00
s . cols . Scan ( func ( key string , col * collection . Collection ) bool {
2016-03-05 02:08:16 +03:00
sz += col . TotalWeight ( )
return true
} )
m [ "in_memory_size" ] = sz
points := 0
objects := 0
2022-07-11 18:39:40 +03:00
nstrings := 0
2022-09-13 18:16:41 +03:00
s . cols . Scan ( func ( key string , col * collection . Collection ) bool {
2016-03-05 02:08:16 +03:00
points += col . PointCount ( )
2016-07-13 07:59:36 +03:00
objects += col . Count ( )
2022-07-11 18:39:40 +03:00
nstrings += col . StringCount ( )
2016-03-05 02:08:16 +03:00
return true
} )
m [ "num_points" ] = points
m [ "num_objects" ] = objects
2022-07-11 18:39:40 +03:00
m [ "num_strings" ] = nstrings
2019-09-04 02:35:42 +03:00
mem := readMemStats ( )
2016-03-05 02:08:16 +03:00
avgsz := 0
if points != 0 {
avgsz = int ( mem . HeapAlloc ) / points
}
2016-10-25 01:58:30 +03:00
m [ "mem_alloc" ] = mem . Alloc
2016-03-05 02:08:16 +03:00
m [ "heap_size" ] = mem . HeapAlloc
2016-12-23 00:52:37 +03:00
m [ "heap_released" ] = mem . HeapReleased
2019-10-30 20:17:59 +03:00
m [ "max_heap_size" ] = s . config . maxMemory ( )
2016-03-05 02:08:16 +03:00
m [ "avg_item_size" ] = avgsz
2018-11-13 20:24:15 +03:00
m [ "version" ] = core . Version
2016-03-05 02:08:16 +03:00
m [ "pointer_size" ] = ( 32 << uintptr ( uint64 ( ^ uintptr ( 0 ) ) >> 63 ) ) / 8
2019-10-30 20:17:59 +03:00
m [ "read_only" ] = s . config . readOnly ( )
2018-03-10 03:24:23 +03:00
m [ "cpus" ] = runtime . NumCPU ( )
2018-11-26 23:24:15 +03:00
n , _ := runtime . ThreadCreateProfile ( nil )
m [ "threads" ] = float64 ( n )
2022-07-11 18:39:40 +03:00
var nevents int
s . qdb . View ( func ( tx * buntdb . Tx ) error {
// All entries in the buntdb log are events, except for one, which
// is "hook:idx".
nevents , _ = tx . Len ( )
nevents -= 1 // Ignore the "hook:idx"
if nevents < 0 {
nevents = 0
}
return nil
} )
m [ "pending_events" ] = nevents
2018-11-26 23:24:15 +03:00
}
2016-03-05 02:08:16 +03:00
2018-11-26 23:24:15 +03:00
// extStats populates the passed map with extended system/go/tile38 statistics
2019-10-30 20:17:59 +03:00
func ( s * Server ) extStats ( m map [ string ] interface { } ) {
2018-11-26 23:24:15 +03:00
n , _ := runtime . ThreadCreateProfile ( nil )
2019-09-04 02:35:42 +03:00
mem := readMemStats ( )
2018-11-26 23:24:15 +03:00
// Go/Memory Stats
// Number of goroutines that currently exist
m [ "go_goroutines" ] = runtime . NumGoroutine ( )
// Number of OS threads created
m [ "go_threads" ] = float64 ( n )
// A summary of the GC invocation durations
m [ "go_version" ] = runtime . Version ( )
// Number of bytes allocated and still in use
m [ "alloc_bytes" ] = mem . Alloc
// Total number of bytes allocated, even if freed
m [ "alloc_bytes_total" ] = mem . TotalAlloc
// Number of CPUS available on the system
m [ "sys_cpus" ] = runtime . NumCPU ( )
// Number of bytes obtained from system
m [ "sys_bytes" ] = mem . Sys
// Total number of pointer lookups
m [ "lookups_total" ] = mem . Lookups
// Total number of mallocs
m [ "mallocs_total" ] = mem . Mallocs
// Total number of frees
m [ "frees_total" ] = mem . Frees
// Number of heap bytes allocated and still in use
m [ "heap_alloc_bytes" ] = mem . HeapAlloc
// Number of heap bytes obtained from system
m [ "heap_sys_bytes" ] = mem . HeapSys
// Number of heap bytes waiting to be used
m [ "heap_idle_bytes" ] = mem . HeapIdle
// Number of heap bytes that are in use
m [ "heap_inuse_bytes" ] = mem . HeapInuse
// Number of heap bytes released to OS
m [ "heap_released_bytes" ] = mem . HeapReleased
// Number of allocated objects
m [ "heap_objects" ] = mem . HeapObjects
// Number of bytes in use by the stack allocator
m [ "stack_inuse_bytes" ] = mem . StackInuse
// Number of bytes obtained from system for stack allocator
m [ "stack_sys_bytes" ] = mem . StackSys
// Number of bytes in use by mspan structures
m [ "mspan_inuse_bytes" ] = mem . MSpanInuse
// Number of bytes used for mspan structures obtained from system
m [ "mspan_sys_bytes" ] = mem . MSpanSys
// Number of bytes in use by mcache structures
m [ "mcache_inuse_bytes" ] = mem . MCacheInuse
// Number of bytes used for mcache structures obtained from system
m [ "mcache_sys_bytes" ] = mem . MCacheSys
// Number of bytes used by the profiling bucket hash table
m [ "buck_hash_sys_bytes" ] = mem . BuckHashSys
// Number of bytes used for garbage collection system metadata
m [ "gc_sys_bytes" ] = mem . GCSys
// Number of bytes used for other system allocations
m [ "other_sys_bytes" ] = mem . OtherSys
// Number of heap bytes when next garbage collection will take place
m [ "next_gc_bytes" ] = mem . NextGC
// Number of seconds since 1970 of last garbage collection
m [ "last_gc_time_seconds" ] = float64 ( mem . LastGC ) / 1e9
// The fraction of this program's available CPU time used by the GC since
// the program started
m [ "gc_cpu_fraction" ] = mem . GCCPUFraction
// Tile38 Stats
// ID of the server
2019-10-30 20:17:59 +03:00
m [ "tile38_id" ] = s . config . serverID ( )
2018-11-26 23:24:15 +03:00
// The process ID of the server
m [ "tile38_pid" ] = os . Getpid ( )
// Version of Tile38 running
m [ "tile38_version" ] = core . Version
// Maximum heap size allowed
2019-10-30 20:17:59 +03:00
m [ "tile38_max_heap_size" ] = s . config . maxMemory ( )
2018-11-26 23:24:15 +03:00
// Type of instance running
2019-10-30 20:17:59 +03:00
if s . config . followHost ( ) == "" {
2018-11-26 23:24:15 +03:00
m [ "tile38_type" ] = "leader"
} else {
m [ "tile38_type" ] = "follower"
2016-03-05 02:08:16 +03:00
}
2018-11-26 23:24:15 +03:00
// Whether or not the server is read-only
2019-10-30 20:17:59 +03:00
m [ "tile38_read_only" ] = s . config . readOnly ( )
2018-11-26 23:24:15 +03:00
// Size of pointer
m [ "tile38_pointer_size" ] = ( 32 << uintptr ( uint64 ( ^ uintptr ( 0 ) ) >> 63 ) ) / 8
// Uptime of the Tile38 server in seconds
2019-10-30 20:17:59 +03:00
m [ "tile38_uptime_in_seconds" ] = time . Since ( s . started ) . Seconds ( )
2018-11-26 23:24:15 +03:00
// Number of currently connected Tile38 clients
2019-10-30 20:17:59 +03:00
s . connsmu . RLock ( )
m [ "tile38_connected_clients" ] = len ( s . conns )
s . connsmu . RUnlock ( )
2018-11-26 23:24:15 +03:00
// Whether or not a cluster is enabled
m [ "tile38_cluster_enabled" ] = false
// Whether or not the Tile38 AOF is enabled
2022-09-25 01:42:07 +03:00
m [ "tile38_aof_enabled" ] = s . opts . AppendOnly
2018-11-26 23:24:15 +03:00
// Whether or not an AOF shrink is currently in progress
2019-10-30 20:17:59 +03:00
m [ "tile38_aof_rewrite_in_progress" ] = s . shrinking
2018-11-26 23:24:15 +03:00
// Length of time the last AOF shrink took
2022-09-27 20:15:31 +03:00
m [ "tile38_aof_last_rewrite_time_sec" ] = s . lastShrinkDuration . Load ( ) / int64 ( time . Second )
2018-11-26 23:24:15 +03:00
// Duration of the on-going AOF rewrite operation if any
var currentShrinkStart time . Time
if currentShrinkStart . IsZero ( ) {
m [ "tile38_aof_current_rewrite_time_sec" ] = 0
} else {
m [ "tile38_aof_current_rewrite_time_sec" ] = time . Since ( currentShrinkStart ) . Seconds ( )
}
// Total size of the AOF in bytes
2019-10-30 20:17:59 +03:00
m [ "tile38_aof_size" ] = s . aofsz
2018-11-26 23:24:15 +03:00
// Whether or no the HTTP transport is being served
2019-10-30 20:17:59 +03:00
m [ "tile38_http_transport" ] = s . http
2018-11-26 23:24:15 +03:00
// Number of connections accepted by the server
2022-09-27 20:15:31 +03:00
m [ "tile38_total_connections_received" ] = s . statsTotalConns . Load ( )
2018-11-26 23:24:15 +03:00
// Number of commands processed by the server
2022-09-27 20:15:31 +03:00
m [ "tile38_total_commands_processed" ] = s . statsTotalCommands . Load ( )
2019-03-14 21:23:23 +03:00
// Number of webhook messages sent by server
2022-09-27 20:15:31 +03:00
m [ "tile38_total_messages_sent" ] = s . statsTotalMsgsSent . Load ( )
2018-11-26 23:24:15 +03:00
// Number of key expiration events
2022-09-27 20:15:31 +03:00
m [ "tile38_expired_keys" ] = s . statsExpired . Load ( )
2018-11-26 23:24:15 +03:00
// Number of connected slaves
2019-10-30 20:17:59 +03:00
m [ "tile38_connected_slaves" ] = len ( s . aofconnM )
2018-11-26 23:24:15 +03:00
points := 0
objects := 0
strings := 0
2022-09-13 18:16:41 +03:00
s . cols . Scan ( func ( key string , col * collection . Collection ) bool {
2018-11-26 23:24:15 +03:00
points += col . PointCount ( )
objects += col . Count ( )
strings += col . StringCount ( )
return true
} )
// Number of points in the database
m [ "tile38_num_points" ] = points
// Number of objects in the database
m [ "tile38_num_objects" ] = objects
// Number of string in the database
m [ "tile38_num_strings" ] = strings
// Number of collections in the database
2019-10-30 20:17:59 +03:00
m [ "tile38_num_collections" ] = s . cols . Len ( )
2018-11-26 23:24:15 +03:00
// Number of hooks in the database
2021-09-13 20:02:36 +03:00
m [ "tile38_num_hooks" ] = s . hooks . Len ( )
2021-08-20 15:00:14 +03:00
// Number of hook groups in the database
m [ "tile38_num_hook_groups" ] = s . groupHooks . Len ( )
// Number of object groups in the database
m [ "tile38_num_object_groups" ] = s . groupObjects . Len ( )
2018-11-26 23:24:15 +03:00
avgsz := 0
if points != 0 {
avgsz = int ( mem . HeapAlloc ) / points
}
// Average point size in bytes
m [ "tile38_avg_point_size" ] = avgsz
sz := 0
2022-09-13 18:16:41 +03:00
s . cols . Scan ( func ( key string , col * collection . Collection ) bool {
2018-11-26 23:24:15 +03:00
sz += col . TotalWeight ( )
return true
} )
// Total in memory size of all collections
m [ "tile38_in_memory_size" ] = sz
2016-03-05 02:08:16 +03:00
}
2016-08-26 23:42:52 +03:00
2019-10-30 20:17:59 +03:00
func ( s * Server ) writeInfoServer ( w * bytes . Buffer ) {
2016-08-26 23:42:52 +03:00
fmt . Fprintf ( w , "tile38_version:%s\r\n" , core . Version )
2018-11-26 23:24:15 +03:00
fmt . Fprintf ( w , "redis_version:%s\r\n" , core . Version ) // Version of the Redis server
2019-10-30 20:17:59 +03:00
fmt . Fprintf ( w , "uptime_in_seconds:%d\r\n" , int ( time . Since ( s . started ) . Seconds ( ) ) ) // Number of seconds since Redis server start
2016-08-26 22:54:19 +03:00
}
2019-10-30 20:17:59 +03:00
func ( s * Server ) writeInfoClients ( w * bytes . Buffer ) {
s . connsmu . RLock ( )
fmt . Fprintf ( w , "connected_clients:%d\r\n" , len ( s . conns ) ) // Number of client connections (excluding connections from slaves)
s . connsmu . RUnlock ( )
2016-08-26 22:54:19 +03:00
}
2019-10-30 20:17:59 +03:00
func ( s * Server ) writeInfoMemory ( w * bytes . Buffer ) {
2019-09-04 02:35:42 +03:00
mem := readMemStats ( )
2016-08-26 22:54:19 +03:00
fmt . Fprintf ( w , "used_memory:%d\r\n" , mem . Alloc ) // total number of bytes allocated by Redis using its allocator (either standard libc, jemalloc, or an alternative allocator such as tcmalloc
}
func boolInt ( t bool ) int {
if t {
return 1
}
return 0
}
2019-10-30 20:17:59 +03:00
func ( s * Server ) writeInfoPersistence ( w * bytes . Buffer ) {
2022-09-25 01:42:07 +03:00
fmt . Fprintf ( w , "aof_enabled:%d\r\n" , boolInt ( s . opts . AppendOnly ) )
2022-09-27 20:15:31 +03:00
fmt . Fprintf ( w , "aof_rewrite_in_progress:%d\r\n" , boolInt ( s . shrinking ) ) // Flag indicating a AOF rewrite operation is on-going
fmt . Fprintf ( w , "aof_last_rewrite_time_sec:%d\r\n" , s . lastShrinkDuration . Load ( ) / int64 ( time . Second ) ) // Duration of the last AOF rewrite operation in seconds
2018-11-23 11:39:04 +03:00
var currentShrinkStart time . Time // c.currentShrinkStart.get()
2017-09-30 18:00:29 +03:00
if currentShrinkStart . IsZero ( ) {
2016-08-26 22:54:19 +03:00
fmt . Fprintf ( w , "aof_current_rewrite_time_sec:0\r\n" ) // Duration of the on-going AOF rewrite operation if any
} else {
2021-03-31 18:13:44 +03:00
fmt . Fprintf ( w , "aof_current_rewrite_time_sec:%d\r\n" , time . Since ( currentShrinkStart ) / time . Second ) // Duration of the on-going AOF rewrite operation if any
2016-08-26 22:54:19 +03:00
}
}
2019-10-30 20:17:59 +03:00
func ( s * Server ) writeInfoStats ( w * bytes . Buffer ) {
2022-09-27 20:15:31 +03:00
fmt . Fprintf ( w , "total_connections_received:%d\r\n" , s . statsTotalConns . Load ( ) ) // Total number of connections accepted by the server
fmt . Fprintf ( w , "total_commands_processed:%d\r\n" , s . statsTotalCommands . Load ( ) ) // Total number of commands processed by the server
fmt . Fprintf ( w , "total_messages_sent:%d\r\n" , s . statsTotalMsgsSent . Load ( ) ) // Total number of commands processed by the server
fmt . Fprintf ( w , "expired_keys:%d\r\n" , s . statsExpired . Load ( ) ) // Total number of key expiration events
2016-08-26 22:54:19 +03:00
}
2019-01-19 00:51:20 +03:00
2023-05-21 18:22:27 +03:00
func replicaIPAndPort ( cc * Client ) ( ip string , port int ) {
ip = cc . remoteAddr
i := strings . LastIndex ( ip , ":" )
if i != - 1 {
ip = ip [ : i ]
if ip == "[::1]" {
ip = "localhost"
}
}
port = cc . replPort
return ip , port
}
2019-01-19 00:51:20 +03:00
// writeInfoReplication writes all replication data to the 'info' response
2019-10-30 20:17:59 +03:00
func ( s * Server ) writeInfoReplication ( w * bytes . Buffer ) {
if s . config . followHost ( ) != "" {
2019-01-19 00:51:20 +03:00
fmt . Fprintf ( w , "role:slave\r\n" )
2019-10-30 20:17:59 +03:00
fmt . Fprintf ( w , "master_host:%s\r\n" , s . config . followHost ( ) )
fmt . Fprintf ( w , "master_port:%v\r\n" , s . config . followPort ( ) )
2022-06-22 19:50:43 +03:00
if s . config . replicaPriority ( ) >= 0 {
fmt . Fprintf ( w , "slave_priority:%v\r\n" , s . config . replicaPriority ( ) )
2022-06-20 16:50:40 +03:00
}
2019-01-19 00:51:20 +03:00
} else {
fmt . Fprintf ( w , "role:master\r\n" )
var i int
2019-10-30 20:17:59 +03:00
s . connsmu . RLock ( )
for _ , cc := range s . conns {
2019-01-19 00:51:20 +03:00
if cc . replPort != 0 {
2023-05-21 18:22:27 +03:00
ip , port := replicaIPAndPort ( cc )
2019-01-19 00:51:20 +03:00
fmt . Fprintf ( w , "slave%v:ip=%s,port=%v,state=online\r\n" , i ,
2023-05-21 18:22:27 +03:00
ip , port )
2019-01-19 00:51:20 +03:00
i ++
}
}
2019-10-30 20:17:59 +03:00
s . connsmu . RUnlock ( )
2019-01-19 00:51:20 +03:00
}
2019-10-30 20:17:59 +03:00
fmt . Fprintf ( w , "connected_slaves:%d\r\n" , len ( s . aofconnM ) ) // Number of connected slaves
2016-08-26 22:54:19 +03:00
}
2019-01-19 00:51:20 +03:00
2019-10-30 20:17:59 +03:00
func ( s * Server ) writeInfoCluster ( w * bytes . Buffer ) {
2016-08-26 22:54:19 +03:00
fmt . Fprintf ( w , "cluster_enabled:0\r\n" )
}
2022-09-25 00:28:47 +03:00
// INFO [section ...]
func ( s * Server ) cmdINFO ( msg * Message ) ( res resp . Value , err error ) {
2016-08-26 22:54:19 +03:00
start := time . Now ( )
Lua scripting feature. (#224)
* Start on lua scripting
* Implement evalsha, script load, script exists, and script flush
* Type conversions from lua to resp/json.
Refactor to make luastate and luascripts persistent in the controller.
* Change controller.command and all underlying commands to return resp.Value.
Serialize only during the ouput.
* First stab at tile38 call from lua
* Change tile38 into tile38.call in Lua
* Property return errors from scripts
* Minor refactoring. No locking on script run
* Cleanup/refactoring
* Create a pool of 5 lua states, allow for more as needed. Refactor.
* Use safe map for scripts. Add a limit for max number of lua states. Refactor.
* Refactor
* Refactor script commands into atomic, read-only, and non-atomic classes.
Proper locking for all three classes.
Add tests for scripts
* More tests for scripts
* Properly escape newlines in lua-produced errors
* Better test for readonly failure
* Correctly convert ok/err messages between lua and resp.
Add pcall, sha1hex, error_reply, status_reply functions to tile38 namespace in lua.
* Add pcall test. Change writeErr to work with string argument
* Make sure eval/evalsha never attempt to write AOF
* Add eval-set and eval-get to benchmarks
* Fix eval benchmark tests, add more
* Improve benchmarks
* Optimizations and refactoring.
* Add lua memtest
* Typo
* Add dependency
* golint fixes
* gofmt fixes
* Add scripting commands to the core/commands.json
* Use ARGV for args inside lua
2017-10-05 18:20:40 +03:00
2022-09-25 00:28:47 +03:00
// >> Args
args := msg . Args
msects := make ( map [ string ] bool )
allsects := [ ] string {
"server" , "clients" , "memory" , "persistence" , "stats" ,
"replication" , "cpu" , "cluster" , "keyspace" ,
}
if len ( args ) == 1 {
for _ , s := range allsects {
msects [ s ] = true
}
}
for i := 1 ; i < len ( args ) ; i ++ {
section := strings . ToLower ( args [ i ] )
2016-08-26 22:54:19 +03:00
switch section {
2022-09-25 00:28:47 +03:00
case "all" , "default" :
for _ , s := range allsects {
msects [ s ] = true
}
2016-08-26 22:54:19 +03:00
default :
2022-09-25 00:28:47 +03:00
for _ , s := range allsects {
if s == section {
msects [ section ] = true
}
}
}
}
// >> Operation
var sects [ ] string
for _ , s := range allsects {
if msects [ s ] {
sects = append ( sects , s )
2016-08-26 22:54:19 +03:00
}
}
2016-03-29 02:11:29 +03:00
2016-08-26 22:54:19 +03:00
w := & bytes . Buffer { }
2022-09-25 00:28:47 +03:00
for i , section := range sects {
2016-08-26 22:54:19 +03:00
if i > 0 {
w . WriteString ( "\r\n" )
}
switch strings . ToLower ( section ) {
default :
continue
case "server" :
w . WriteString ( "# Server\r\n" )
2019-10-30 20:17:59 +03:00
s . writeInfoServer ( w )
2016-08-26 22:54:19 +03:00
case "clients" :
w . WriteString ( "# Clients\r\n" )
2019-10-30 20:17:59 +03:00
s . writeInfoClients ( w )
2016-08-26 22:54:19 +03:00
case "memory" :
w . WriteString ( "# Memory\r\n" )
2019-10-30 20:17:59 +03:00
s . writeInfoMemory ( w )
2016-08-26 22:54:19 +03:00
case "persistence" :
w . WriteString ( "# Persistence\r\n" )
2019-10-30 20:17:59 +03:00
s . writeInfoPersistence ( w )
2016-08-26 22:54:19 +03:00
case "stats" :
w . WriteString ( "# Stats\r\n" )
2019-10-30 20:17:59 +03:00
s . writeInfoStats ( w )
2016-08-26 22:54:19 +03:00
case "replication" :
w . WriteString ( "# Replication\r\n" )
2019-10-30 20:17:59 +03:00
s . writeInfoReplication ( w )
2016-08-26 22:54:19 +03:00
case "cpu" :
w . WriteString ( "# CPU\r\n" )
2019-10-30 20:17:59 +03:00
s . writeInfoCPU ( w )
2016-08-26 22:54:19 +03:00
case "cluster" :
w . WriteString ( "# Cluster\r\n" )
2019-10-30 20:17:59 +03:00
s . writeInfoCluster ( w )
2016-08-26 22:54:19 +03:00
}
}
2022-09-25 00:28:47 +03:00
// >> Response
if msg . OutputType == JSON {
2019-01-15 01:29:29 +03:00
// Create a map of all key/value info fields
m := make ( map [ string ] interface { } )
for _ , kv := range strings . Split ( w . String ( ) , "\r\n" ) {
kv = strings . TrimSpace ( kv )
if ! strings . HasPrefix ( kv , "#" ) {
if split := strings . SplitN ( kv , ":" , 2 ) ; len ( split ) == 2 {
m [ split [ 0 ] ] = tryParseType ( split [ 1 ] )
}
}
}
// Marshal the map and use the output in the JSON response
2022-09-25 00:28:47 +03:00
data , _ := json . Marshal ( m )
return resp . StringValue ( ` { "ok":true,"info": ` + string ( data ) +
` ,"elapsed":" ` + time . Since ( start ) . String ( ) + "\"}" ) , nil
2016-08-26 22:54:19 +03:00
}
2022-09-25 00:28:47 +03:00
return resp . BytesValue ( w . Bytes ( ) ) , nil
2016-08-26 22:54:19 +03:00
}
2019-01-15 01:29:29 +03:00
// tryParseType attempts to parse the passed string as an integer, float64 and
// a bool returning any successful parsed values. It returns the passed string
// if all tries fail
func tryParseType ( str string ) interface { } {
if v , err := strconv . ParseInt ( str , 10 , 64 ) ; err == nil {
return v
}
if v , err := strconv . ParseFloat ( str , 64 ) ; err == nil {
return v
}
if v , err := strconv . ParseBool ( str ) ; err == nil {
return v
}
return str
}
2016-03-29 02:11:29 +03:00
func respValuesSimpleMap ( m map [ string ] interface { } ) [ ] resp . Value {
var keys [ ] string
2016-04-03 05:16:36 +03:00
for key := range m {
2016-03-29 02:11:29 +03:00
keys = append ( keys , key )
}
sort . Strings ( keys )
var vals [ ] resp . Value
for _ , key := range keys {
val := m [ key ]
vals = append ( vals , resp . StringValue ( key ) )
vals = append ( vals , resp . StringValue ( fmt . Sprintf ( "%v" , val ) ) )
}
return vals
}
2023-05-21 18:22:27 +03:00
// ROLE
func ( s * Server ) cmdROLE ( msg * Message ) ( res resp . Value , err error ) {
start := time . Now ( )
var role string
var offset int
var ips [ ] string
var ports [ ] int
var offsets [ ] int
var host string
var port int
var state string
if s . config . followHost ( ) == "" {
role = "master"
offset = s . aofsz
s . connsmu . RLock ( )
for _ , cc := range s . conns {
if cc . replPort != 0 {
ip , port := replicaIPAndPort ( cc )
ips = append ( ips , ip )
ports = append ( ports , port )
offsets = append ( offsets , s . aofsz )
}
}
s . connsmu . RUnlock ( )
} else {
role = "slave"
host = s . config . followHost ( )
port = s . config . followPort ( )
offset = int ( s . faofsz )
state = "connected"
}
if msg . OutputType == JSON {
var json [ ] byte
json = append ( json , ` { "ok":true,"role": { " ` ... )
json = append ( json , ` "role": ` ... )
json = appendJSONString ( json , role )
if role == "master" {
json = append ( json , ` ,"offset": ` ... )
json = strconv . AppendInt ( json , int64 ( offset ) , 10 )
json = append ( json , ` ,"slaves":[ ` ... )
for i := range ips {
if i > 0 {
json = append ( json , ',' )
}
json = append ( json , '{' )
json = append ( json , ` "ip": ` ... )
json = appendJSONString ( json , ips [ i ] )
json = append ( json , ` ,"port": ` ... )
json = appendJSONString ( json , fmt . Sprint ( ports [ i ] ) )
json = append ( json , ` ,"offset": ` ... )
json = appendJSONString ( json , fmt . Sprint ( offsets [ i ] ) )
json = append ( json , '}' )
}
json = append ( json , ` ] ` ... )
} else if role == "slave" {
json = append ( json , ` ,"host": ` ... )
json = appendJSONString ( json , host )
json = append ( json , ` ,"port": ` ... )
json = strconv . AppendInt ( json , int64 ( port ) , 10 )
json = append ( json , ` ,"state": ` ... )
json = appendJSONString ( json , state )
json = append ( json , ` ,"offset": ` ... )
json = strconv . AppendInt ( json , int64 ( offset ) , 10 )
}
json = append ( json , ` },"elapsed": ` ... )
json = appendJSONString ( json , time . Since ( start ) . String ( ) )
json = append ( json , '}' )
return resp . StringValue ( string ( json ) ) , nil
} else {
var vals [ ] resp . Value
vals = append ( vals , resp . StringValue ( role ) )
if role == "master" {
vals = append ( vals , resp . IntegerValue ( offset ) )
var replicaVals [ ] resp . Value
for i := range ips {
var vals [ ] resp . Value
vals = append ( vals , resp . StringValue ( ips [ i ] ) )
vals = append ( vals , resp . StringValue ( fmt . Sprint ( ports [ i ] ) ) )
vals = append ( vals , resp . StringValue ( fmt . Sprint ( offsets [ i ] ) ) )
replicaVals = append ( replicaVals , resp . ArrayValue ( vals ) )
}
vals = append ( vals , resp . ArrayValue ( replicaVals ) )
} else if role == "slave" {
vals = append ( vals , resp . StringValue ( host ) )
vals = append ( vals , resp . IntegerValue ( port ) )
vals = append ( vals , resp . StringValue ( state ) )
vals = append ( vals , resp . IntegerValue ( offset ) )
}
return resp . ArrayValue ( vals ) , nil
}
}