Function stats, function kill, fcall and fcall_ro (#2486)

This commit is contained in:
Elena Kolevska 2023-03-30 14:52:06 +01:00 committed by GitHub
parent 9aba95a74f
commit e96c7b5f58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 447 additions and 55 deletions

View File

@ -3993,6 +3993,231 @@ func (cmd *FunctionListCmd) readFunctions(rd *proto.Reader) ([]Function, error)
return functions, nil
}
// FunctionStats contains information about the scripts currently executing on the server, and the available engines
// - Engines:
// Statistics about the engine like number of functions and number of libraries
// - RunningScript:
// The script currently running on the shard we're connecting to.
// For Redis Enterprise and Redis Cloud, this represents the
// function with the longest running time, across all the running functions, on all shards
// - RunningScripts
// All scripts currently running in a Redis Enterprise clustered database.
// Only available on Redis Enterprise
type FunctionStats struct {
Engines []Engine
isRunning bool
rs RunningScript
allrs []RunningScript
}
func (fs *FunctionStats) Running() bool {
return fs.isRunning
}
func (fs *FunctionStats) RunningScript() (RunningScript, bool) {
return fs.rs, fs.isRunning
}
// AllRunningScripts returns all scripts currently running in a Redis Enterprise clustered database.
// Only available on Redis Enterprise
func (fs *FunctionStats) AllRunningScripts() []RunningScript {
return fs.allrs
}
type RunningScript struct {
Name string
Command []string
Duration time.Duration
}
type Engine struct {
Language string
LibrariesCount int64
FunctionsCount int64
}
type FunctionStatsCmd struct {
baseCmd
val FunctionStats
}
var _ Cmder = (*FunctionStatsCmd)(nil)
func NewFunctionStatsCmd(ctx context.Context, args ...interface{}) *FunctionStatsCmd {
return &FunctionStatsCmd{
baseCmd: baseCmd{
ctx: ctx,
args: args,
},
}
}
func (cmd *FunctionStatsCmd) SetVal(val FunctionStats) {
cmd.val = val
}
func (cmd *FunctionStatsCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *FunctionStatsCmd) Val() FunctionStats {
return cmd.val
}
func (cmd *FunctionStatsCmd) Result() (FunctionStats, error) {
return cmd.val, cmd.err
}
func (cmd *FunctionStatsCmd) readReply(rd *proto.Reader) (err error) {
n, err := rd.ReadMapLen()
if err != nil {
return err
}
var key string
var result FunctionStats
for f := 0; f < n; f++ {
key, err = rd.ReadString()
if err != nil {
return err
}
switch key {
case "running_script":
result.rs, result.isRunning, err = cmd.readRunningScript(rd)
case "engines":
result.Engines, err = cmd.readEngines(rd)
case "all_running_scripts": // Redis Enterprise only
result.allrs, result.isRunning, err = cmd.readRunningScripts(rd)
default:
return fmt.Errorf("redis: function stats unexpected key %s", key)
}
if err != nil {
return err
}
}
cmd.val = result
return nil
}
func (cmd *FunctionStatsCmd) readRunningScript(rd *proto.Reader) (RunningScript, bool, error) {
err := rd.ReadFixedMapLen(3)
if err != nil {
if err == Nil {
return RunningScript{}, false, nil
}
return RunningScript{}, false, err
}
var runningScript RunningScript
for i := 0; i < 3; i++ {
key, err := rd.ReadString()
if err != nil {
return RunningScript{}, false, err
}
switch key {
case "name":
runningScript.Name, err = rd.ReadString()
case "duration_ms":
runningScript.Duration, err = cmd.readDuration(rd)
case "command":
runningScript.Command, err = cmd.readCommand(rd)
default:
return RunningScript{}, false, fmt.Errorf("redis: function stats unexpected running_script key %s", key)
}
if err != nil {
return RunningScript{}, false, err
}
}
return runningScript, true, nil
}
func (cmd *FunctionStatsCmd) readEngines(rd *proto.Reader) ([]Engine, error) {
n, err := rd.ReadMapLen()
if err != nil {
return nil, err
}
engines := make([]Engine, 0, n)
for i := 0; i < n; i++ {
engine := Engine{}
engine.Language, err = rd.ReadString()
if err != nil {
return nil, err
}
err = rd.ReadFixedMapLen(2)
if err != nil {
return nil, fmt.Errorf("redis: function stats unexpected %s engine map length", engine.Language)
}
for i := 0; i < 2; i++ {
key, err := rd.ReadString()
switch key {
case "libraries_count":
engine.LibrariesCount, err = rd.ReadInt()
case "functions_count":
engine.FunctionsCount, err = rd.ReadInt()
}
if err != nil {
return nil, err
}
}
engines = append(engines, engine)
}
return engines, nil
}
func (cmd *FunctionStatsCmd) readDuration(rd *proto.Reader) (time.Duration, error) {
t, err := rd.ReadInt()
if err != nil {
return time.Duration(0), err
}
return time.Duration(t) * time.Millisecond, nil
}
func (cmd *FunctionStatsCmd) readCommand(rd *proto.Reader) ([]string, error) {
n, err := rd.ReadArrayLen()
if err != nil {
return nil, err
}
command := make([]string, 0, n)
for i := 0; i < n; i++ {
x, err := rd.ReadString()
if err != nil {
return nil, err
}
command = append(command, x)
}
return command, nil
}
func (cmd *FunctionStatsCmd) readRunningScripts(rd *proto.Reader) ([]RunningScript, bool, error) {
n, err := rd.ReadArrayLen()
if err != nil {
return nil, false, err
}
runningScripts := make([]RunningScript, 0, n)
for i := 0; i < n; i++ {
rs, _, err := cmd.readRunningScript(rd)
if err != nil {
return nil, false, err
}
runningScripts = append(runningScripts, rs)
}
return runningScripts, len(runningScripts) > 0, nil
}
//------------------------------------------------------------------------------
// LCSQuery is a parameter used for the LCS command

View File

@ -408,10 +408,14 @@ type Cmdable interface {
FunctionLoadReplace(ctx context.Context, code string) *StringCmd
FunctionDelete(ctx context.Context, libName string) *StringCmd
FunctionFlush(ctx context.Context) *StringCmd
FunctionKill(ctx context.Context) *StringCmd
FunctionFlushAsync(ctx context.Context) *StringCmd
FunctionList(ctx context.Context, q FunctionListQuery) *FunctionListCmd
FunctionDump(ctx context.Context) *StringCmd
FunctionRestore(ctx context.Context, libDump string) *StringCmd
FunctionStats(ctx context.Context) *FunctionStatsCmd
FCall(ctx context.Context, function string, keys []string, args ...interface{}) *Cmd
FCallRo(ctx context.Context, function string, keys []string, args ...interface{}) *Cmd
Publish(ctx context.Context, channel string, message interface{}) *IntCmd
SPublish(ctx context.Context, channel string, message interface{}) *IntCmd
@ -3401,6 +3405,12 @@ func (c cmdable) FunctionFlush(ctx context.Context) *StringCmd {
return cmd
}
func (c cmdable) FunctionKill(ctx context.Context) *StringCmd {
cmd := NewStringCmd(ctx, "function", "kill")
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) FunctionFlushAsync(ctx context.Context) *StringCmd {
cmd := NewStringCmd(ctx, "function", "flush", "async")
_ = c(ctx, cmd)
@ -3434,6 +3444,44 @@ func (c cmdable) FunctionRestore(ctx context.Context, libDump string) *StringCmd
return cmd
}
func (c cmdable) FunctionStats(ctx context.Context) *FunctionStatsCmd {
cmd := NewFunctionStatsCmd(ctx, "function", "stats")
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) FCall(ctx context.Context, function string, keys []string, args ...interface{}) *Cmd {
cmdArgs := fcallArgs("fcall", function, keys, args...)
cmd := NewCmd(ctx, cmdArgs...)
if len(keys) > 0 {
cmd.SetFirstKeyPos(3)
}
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) FCallRo(ctx context.Context, function string, keys []string, args ...interface{}) *Cmd {
cmdArgs := fcallArgs("fcall_ro", function, keys, args...)
cmd := NewCmd(ctx, cmdArgs...)
if len(keys) > 0 {
cmd.SetFirstKeyPos(3)
}
_ = c(ctx, cmd)
return cmd
}
func fcallArgs(command string, function string, keys []string, args ...interface{}) []interface{} {
cmdArgs := make([]interface{}, 3+len(keys), 3+len(keys)+len(args))
cmdArgs[0] = command
cmdArgs[1] = function
cmdArgs[2] = len(keys)
for i, key := range keys {
cmdArgs[3+i] = key
}
cmdArgs = append(cmdArgs, args...)
return cmdArgs
}
//------------------------------------------------------------------------------
// Publish posts the message to the channel.

View File

@ -332,6 +332,57 @@ var _ = Describe("Commands", func() {
Expect(cmd.LastKeyPos).To(Equal(int8(0)))
Expect(cmd.StepCount).To(Equal(int8(0)))
})
It("should return all command names", func() {
cmdList := client.CommandList(ctx, nil)
Expect(cmdList.Err()).NotTo(HaveOccurred())
cmdNames := cmdList.Val()
Expect(cmdNames).NotTo(BeEmpty())
// Assert that some expected commands are present in the list
Expect(cmdNames).To(ContainElement("get"))
Expect(cmdNames).To(ContainElement("set"))
Expect(cmdNames).To(ContainElement("hset"))
})
It("should filter commands by module", func() {
filter := &redis.FilterBy{
Module: "JSON",
}
cmdList := client.CommandList(ctx, filter)
Expect(cmdList.Err()).NotTo(HaveOccurred())
Expect(cmdList.Val()).To(HaveLen(0))
})
It("should filter commands by ACL category", func() {
filter := &redis.FilterBy{
ACLCat: "admin",
}
cmdList := client.CommandList(ctx, filter)
Expect(cmdList.Err()).NotTo(HaveOccurred())
cmdNames := cmdList.Val()
// Assert that the returned list only contains commands from the admin ACL category
Expect(len(cmdNames)).To(BeNumerically(">", 10))
})
It("should filter commands by pattern", func() {
filter := &redis.FilterBy{
Pattern: "*GET*",
}
cmdList := client.CommandList(ctx, filter)
Expect(cmdList.Err()).NotTo(HaveOccurred())
cmdNames := cmdList.Val()
// Assert that the returned list only contains commands that match the given pattern
Expect(cmdNames).To(ContainElement("get"))
Expect(cmdNames).To(ContainElement("getbit"))
Expect(cmdNames).To(ContainElement("getrange"))
Expect(cmdNames).NotTo(ContainElement("set"))
})
})
Describe("debugging", func() {
@ -6242,13 +6293,21 @@ var _ = Describe("Commands", func() {
{
Name: "lib1_func1",
Description: "This is the func-1 of lib 1",
Flags: []string{"no-writes", "allow-stale"},
Flags: []string{"allow-oom", "allow-stale"},
},
},
Code: `#!lua name=%s
local function f1(keys, args)
return 'Function 1'
local hash = keys[1] -- Get the key name
local time = redis.call('TIME')[1] -- Get the current time from the Redis server
-- Add the current timestamp to the arguments that the user passed to the function, stored in args
table.insert(args, '_updated_at')
table.insert(args, time)
-- Run HSET with the updated argument list
return redis.call('HSET', hash, unpack(args))
end
redis.register_function{
@ -6374,6 +6433,13 @@ var _ = Describe("Commands", func() {
Expect(functionFlush.Err()).NotTo(HaveOccurred())
})
It("Kills a running function", func() {
functionKill := client.FunctionKill(ctx)
Expect(functionKill.Err()).To(MatchError("NOTBUSY No scripts in execution right now."))
// Add test for a long-running function, once we make the test for `function stats` pass
})
It("Lists registered functions", func() {
err := client.FunctionLoad(ctx, lib1Code).Err()
Expect(err).NotTo(HaveOccurred())
@ -6413,57 +6479,6 @@ var _ = Describe("Commands", func() {
Expect(err).To(Equal(redis.Nil))
})
It("should return all command names", func() {
cmdList := client.CommandList(ctx, nil)
Expect(cmdList.Err()).NotTo(HaveOccurred())
cmdNames := cmdList.Val()
Expect(cmdNames).NotTo(BeEmpty())
// Assert that some expected commands are present in the list
Expect(cmdNames).To(ContainElement("get"))
Expect(cmdNames).To(ContainElement("set"))
Expect(cmdNames).To(ContainElement("hset"))
})
It("should filter commands by module", func() {
filter := &redis.FilterBy{
Module: "JSON",
}
cmdList := client.CommandList(ctx, filter)
Expect(cmdList.Err()).NotTo(HaveOccurred())
Expect(cmdList.Val()).To(HaveLen(0))
})
It("should filter commands by ACL category", func() {
filter := &redis.FilterBy{
ACLCat: "admin",
}
cmdList := client.CommandList(ctx, filter)
Expect(cmdList.Err()).NotTo(HaveOccurred())
cmdNames := cmdList.Val()
// Assert that the returned list only contains commands from the admin ACL category
Expect(len(cmdNames)).To(BeNumerically(">", 10))
})
It("should filter commands by pattern", func() {
filter := &redis.FilterBy{
Pattern: "*GET*",
}
cmdList := client.CommandList(ctx, filter)
Expect(cmdList.Err()).NotTo(HaveOccurred())
cmdNames := cmdList.Val()
// Assert that the returned list only contains commands that match the given pattern
Expect(cmdNames).To(ContainElement("get"))
Expect(cmdNames).To(ContainElement("getbit"))
Expect(cmdNames).To(ContainElement("getrange"))
Expect(cmdNames).NotTo(ContainElement("set"))
})
It("Dump and restores all libraries", func() {
err := client.FunctionLoad(ctx, lib1Code).Err()
Expect(err).NotTo(HaveOccurred())
@ -6492,6 +6507,109 @@ var _ = Describe("Commands", func() {
Expect(list).To(HaveLen(2))
})
It("Calls a function", func() {
lib1Code = fmt.Sprintf(lib1.Code, lib1.Name, lib1.Functions[0].Name,
lib1.Functions[0].Description, lib1.Functions[0].Flags[0], lib1.Functions[0].Flags[1])
err := client.FunctionLoad(ctx, lib1Code).Err()
Expect(err).NotTo(HaveOccurred())
x := client.FCall(ctx, lib1.Functions[0].Name, []string{"my_hash"}, "a", 1, "b", 2)
Expect(x.Err()).NotTo(HaveOccurred())
Expect(x.Int()).To(Equal(3))
})
It("Calls a function as read-only", func() {
lib1Code = fmt.Sprintf(lib1.Code, lib1.Name, lib1.Functions[0].Name,
lib1.Functions[0].Description, lib1.Functions[0].Flags[0], lib1.Functions[0].Flags[1])
err := client.FunctionLoad(ctx, lib1Code).Err()
Expect(err).NotTo(HaveOccurred())
// This function doesn't have a "no-writes" flag
x := client.FCallRo(ctx, lib1.Functions[0].Name, []string{"my_hash"}, "a", 1, "b", 2)
Expect(x.Err()).To(HaveOccurred())
lib2Code = fmt.Sprintf(lib2.Code, lib2.Name, lib2.Functions[0].Name, lib2.Functions[1].Name,
lib2.Functions[1].Description, lib2.Functions[1].Flags[0])
// This function has a "no-writes" flag
err = client.FunctionLoad(ctx, lib2Code).Err()
Expect(err).NotTo(HaveOccurred())
x = client.FCallRo(ctx, lib2.Functions[1].Name, []string{})
Expect(x.Err()).NotTo(HaveOccurred())
Expect(x.Text()).To(Equal("Function 2"))
})
It("Shows function stats", func() {
defer client.FunctionKill(ctx)
// We can not run blocking commands in Redis functions, so we're using an infinite loop,
// but we're killing the function after calling FUNCTION STATS
lib := redis.Library{
Name: "mylib1",
Engine: "LUA",
Functions: []redis.Function{
{
Name: "lib1_func1",
Description: "This is the func-1 of lib 1",
Flags: []string{"no-writes"},
},
},
Code: `#!lua name=%s
local function f1(keys, args)
local i = 0
while true do
i = i + 1
end
end
redis.register_function{
function_name='%s',
description ='%s',
callback=f1,
flags={'%s'}
}`,
}
libCode := fmt.Sprintf(lib.Code, lib.Name, lib.Functions[0].Name,
lib.Functions[0].Description, lib.Functions[0].Flags[0])
err := client.FunctionLoad(ctx, libCode).Err()
Expect(err).NotTo(HaveOccurred())
r, err := client.FunctionStats(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(r.Engines)).To(Equal(1))
Expect(r.Running()).To(BeFalse())
started := make(chan bool)
go func() {
defer GinkgoRecover()
client2 := redis.NewClient(redisOptions())
started <- true
_, err = client2.FCall(ctx, lib.Functions[0].Name, nil).Result()
Expect(err).To(HaveOccurred())
}()
<-started
time.Sleep(1 * time.Second)
r, err = client.FunctionStats(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(r.Engines)).To(Equal(1))
rs, isRunning := r.RunningScript()
Expect(isRunning).To(BeTrue())
Expect(rs.Name).To(Equal(lib.Functions[0].Name))
Expect(rs.Duration > 0).To(BeTrue())
close(started)
})
})
Describe("SlowLogGet", func() {
@ -6512,6 +6630,7 @@ var _ = Describe("Commands", func() {
Expect(len(result)).NotTo(BeZero())
})
})
})
type numberStruct struct {