Add Redis Gears support (#2675)

* Add gears commands and tests

* Fix tfunctionlist and add docstrings

* fixes

* fixes

* add tfcallasync to gearsCmdable
This commit is contained in:
ofekshenawa 2023-08-16 17:06:07 +03:00 committed by GitHub
parent 558581eac6
commit aba21be8cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 368 additions and 1 deletions

View File

@ -52,4 +52,5 @@ uri
URI
url
variadic
RedisStack
RedisStack
RedisGears

View File

@ -3713,6 +3713,71 @@ func (cmd *MapStringStringSliceCmd) readReply(rd *proto.Reader) error {
return nil
}
//-----------------------------------------------------------------------
type MapStringInterfaceSliceCmd struct {
baseCmd
val []map[string]interface{}
}
var _ Cmder = (*MapStringInterfaceSliceCmd)(nil)
func NewMapStringInterfaceSliceCmd(ctx context.Context, args ...interface{}) *MapStringInterfaceSliceCmd {
return &MapStringInterfaceSliceCmd{
baseCmd: baseCmd{
ctx: ctx,
args: args,
},
}
}
func (cmd *MapStringInterfaceSliceCmd) SetVal(val []map[string]interface{}) {
cmd.val = val
}
func (cmd *MapStringInterfaceSliceCmd) Val() []map[string]interface{} {
return cmd.val
}
func (cmd *MapStringInterfaceSliceCmd) Result() ([]map[string]interface{}, error) {
return cmd.Val(), cmd.Err()
}
func (cmd *MapStringInterfaceSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *MapStringInterfaceSliceCmd) readReply(rd *proto.Reader) error {
n, err := rd.ReadArrayLen()
if err != nil {
return err
}
cmd.val = make([]map[string]interface{}, n)
for i := 0; i < n; i++ {
nn, err := rd.ReadMapLen()
if err != nil {
return err
}
cmd.val[i] = make(map[string]interface{}, nn)
for f := 0; f < nn; f++ {
k, err := rd.ReadString()
if err != nil {
return err
}
v, err := rd.ReadReply()
if err != nil {
if err != Nil {
return err
}
}
cmd.val[i][k] = v
}
}
return nil
}
//------------------------------------------------------------------------------
type KeyValuesCmd struct {

View File

@ -505,6 +505,7 @@ type Cmdable interface {
ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd
gearsCmdable
probabilisticCmdable
}

161
redis_gears.go Normal file
View File

@ -0,0 +1,161 @@
package redis
import (
"context"
"fmt"
"strings"
)
type gearsCmdable interface {
TFunctionLoad(ctx context.Context, lib string) *StatusCmd
TFunctionLoadArgs(ctx context.Context, lib string, options *TFunctionLoadOptions) *StatusCmd
TFunctionDelete(ctx context.Context, libName string) *StatusCmd
TFunctionList(ctx context.Context) *MapStringInterfaceSliceCmd
TFunctionListArgs(ctx context.Context, options *TFunctionListOptions) *MapStringInterfaceSliceCmd
TFCall(ctx context.Context, libName string, funcName string, numKeys int) *Cmd
TFCallArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd
TFCallASYNC(ctx context.Context, libName string, funcName string, numKeys int) *Cmd
TFCallASYNCArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd
}
type TFunctionLoadOptions struct {
Replace bool
Config string
}
type TFunctionListOptions struct {
Withcode bool
Verbose int
Library string
}
type TFCallOptions struct {
Keys []string
Arguments []string
}
// TFunctionLoad - load a new JavaScript library into Redis.
// For more information - https://redis.io/commands/tfunction-load/
func (c cmdable) TFunctionLoad(ctx context.Context, lib string) *StatusCmd {
args := []interface{}{"TFUNCTION", "LOAD", lib}
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) TFunctionLoadArgs(ctx context.Context, lib string, options *TFunctionLoadOptions) *StatusCmd {
args := []interface{}{"TFUNCTION", "LOAD"}
if options != nil {
if options.Replace {
args = append(args, "REPLACE")
}
if options.Config != "" {
args = append(args, "CONFIG", options.Config)
}
}
args = append(args, lib)
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// TFunctionDelete - delete a JavaScript library from Redis.
// For more information - https://redis.io/commands/tfunction-delete/
func (c cmdable) TFunctionDelete(ctx context.Context, libName string) *StatusCmd {
args := []interface{}{"TFUNCTION", "DELETE", libName}
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// TFunctionList - list the functions with additional information about each function.
// For more information - https://redis.io/commands/tfunction-list/
func (c cmdable) TFunctionList(ctx context.Context) *MapStringInterfaceSliceCmd {
args := []interface{}{"TFUNCTION", "LIST"}
cmd := NewMapStringInterfaceSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) TFunctionListArgs(ctx context.Context, options *TFunctionListOptions) *MapStringInterfaceSliceCmd {
args := []interface{}{"TFUNCTION", "LIST"}
if options != nil {
if options.Withcode {
args = append(args, "WITHCODE")
}
if options.Verbose != 0 {
v := strings.Repeat("v", options.Verbose)
args = append(args, v)
}
if options.Library != "" {
args = append(args, "LIBRARY", options.Library)
}
}
cmd := NewMapStringInterfaceSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// TFCall - invoke a function.
// For more information - https://redis.io/commands/tfcall/
func (c cmdable) TFCall(ctx context.Context, libName string, funcName string, numKeys int) *Cmd {
lf := libName + "." + funcName
args := []interface{}{"TFCALL", lf, numKeys}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) TFCallArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd {
lf := libName + "." + funcName
args := []interface{}{"TFCALL", lf, numKeys}
if options != nil {
if options.Keys != nil {
for _, key := range options.Keys {
args = append(args, key)
}
}
if options.Arguments != nil {
for _, key := range options.Arguments {
args = append(args, key)
}
}
}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
// TFCallASYNC - invoke an asynchronous JavaScript function (coroutine).
// For more information - https://redis.io/commands/TFCallASYNC/
func (c cmdable) TFCallASYNC(ctx context.Context, libName string, funcName string, numKeys int) *Cmd {
lf := fmt.Sprintf("%s.%s", libName, funcName)
args := []interface{}{"TFCALLASYNC", lf, numKeys}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
func (c cmdable) TFCallASYNCArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd {
lf := fmt.Sprintf("%s.%s", libName, funcName)
args := []interface{}{"TFCALLASYNC", lf, numKeys}
if options != nil {
if options.Keys != nil {
for _, key := range options.Keys {
args = append(args, key)
}
}
if options.Arguments != nil {
for _, key := range options.Arguments {
args = append(args, key)
}
}
}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

139
redis_gears_test.go Normal file
View File

@ -0,0 +1,139 @@
package redis_test
import (
"context"
"fmt"
. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"
"github.com/redis/go-redis/v9"
)
func libCode(libName string) string {
return fmt.Sprintf("#!js api_version=1.0 name=%s\n redis.registerFunction('foo', ()=>{{return 'bar'}})", libName)
}
func libCodeWithConfig(libName string) string {
lib := `#!js api_version=1.0 name=%s
var last_update_field_name = "__last_update__"
if (redis.config.last_update_field_name !== undefined) {
if (typeof redis.config.last_update_field_name != 'string') {
throw "last_update_field_name must be a string";
}
last_update_field_name = redis.config.last_update_field_name
}
redis.registerFunction("hset", function(client, key, field, val){
// get the current time in ms
var curr_time = client.call("time")[0];
return client.call('hset', key, field, val, last_update_field_name, curr_time);
});`
return fmt.Sprintf(lib, libName)
}
var _ = Describe("RedisGears commands", Label("gears"), func() {
ctx := context.TODO()
var client *redis.Client
BeforeEach(func() {
client = redis.NewClient(&redis.Options{Addr: ":6379"})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should TFunctionLoad, TFunctionLoadArgs and TFunctionDelete ", Label("gears", "tfunctionload"), func() {
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtflo1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
opt := &redis.TFunctionLoadOptions{Replace: true, Config: `{"last_update_field_name":"last_update"}`}
resultAdd, err = client.TFunctionLoadArgs(ctx, libCodeWithConfig("libtflo1"), opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFunctionDelete(ctx, "libtflo1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})
It("should TFunctionList", Label("gears", "tfunctionlist"), func() {
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfli1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFunctionLoad(ctx, libCode("libtfli2")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultList, err := client.TFunctionList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultList[0]["engine"]).To(BeEquivalentTo("js"))
opt := &redis.TFunctionListOptions{Withcode: true, Verbose: 2}
resultListArgs, err := client.TFunctionListArgs(ctx, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultListArgs[0]["code"]).To(BeEquivalentTo(libCode("libtfli1")))
resultAdd, err = client.TFunctionDelete(ctx, "libtfli1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFunctionDelete(ctx, "libtfli2").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})
It("should TFCall", Label("gears", "tfcall"), func() {
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfc1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFCall(ctx, "libtfc1", "foo", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
resultAdd, err = client.TFunctionDelete(ctx, "libtfc1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})
It("should TFCallArgs", Label("gears", "tfcallargs"), func() {
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfca1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
opt := &redis.TFCallOptions{Arguments: []string{"foo", "bar"}}
resultAdd, err = client.TFCallArgs(ctx, "libtfca1", "foo", 0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
resultAdd, err = client.TFunctionDelete(ctx, "libtfca1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})
It("should TFCallASYNC", Label("gears", "TFCallASYNC"), func() {
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfc1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFCallASYNC(ctx, "libtfc1", "foo", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
resultAdd, err = client.TFunctionDelete(ctx, "libtfc1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})
It("should TFCallASYNCArgs", Label("gears", "TFCallASYNCargs"), func() {
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfca1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
opt := &redis.TFCallOptions{Arguments: []string{"foo", "bar"}}
resultAdd, err = client.TFCallASYNCArgs(ctx, "libtfca1", "foo", 0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
resultAdd, err = client.TFunctionDelete(ctx, "libtfca1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})
})