From 04005cbdc73f9b5435c4ae005fe15a2218f72b92 Mon Sep 17 00:00:00 2001 From: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com> Date: Thu, 12 Sep 2024 11:26:10 +0300 Subject: [PATCH] Support Resp 3 Redis Search Unstable Mode (#3098) * Updated module version that points to retracted package version (#3074) * Updated module version that points to retracted package version * Updated testing image to latest * support raw parsing for problematic Redis Search types * Add UnstableResp3SearchModule to client options * Add tests for Resp3 Search unstable mode * Add tests for Resp3 Search unstable mode * Add readme note * Add words to spellcheck * Add UnstableResp3SearchModule check to assertStableCommand * Fix assertStableCommand logic * remove go.mod changes * Check panic occur on tests * rename method * update errors * Rename flag to UnstableResp3 --------- Co-authored-by: Vladyslav Vildanov <117659936+vladvildanov@users.noreply.github.com> Co-authored-by: vladvildanov --- .github/wordlist.txt | 3 + README.md | 3 + command.go | 17 ++-- options.go | 3 + redis.go | 21 ++++- ring.go | 2 + search_commands.go | 39 +++++++++ search_test.go | 188 ++++++++++++++++++++++++++++++++++++++++++- sentinel.go | 3 + universal.go | 3 + 10 files changed, 273 insertions(+), 9 deletions(-) diff --git a/.github/wordlist.txt b/.github/wordlist.txt index dceddff4..c200c60b 100644 --- a/.github/wordlist.txt +++ b/.github/wordlist.txt @@ -1,4 +1,5 @@ ACLs +APIs autoload autoloader autoloading @@ -46,9 +47,11 @@ runtime SHA sharding SETNAME +SpellCheck SSL struct stunnel +SynDump TCP TLS uri diff --git a/README.md b/README.md index c7951a4d..37714a97 100644 --- a/README.md +++ b/README.md @@ -183,6 +183,9 @@ rdb := redis.NewClient(&redis.Options{ }) ``` +#### Unstable RESP3 Structures for RediSearch Commands +When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when using the search and query capabilities, but we plan to stabilize the RESP3-based API-s in the coming versions. You can find more guidance in the upcoming release notes. + ## Contributing Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library! diff --git a/command.go b/command.go index 9ae97a95..4ced2979 100644 --- a/command.go +++ b/command.go @@ -40,7 +40,7 @@ type Cmder interface { readTimeout() *time.Duration readReply(rd *proto.Reader) error - + readRawReply(rd *proto.Reader) error SetErr(error) Err() error } @@ -122,11 +122,11 @@ func cmdString(cmd Cmder, val interface{}) string { //------------------------------------------------------------------------------ type baseCmd struct { - ctx context.Context - args []interface{} - err error - keyPos int8 - + ctx context.Context + args []interface{} + err error + keyPos int8 + rawVal interface{} _readTimeout *time.Duration } @@ -197,6 +197,11 @@ func (cmd *baseCmd) setReadTimeout(d time.Duration) { cmd._readTimeout = &d } +func (cmd *baseCmd) readRawReply(rd *proto.Reader) (err error) { + cmd.rawVal, err = rd.ReadReply() + return err +} + //------------------------------------------------------------------------------ type Cmd struct { diff --git a/options.go b/options.go index 6ed693a0..8ba74ccd 100644 --- a/options.go +++ b/options.go @@ -153,6 +153,9 @@ type Options struct { // Add suffix to client name. Default is empty. IdentitySuffix string + + // Enable Unstable mode for Redis Search module with RESP3. + UnstableResp3 bool } func (opt *Options) init() { diff --git a/redis.go b/redis.go index 527afb67..c8b50080 100644 --- a/redis.go +++ b/redis.go @@ -412,6 +412,19 @@ func (c *baseClient) process(ctx context.Context, cmd Cmder) error { return lastErr } +func (c *baseClient) assertUnstableCommand(cmd Cmder) bool { + switch cmd.(type) { + case *AggregateCmd, *FTInfoCmd, *FTSpellCheckCmd, *FTSearchCmd, *FTSynDumpCmd: + if c.opt.UnstableResp3 { + return true + } else { + panic("RESP3 responses for this command are disabled because they may still change. Please set the flag UnstableResp3 . See the [README](https://github.com/redis/go-redis/blob/master/README.md) and the release notes for guidance.") + } + default: + return false + } +} + func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) { if attempt > 0 { if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { @@ -427,8 +440,12 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool atomic.StoreUint32(&retryTimeout, 1) return err } - - if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), cmd.readReply); err != nil { + readReplyFunc := cmd.readReply + // Apply unstable RESP3 search module. + if c.opt.Protocol != 2 && c.assertUnstableCommand(cmd) { + readReplyFunc = cmd.readRawReply + } + if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), readReplyFunc); err != nil { if cmd.readTimeout() == nil { atomic.StoreUint32(&retryTimeout, 1) } else { diff --git a/ring.go b/ring.go index 4ae00542..b4022173 100644 --- a/ring.go +++ b/ring.go @@ -100,6 +100,7 @@ type RingOptions struct { DisableIndentity bool IdentitySuffix string + UnstableResp3 bool } func (opt *RingOptions) init() { @@ -168,6 +169,7 @@ func (opt *RingOptions) clientOptions() *Options { DisableIndentity: opt.DisableIndentity, IdentitySuffix: opt.IdentitySuffix, + UnstableResp3: opt.UnstableResp3, } } diff --git a/search_commands.go b/search_commands.go index f5118c77..1a8a4cfe 100644 --- a/search_commands.go +++ b/search_commands.go @@ -638,6 +638,14 @@ func (cmd *AggregateCmd) Result() (*FTAggregateResult, error) { return cmd.val, cmd.err } +func (cmd *AggregateCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *AggregateCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} + func (cmd *AggregateCmd) String() string { return cmdString(cmd, cmd.val) } @@ -1337,6 +1345,13 @@ func (cmd *FTInfoCmd) Val() FTInfoResult { return cmd.val } +func (cmd *FTInfoCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *FTInfoCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} func (cmd *FTInfoCmd) readReply(rd *proto.Reader) (err error) { n, err := rd.ReadMapLen() if err != nil { @@ -1447,6 +1462,14 @@ func (cmd *FTSpellCheckCmd) Val() []SpellCheckResult { return cmd.val } +func (cmd *FTSpellCheckCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *FTSpellCheckCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} + func (cmd *FTSpellCheckCmd) readReply(rd *proto.Reader) (err error) { data, err := rd.ReadSlice() if err != nil { @@ -1628,6 +1651,14 @@ func (cmd *FTSearchCmd) Val() FTSearchResult { return cmd.val } +func (cmd *FTSearchCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *FTSearchCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} + func (cmd *FTSearchCmd) readReply(rd *proto.Reader) (err error) { data, err := rd.ReadSlice() if err != nil { @@ -1904,6 +1935,14 @@ func (cmd *FTSynDumpCmd) Result() ([]FTSynDumpResult, error) { return cmd.val, cmd.err } +func (cmd *FTSynDumpCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *FTSynDumpCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} + func (cmd *FTSynDumpCmd) readReply(rd *proto.Reader) error { termSynonymPairs, err := rd.ReadSlice() if err != nil { diff --git a/search_test.go b/search_test.go index 0e1a473b..93859a4e 100644 --- a/search_test.go +++ b/search_test.go @@ -18,11 +18,13 @@ func WaitForIndexing(c *redis.Client, index string) { return } time.Sleep(100 * time.Millisecond) + } else { + return } } } -var _ = Describe("RediSearch commands", Label("search"), func() { +var _ = Describe("RediSearch commands Resp 2", Label("search"), func() { ctx := context.TODO() var client *redis.Client @@ -1415,3 +1417,187 @@ func _assert_geosearch_result(result *redis.FTSearchResult, expectedDocIDs []str // Expect(results0["id"]).To(BeEquivalentTo("a")) // Expect(results0["extra_attributes"].(map[interface{}]interface{})["__v_score"]).To(BeEquivalentTo("0")) // }) + +var _ = Describe("RediSearch commands Resp 3", Label("search"), func() { + ctx := context.TODO() + var client *redis.Client + var client2 *redis.Client + + BeforeEach(func() { + client = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 3, UnstableResp3: true}) + client2 = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 3}) + Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + Expect(client.Close()).NotTo(HaveOccurred()) + }) + + It("should handle FTAggregate with Unstable RESP3 Search Module and without stability", Label("search", "ftcreate", "ftaggregate"), func() { + text1 := &redis.FieldSchema{FieldName: "PrimaryKey", FieldType: redis.SearchFieldTypeText, Sortable: true} + num1 := &redis.FieldSchema{FieldName: "CreatedDateTimeUTC", FieldType: redis.SearchFieldTypeNumeric, Sortable: true} + val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, text1, num1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeEquivalentTo("OK")) + WaitForIndexing(client, "idx1") + + client.HSet(ctx, "doc1", "PrimaryKey", "9::362330", "CreatedDateTimeUTC", "637387878524969984") + client.HSet(ctx, "doc2", "PrimaryKey", "9::362329", "CreatedDateTimeUTC", "637387875859270016") + + options := &redis.FTAggregateOptions{Apply: []redis.FTAggregateApply{{Field: "@CreatedDateTimeUTC * 10", As: "CreatedDateTimeUTC"}}} + res, err := client.FTAggregateWithArgs(ctx, "idx1", "*", options).RawResult() + rawVal := client.FTAggregateWithArgs(ctx, "idx1", "*", options).RawVal() + + Expect(err).NotTo(HaveOccurred()) + Expect(rawVal).To(BeEquivalentTo(res)) + results := res.(map[interface{}]interface{})["results"].([]interface{}) + Expect(results[0].(map[interface{}]interface{})["extra_attributes"].(map[interface{}]interface{})["CreatedDateTimeUTC"]). + To(Or(BeEquivalentTo("6373878785249699840"), BeEquivalentTo("6373878758592700416"))) + Expect(results[1].(map[interface{}]interface{})["extra_attributes"].(map[interface{}]interface{})["CreatedDateTimeUTC"]). + To(Or(BeEquivalentTo("6373878785249699840"), BeEquivalentTo("6373878758592700416"))) + + // Test with UnstableResp3 false + Expect(func() { + options = &redis.FTAggregateOptions{Apply: []redis.FTAggregateApply{{Field: "@CreatedDateTimeUTC * 10", As: "CreatedDateTimeUTC"}}} + rawRes, _ := client2.FTAggregateWithArgs(ctx, "idx1", "*", options).RawResult() + rawVal = client2.FTAggregateWithArgs(ctx, "idx1", "*", options).RawVal() + Expect(rawRes).To(BeNil()) + Expect(rawVal).To(BeNil()) + }).Should(Panic()) + + }) + + It("should handle FTInfo with Unstable RESP3 Search Module and without stability", Label("search", "ftcreate", "ftinfo"), func() { + val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, &redis.FieldSchema{FieldName: "txt", FieldType: redis.SearchFieldTypeText, Sortable: true, NoStem: true}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeEquivalentTo("OK")) + WaitForIndexing(client, "idx1") + + resInfo, err := client.FTInfo(ctx, "idx1").RawResult() + Expect(err).NotTo(HaveOccurred()) + attributes := resInfo.(map[interface{}]interface{})["attributes"].([]interface{}) + flags := attributes[0].(map[interface{}]interface{})["flags"].([]interface{}) + Expect(flags).To(ConsistOf("SORTABLE", "NOSTEM")) + + valInfo := client.FTInfo(ctx, "idx1").RawVal() + attributes = valInfo.(map[interface{}]interface{})["attributes"].([]interface{}) + flags = attributes[0].(map[interface{}]interface{})["flags"].([]interface{}) + Expect(flags).To(ConsistOf("SORTABLE", "NOSTEM")) + + // Test with UnstableResp3 false + Expect(func() { + rawResInfo, _ := client2.FTInfo(ctx, "idx1").RawResult() + rawValInfo := client2.FTInfo(ctx, "idx1").RawVal() + Expect(rawResInfo).To(BeNil()) + Expect(rawValInfo).To(BeNil()) + }).Should(Panic()) + }) + + It("should handle FTSpellCheck with Unstable RESP3 Search Module and without stability", Label("search", "ftcreate", "ftspellcheck"), func() { + text1 := &redis.FieldSchema{FieldName: "f1", FieldType: redis.SearchFieldTypeText} + text2 := &redis.FieldSchema{FieldName: "f2", FieldType: redis.SearchFieldTypeText} + val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, text1, text2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeEquivalentTo("OK")) + WaitForIndexing(client, "idx1") + + client.HSet(ctx, "doc1", "f1", "some valid content", "f2", "this is sample text") + client.HSet(ctx, "doc2", "f1", "very important", "f2", "lorem ipsum") + + resSpellCheck, err := client.FTSpellCheck(ctx, "idx1", "impornant").RawResult() + valSpellCheck := client.FTSpellCheck(ctx, "idx1", "impornant").RawVal() + Expect(err).NotTo(HaveOccurred()) + Expect(valSpellCheck).To(BeEquivalentTo(resSpellCheck)) + results := resSpellCheck.(map[interface{}]interface{})["results"].(map[interface{}]interface{}) + Expect(results["impornant"].([]interface{})[0].(map[interface{}]interface{})["important"]).To(BeEquivalentTo(0.5)) + + // Test with UnstableResp3 false + Expect(func() { + rawResSpellCheck, _ := client2.FTSpellCheck(ctx, "idx1", "impornant").RawResult() + rawValSpellCheck := client2.FTSpellCheck(ctx, "idx1", "impornant").RawVal() + Expect(rawResSpellCheck).To(BeNil()) + Expect(rawValSpellCheck).To(BeNil()) + }).Should(Panic()) + }) + + It("should handle FTSearch with Unstable RESP3 Search Module and without stability", Label("search", "ftcreate", "ftsearch"), func() { + val, err := client.FTCreate(ctx, "txt", &redis.FTCreateOptions{StopWords: []interface{}{"foo", "bar", "baz"}}, &redis.FieldSchema{FieldName: "txt", FieldType: redis.SearchFieldTypeText}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeEquivalentTo("OK")) + WaitForIndexing(client, "txt") + client.HSet(ctx, "doc1", "txt", "foo baz") + client.HSet(ctx, "doc2", "txt", "hello world") + res1, err := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{NoContent: true}).RawResult() + val1 := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{NoContent: true}).RawVal() + Expect(err).NotTo(HaveOccurred()) + Expect(val1).To(BeEquivalentTo(res1)) + totalResults := res1.(map[interface{}]interface{})["total_results"] + Expect(totalResults).To(BeEquivalentTo(int64(0))) + res2, err := client.FTSearchWithArgs(ctx, "txt", "foo bar hello world", &redis.FTSearchOptions{NoContent: true}).RawResult() + Expect(err).NotTo(HaveOccurred()) + totalResults2 := res2.(map[interface{}]interface{})["total_results"] + Expect(totalResults2).To(BeEquivalentTo(int64(1))) + + // Test with UnstableResp3 false + Expect(func() { + rawRes2, _ := client2.FTSearchWithArgs(ctx, "txt", "foo bar hello world", &redis.FTSearchOptions{NoContent: true}).RawResult() + rawVal2 := client2.FTSearchWithArgs(ctx, "txt", "foo bar hello world", &redis.FTSearchOptions{NoContent: true}).RawVal() + Expect(rawRes2).To(BeNil()) + Expect(rawVal2).To(BeNil()) + }).Should(Panic()) + }) + It("should handle FTSynDump with Unstable RESP3 Search Module and without stability", Label("search", "ftsyndump"), func() { + text1 := &redis.FieldSchema{FieldName: "title", FieldType: redis.SearchFieldTypeText} + text2 := &redis.FieldSchema{FieldName: "body", FieldType: redis.SearchFieldTypeText} + val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{OnHash: true}, text1, text2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeEquivalentTo("OK")) + WaitForIndexing(client, "idx1") + + resSynUpdate, err := client.FTSynUpdate(ctx, "idx1", "id1", []interface{}{"boy", "child", "offspring"}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resSynUpdate).To(BeEquivalentTo("OK")) + + resSynUpdate, err = client.FTSynUpdate(ctx, "idx1", "id1", []interface{}{"baby", "child"}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resSynUpdate).To(BeEquivalentTo("OK")) + + resSynUpdate, err = client.FTSynUpdate(ctx, "idx1", "id1", []interface{}{"tree", "wood"}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resSynUpdate).To(BeEquivalentTo("OK")) + + resSynDump, err := client.FTSynDump(ctx, "idx1").RawResult() + valSynDump := client.FTSynDump(ctx, "idx1").RawVal() + Expect(err).NotTo(HaveOccurred()) + Expect(valSynDump).To(BeEquivalentTo(resSynDump)) + Expect(resSynDump.(map[interface{}]interface{})["baby"]).To(BeEquivalentTo([]interface{}{"id1"})) + + // Test with UnstableResp3 false + Expect(func() { + rawResSynDump, _ := client2.FTSynDump(ctx, "idx1").RawResult() + rawValSynDump := client2.FTSynDump(ctx, "idx1").RawVal() + Expect(rawResSynDump).To(BeNil()) + Expect(rawValSynDump).To(BeNil()) + }).Should(Panic()) + }) + + It("should test not affected Resp 3 Search method - FTExplain", Label("search", "ftexplain"), func() { + text1 := &redis.FieldSchema{FieldName: "f1", FieldType: redis.SearchFieldTypeText} + text2 := &redis.FieldSchema{FieldName: "f2", FieldType: redis.SearchFieldTypeText} + text3 := &redis.FieldSchema{FieldName: "f3", FieldType: redis.SearchFieldTypeText} + val, err := client.FTCreate(ctx, "txt", &redis.FTCreateOptions{}, text1, text2, text3).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeEquivalentTo("OK")) + WaitForIndexing(client, "txt") + res1, err := client.FTExplain(ctx, "txt", "@f3:f3_val @f2:f2_val @f1:f1_val").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res1).ToNot(BeEmpty()) + + // Test with UnstableResp3 false + Expect(func() { + res2, err := client2.FTExplain(ctx, "txt", "@f3:f3_val @f2:f2_val @f1:f1_val").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res2).ToNot(BeEmpty()) + }).ShouldNot(Panic()) + }) +}) diff --git a/sentinel.go b/sentinel.go index 188f8849..31569554 100644 --- a/sentinel.go +++ b/sentinel.go @@ -82,6 +82,7 @@ type FailoverOptions struct { DisableIndentity bool IdentitySuffix string + UnstableResp3 bool } func (opt *FailoverOptions) clientOptions() *Options { @@ -119,6 +120,7 @@ func (opt *FailoverOptions) clientOptions() *Options { DisableIndentity: opt.DisableIndentity, IdentitySuffix: opt.IdentitySuffix, + UnstableResp3: opt.UnstableResp3, } } @@ -156,6 +158,7 @@ func (opt *FailoverOptions) sentinelOptions(addr string) *Options { DisableIndentity: opt.DisableIndentity, IdentitySuffix: opt.IdentitySuffix, + UnstableResp3: opt.UnstableResp3, } } diff --git a/universal.go b/universal.go index 275bef3d..f4d2d759 100644 --- a/universal.go +++ b/universal.go @@ -68,6 +68,7 @@ type UniversalOptions struct { DisableIndentity bool IdentitySuffix string + UnstableResp3 bool } // Cluster returns cluster options created from the universal options. @@ -160,6 +161,7 @@ func (o *UniversalOptions) Failover() *FailoverOptions { DisableIndentity: o.DisableIndentity, IdentitySuffix: o.IdentitySuffix, + UnstableResp3: o.UnstableResp3, } } @@ -203,6 +205,7 @@ func (o *UniversalOptions) Simple() *Options { DisableIndentity: o.DisableIndentity, IdentitySuffix: o.IdentitySuffix, + UnstableResp3: o.UnstableResp3, } }