Support Resp 3 Redis Search Unstable Mode (#3098)

* Updated module version that points to retracted package version (#3074)

* Updated module version that points to retracted package version

* Updated testing image to latest

* support raw parsing for problematic Redis Search types

* Add UnstableResp3SearchModule to client options

* Add tests for Resp3 Search unstable mode

* Add tests for Resp3 Search unstable mode

* Add readme note

* Add words to spellcheck

* Add UnstableResp3SearchModule check to assertStableCommand

* Fix assertStableCommand logic

* remove go.mod changes

* Check panic occur on tests

* rename method

* update errors

* Rename flag to UnstableResp3

---------

Co-authored-by: Vladyslav Vildanov <117659936+vladvildanov@users.noreply.github.com>
Co-authored-by: vladvildanov <divinez122@outlook.com>
This commit is contained in:
ofekshenawa 2024-09-12 11:26:10 +03:00 committed by GitHub
parent 9f6171dbf6
commit 04005cbdc7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 273 additions and 9 deletions

View File

@ -1,4 +1,5 @@
ACLs
APIs
autoload
autoloader
autoloading
@ -46,9 +47,11 @@ runtime
SHA
sharding
SETNAME
SpellCheck
SSL
struct
stunnel
SynDump
TCP
TLS
uri

View File

@ -183,6 +183,9 @@ rdb := redis.NewClient(&redis.Options{
})
```
#### Unstable RESP3 Structures for RediSearch Commands
When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when using the search and query capabilities, but we plan to stabilize the RESP3-based API-s in the coming versions. You can find more guidance in the upcoming release notes.
## Contributing
Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library!

View File

@ -40,7 +40,7 @@ type Cmder interface {
readTimeout() *time.Duration
readReply(rd *proto.Reader) error
readRawReply(rd *proto.Reader) error
SetErr(error)
Err() error
}
@ -122,11 +122,11 @@ func cmdString(cmd Cmder, val interface{}) string {
//------------------------------------------------------------------------------
type baseCmd struct {
ctx context.Context
args []interface{}
err error
keyPos int8
ctx context.Context
args []interface{}
err error
keyPos int8
rawVal interface{}
_readTimeout *time.Duration
}
@ -197,6 +197,11 @@ func (cmd *baseCmd) setReadTimeout(d time.Duration) {
cmd._readTimeout = &d
}
func (cmd *baseCmd) readRawReply(rd *proto.Reader) (err error) {
cmd.rawVal, err = rd.ReadReply()
return err
}
//------------------------------------------------------------------------------
type Cmd struct {

View File

@ -153,6 +153,9 @@ type Options struct {
// Add suffix to client name. Default is empty.
IdentitySuffix string
// Enable Unstable mode for Redis Search module with RESP3.
UnstableResp3 bool
}
func (opt *Options) init() {

View File

@ -412,6 +412,19 @@ func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
return lastErr
}
func (c *baseClient) assertUnstableCommand(cmd Cmder) bool {
switch cmd.(type) {
case *AggregateCmd, *FTInfoCmd, *FTSpellCheckCmd, *FTSearchCmd, *FTSynDumpCmd:
if c.opt.UnstableResp3 {
return true
} else {
panic("RESP3 responses for this command are disabled because they may still change. Please set the flag UnstableResp3 . See the [README](https://github.com/redis/go-redis/blob/master/README.md) and the release notes for guidance.")
}
default:
return false
}
}
func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) {
if attempt > 0 {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
@ -427,8 +440,12 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
atomic.StoreUint32(&retryTimeout, 1)
return err
}
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), cmd.readReply); err != nil {
readReplyFunc := cmd.readReply
// Apply unstable RESP3 search module.
if c.opt.Protocol != 2 && c.assertUnstableCommand(cmd) {
readReplyFunc = cmd.readRawReply
}
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), readReplyFunc); err != nil {
if cmd.readTimeout() == nil {
atomic.StoreUint32(&retryTimeout, 1)
} else {

View File

@ -100,6 +100,7 @@ type RingOptions struct {
DisableIndentity bool
IdentitySuffix string
UnstableResp3 bool
}
func (opt *RingOptions) init() {
@ -168,6 +169,7 @@ func (opt *RingOptions) clientOptions() *Options {
DisableIndentity: opt.DisableIndentity,
IdentitySuffix: opt.IdentitySuffix,
UnstableResp3: opt.UnstableResp3,
}
}

View File

@ -638,6 +638,14 @@ func (cmd *AggregateCmd) Result() (*FTAggregateResult, error) {
return cmd.val, cmd.err
}
func (cmd *AggregateCmd) RawVal() interface{} {
return cmd.rawVal
}
func (cmd *AggregateCmd) RawResult() (interface{}, error) {
return cmd.rawVal, cmd.err
}
func (cmd *AggregateCmd) String() string {
return cmdString(cmd, cmd.val)
}
@ -1337,6 +1345,13 @@ func (cmd *FTInfoCmd) Val() FTInfoResult {
return cmd.val
}
func (cmd *FTInfoCmd) RawVal() interface{} {
return cmd.rawVal
}
func (cmd *FTInfoCmd) RawResult() (interface{}, error) {
return cmd.rawVal, cmd.err
}
func (cmd *FTInfoCmd) readReply(rd *proto.Reader) (err error) {
n, err := rd.ReadMapLen()
if err != nil {
@ -1447,6 +1462,14 @@ func (cmd *FTSpellCheckCmd) Val() []SpellCheckResult {
return cmd.val
}
func (cmd *FTSpellCheckCmd) RawVal() interface{} {
return cmd.rawVal
}
func (cmd *FTSpellCheckCmd) RawResult() (interface{}, error) {
return cmd.rawVal, cmd.err
}
func (cmd *FTSpellCheckCmd) readReply(rd *proto.Reader) (err error) {
data, err := rd.ReadSlice()
if err != nil {
@ -1628,6 +1651,14 @@ func (cmd *FTSearchCmd) Val() FTSearchResult {
return cmd.val
}
func (cmd *FTSearchCmd) RawVal() interface{} {
return cmd.rawVal
}
func (cmd *FTSearchCmd) RawResult() (interface{}, error) {
return cmd.rawVal, cmd.err
}
func (cmd *FTSearchCmd) readReply(rd *proto.Reader) (err error) {
data, err := rd.ReadSlice()
if err != nil {
@ -1904,6 +1935,14 @@ func (cmd *FTSynDumpCmd) Result() ([]FTSynDumpResult, error) {
return cmd.val, cmd.err
}
func (cmd *FTSynDumpCmd) RawVal() interface{} {
return cmd.rawVal
}
func (cmd *FTSynDumpCmd) RawResult() (interface{}, error) {
return cmd.rawVal, cmd.err
}
func (cmd *FTSynDumpCmd) readReply(rd *proto.Reader) error {
termSynonymPairs, err := rd.ReadSlice()
if err != nil {

View File

@ -18,11 +18,13 @@ func WaitForIndexing(c *redis.Client, index string) {
return
}
time.Sleep(100 * time.Millisecond)
} else {
return
}
}
}
var _ = Describe("RediSearch commands", Label("search"), func() {
var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
ctx := context.TODO()
var client *redis.Client
@ -1415,3 +1417,187 @@ func _assert_geosearch_result(result *redis.FTSearchResult, expectedDocIDs []str
// Expect(results0["id"]).To(BeEquivalentTo("a"))
// Expect(results0["extra_attributes"].(map[interface{}]interface{})["__v_score"]).To(BeEquivalentTo("0"))
// })
var _ = Describe("RediSearch commands Resp 3", Label("search"), func() {
ctx := context.TODO()
var client *redis.Client
var client2 *redis.Client
BeforeEach(func() {
client = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 3, UnstableResp3: true})
client2 = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 3})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should handle FTAggregate with Unstable RESP3 Search Module and without stability", Label("search", "ftcreate", "ftaggregate"), func() {
text1 := &redis.FieldSchema{FieldName: "PrimaryKey", FieldType: redis.SearchFieldTypeText, Sortable: true}
num1 := &redis.FieldSchema{FieldName: "CreatedDateTimeUTC", FieldType: redis.SearchFieldTypeNumeric, Sortable: true}
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, text1, num1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx1")
client.HSet(ctx, "doc1", "PrimaryKey", "9::362330", "CreatedDateTimeUTC", "637387878524969984")
client.HSet(ctx, "doc2", "PrimaryKey", "9::362329", "CreatedDateTimeUTC", "637387875859270016")
options := &redis.FTAggregateOptions{Apply: []redis.FTAggregateApply{{Field: "@CreatedDateTimeUTC * 10", As: "CreatedDateTimeUTC"}}}
res, err := client.FTAggregateWithArgs(ctx, "idx1", "*", options).RawResult()
rawVal := client.FTAggregateWithArgs(ctx, "idx1", "*", options).RawVal()
Expect(err).NotTo(HaveOccurred())
Expect(rawVal).To(BeEquivalentTo(res))
results := res.(map[interface{}]interface{})["results"].([]interface{})
Expect(results[0].(map[interface{}]interface{})["extra_attributes"].(map[interface{}]interface{})["CreatedDateTimeUTC"]).
To(Or(BeEquivalentTo("6373878785249699840"), BeEquivalentTo("6373878758592700416")))
Expect(results[1].(map[interface{}]interface{})["extra_attributes"].(map[interface{}]interface{})["CreatedDateTimeUTC"]).
To(Or(BeEquivalentTo("6373878785249699840"), BeEquivalentTo("6373878758592700416")))
// Test with UnstableResp3 false
Expect(func() {
options = &redis.FTAggregateOptions{Apply: []redis.FTAggregateApply{{Field: "@CreatedDateTimeUTC * 10", As: "CreatedDateTimeUTC"}}}
rawRes, _ := client2.FTAggregateWithArgs(ctx, "idx1", "*", options).RawResult()
rawVal = client2.FTAggregateWithArgs(ctx, "idx1", "*", options).RawVal()
Expect(rawRes).To(BeNil())
Expect(rawVal).To(BeNil())
}).Should(Panic())
})
It("should handle FTInfo with Unstable RESP3 Search Module and without stability", Label("search", "ftcreate", "ftinfo"), func() {
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, &redis.FieldSchema{FieldName: "txt", FieldType: redis.SearchFieldTypeText, Sortable: true, NoStem: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx1")
resInfo, err := client.FTInfo(ctx, "idx1").RawResult()
Expect(err).NotTo(HaveOccurred())
attributes := resInfo.(map[interface{}]interface{})["attributes"].([]interface{})
flags := attributes[0].(map[interface{}]interface{})["flags"].([]interface{})
Expect(flags).To(ConsistOf("SORTABLE", "NOSTEM"))
valInfo := client.FTInfo(ctx, "idx1").RawVal()
attributes = valInfo.(map[interface{}]interface{})["attributes"].([]interface{})
flags = attributes[0].(map[interface{}]interface{})["flags"].([]interface{})
Expect(flags).To(ConsistOf("SORTABLE", "NOSTEM"))
// Test with UnstableResp3 false
Expect(func() {
rawResInfo, _ := client2.FTInfo(ctx, "idx1").RawResult()
rawValInfo := client2.FTInfo(ctx, "idx1").RawVal()
Expect(rawResInfo).To(BeNil())
Expect(rawValInfo).To(BeNil())
}).Should(Panic())
})
It("should handle FTSpellCheck with Unstable RESP3 Search Module and without stability", Label("search", "ftcreate", "ftspellcheck"), func() {
text1 := &redis.FieldSchema{FieldName: "f1", FieldType: redis.SearchFieldTypeText}
text2 := &redis.FieldSchema{FieldName: "f2", FieldType: redis.SearchFieldTypeText}
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, text1, text2).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx1")
client.HSet(ctx, "doc1", "f1", "some valid content", "f2", "this is sample text")
client.HSet(ctx, "doc2", "f1", "very important", "f2", "lorem ipsum")
resSpellCheck, err := client.FTSpellCheck(ctx, "idx1", "impornant").RawResult()
valSpellCheck := client.FTSpellCheck(ctx, "idx1", "impornant").RawVal()
Expect(err).NotTo(HaveOccurred())
Expect(valSpellCheck).To(BeEquivalentTo(resSpellCheck))
results := resSpellCheck.(map[interface{}]interface{})["results"].(map[interface{}]interface{})
Expect(results["impornant"].([]interface{})[0].(map[interface{}]interface{})["important"]).To(BeEquivalentTo(0.5))
// Test with UnstableResp3 false
Expect(func() {
rawResSpellCheck, _ := client2.FTSpellCheck(ctx, "idx1", "impornant").RawResult()
rawValSpellCheck := client2.FTSpellCheck(ctx, "idx1", "impornant").RawVal()
Expect(rawResSpellCheck).To(BeNil())
Expect(rawValSpellCheck).To(BeNil())
}).Should(Panic())
})
It("should handle FTSearch with Unstable RESP3 Search Module and without stability", Label("search", "ftcreate", "ftsearch"), func() {
val, err := client.FTCreate(ctx, "txt", &redis.FTCreateOptions{StopWords: []interface{}{"foo", "bar", "baz"}}, &redis.FieldSchema{FieldName: "txt", FieldType: redis.SearchFieldTypeText}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "txt")
client.HSet(ctx, "doc1", "txt", "foo baz")
client.HSet(ctx, "doc2", "txt", "hello world")
res1, err := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{NoContent: true}).RawResult()
val1 := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{NoContent: true}).RawVal()
Expect(err).NotTo(HaveOccurred())
Expect(val1).To(BeEquivalentTo(res1))
totalResults := res1.(map[interface{}]interface{})["total_results"]
Expect(totalResults).To(BeEquivalentTo(int64(0)))
res2, err := client.FTSearchWithArgs(ctx, "txt", "foo bar hello world", &redis.FTSearchOptions{NoContent: true}).RawResult()
Expect(err).NotTo(HaveOccurred())
totalResults2 := res2.(map[interface{}]interface{})["total_results"]
Expect(totalResults2).To(BeEquivalentTo(int64(1)))
// Test with UnstableResp3 false
Expect(func() {
rawRes2, _ := client2.FTSearchWithArgs(ctx, "txt", "foo bar hello world", &redis.FTSearchOptions{NoContent: true}).RawResult()
rawVal2 := client2.FTSearchWithArgs(ctx, "txt", "foo bar hello world", &redis.FTSearchOptions{NoContent: true}).RawVal()
Expect(rawRes2).To(BeNil())
Expect(rawVal2).To(BeNil())
}).Should(Panic())
})
It("should handle FTSynDump with Unstable RESP3 Search Module and without stability", Label("search", "ftsyndump"), func() {
text1 := &redis.FieldSchema{FieldName: "title", FieldType: redis.SearchFieldTypeText}
text2 := &redis.FieldSchema{FieldName: "body", FieldType: redis.SearchFieldTypeText}
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{OnHash: true}, text1, text2).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx1")
resSynUpdate, err := client.FTSynUpdate(ctx, "idx1", "id1", []interface{}{"boy", "child", "offspring"}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resSynUpdate).To(BeEquivalentTo("OK"))
resSynUpdate, err = client.FTSynUpdate(ctx, "idx1", "id1", []interface{}{"baby", "child"}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resSynUpdate).To(BeEquivalentTo("OK"))
resSynUpdate, err = client.FTSynUpdate(ctx, "idx1", "id1", []interface{}{"tree", "wood"}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resSynUpdate).To(BeEquivalentTo("OK"))
resSynDump, err := client.FTSynDump(ctx, "idx1").RawResult()
valSynDump := client.FTSynDump(ctx, "idx1").RawVal()
Expect(err).NotTo(HaveOccurred())
Expect(valSynDump).To(BeEquivalentTo(resSynDump))
Expect(resSynDump.(map[interface{}]interface{})["baby"]).To(BeEquivalentTo([]interface{}{"id1"}))
// Test with UnstableResp3 false
Expect(func() {
rawResSynDump, _ := client2.FTSynDump(ctx, "idx1").RawResult()
rawValSynDump := client2.FTSynDump(ctx, "idx1").RawVal()
Expect(rawResSynDump).To(BeNil())
Expect(rawValSynDump).To(BeNil())
}).Should(Panic())
})
It("should test not affected Resp 3 Search method - FTExplain", Label("search", "ftexplain"), func() {
text1 := &redis.FieldSchema{FieldName: "f1", FieldType: redis.SearchFieldTypeText}
text2 := &redis.FieldSchema{FieldName: "f2", FieldType: redis.SearchFieldTypeText}
text3 := &redis.FieldSchema{FieldName: "f3", FieldType: redis.SearchFieldTypeText}
val, err := client.FTCreate(ctx, "txt", &redis.FTCreateOptions{}, text1, text2, text3).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "txt")
res1, err := client.FTExplain(ctx, "txt", "@f3:f3_val @f2:f2_val @f1:f1_val").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res1).ToNot(BeEmpty())
// Test with UnstableResp3 false
Expect(func() {
res2, err := client2.FTExplain(ctx, "txt", "@f3:f3_val @f2:f2_val @f1:f1_val").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res2).ToNot(BeEmpty())
}).ShouldNot(Panic())
})
})

View File

@ -82,6 +82,7 @@ type FailoverOptions struct {
DisableIndentity bool
IdentitySuffix string
UnstableResp3 bool
}
func (opt *FailoverOptions) clientOptions() *Options {
@ -119,6 +120,7 @@ func (opt *FailoverOptions) clientOptions() *Options {
DisableIndentity: opt.DisableIndentity,
IdentitySuffix: opt.IdentitySuffix,
UnstableResp3: opt.UnstableResp3,
}
}
@ -156,6 +158,7 @@ func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
DisableIndentity: opt.DisableIndentity,
IdentitySuffix: opt.IdentitySuffix,
UnstableResp3: opt.UnstableResp3,
}
}

View File

@ -68,6 +68,7 @@ type UniversalOptions struct {
DisableIndentity bool
IdentitySuffix string
UnstableResp3 bool
}
// Cluster returns cluster options created from the universal options.
@ -160,6 +161,7 @@ func (o *UniversalOptions) Failover() *FailoverOptions {
DisableIndentity: o.DisableIndentity,
IdentitySuffix: o.IdentitySuffix,
UnstableResp3: o.UnstableResp3,
}
}
@ -203,6 +205,7 @@ func (o *UniversalOptions) Simple() *Options {
DisableIndentity: o.DisableIndentity,
IdentitySuffix: o.IdentitySuffix,
UnstableResp3: o.UnstableResp3,
}
}