diff --git a/command.go b/command.go index 9ae97a95..7b1d6332 100644 --- a/command.go +++ b/command.go @@ -40,7 +40,7 @@ type Cmder interface { readTimeout() *time.Duration readReply(rd *proto.Reader) error - + readRawReply(rd *proto.Reader) error SetErr(error) Err() error } @@ -122,11 +122,11 @@ func cmdString(cmd Cmder, val interface{}) string { //------------------------------------------------------------------------------ type baseCmd struct { - ctx context.Context - args []interface{} - err error - keyPos int8 - + ctx context.Context + args []interface{} + err error + keyPos int8 + rawVal interface{} _readTimeout *time.Duration } @@ -197,6 +197,11 @@ func (cmd *baseCmd) setReadTimeout(d time.Duration) { cmd._readTimeout = &d } +func (cmd *baseCmd) readRawReply(rd *proto.Reader) (err error) { + cmd.rawVal, err = rd.ReadReply() + return err +} + //------------------------------------------------------------------------------ type Cmd struct { @@ -5550,3 +5555,8 @@ func (cmd *MonitorCmd) Stop() { defer cmd.mu.Unlock() cmd.status = monitorStatusStop } + +type SearchCmd struct { + baseCmd + val interface{} +} diff --git a/redis.go b/redis.go index 527afb67..ba14f1fc 100644 --- a/redis.go +++ b/redis.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "net" + "net" // TODO change to import only specific method (Not necesarry in compiling) "sync" "sync/atomic" "time" @@ -412,6 +412,20 @@ func (c *baseClient) process(ctx context.Context, cmd Cmder) error { return lastErr } +func (c *baseClient) isProblematicMethodsOfSearchResp3(cmd Cmder) bool { + if c.opt.Protocol != 3 { + return false + } + + switch cmd.(type) { + case *AggregateCmd, *FTInfoCmd, *FTSpellCheckCmd, *FTSearchCmd, *FTSynDumpCmd: + fmt.Println("Some RESP3 results for Redis Query Engine responses may change. Refer to the readme for guidance") + return true + default: + return false + } +} + func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) { if attempt > 0 { if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { @@ -427,8 +441,11 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool atomic.StoreUint32(&retryTimeout, 1) return err } - - if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), cmd.readReply); err != nil { + readReplyFunc := cmd.readReply + if c.isProblematicMethodsOfSearchResp3(cmd) { + readReplyFunc = cmd.readRawReply + } + if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), readReplyFunc); err != nil { if cmd.readTimeout() == nil { atomic.StoreUint32(&retryTimeout, 1) } else { diff --git a/search_commands.go b/search_commands.go index f5118c77..1a8a4cfe 100644 --- a/search_commands.go +++ b/search_commands.go @@ -638,6 +638,14 @@ func (cmd *AggregateCmd) Result() (*FTAggregateResult, error) { return cmd.val, cmd.err } +func (cmd *AggregateCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *AggregateCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} + func (cmd *AggregateCmd) String() string { return cmdString(cmd, cmd.val) } @@ -1337,6 +1345,13 @@ func (cmd *FTInfoCmd) Val() FTInfoResult { return cmd.val } +func (cmd *FTInfoCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *FTInfoCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} func (cmd *FTInfoCmd) readReply(rd *proto.Reader) (err error) { n, err := rd.ReadMapLen() if err != nil { @@ -1447,6 +1462,14 @@ func (cmd *FTSpellCheckCmd) Val() []SpellCheckResult { return cmd.val } +func (cmd *FTSpellCheckCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *FTSpellCheckCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} + func (cmd *FTSpellCheckCmd) readReply(rd *proto.Reader) (err error) { data, err := rd.ReadSlice() if err != nil { @@ -1628,6 +1651,14 @@ func (cmd *FTSearchCmd) Val() FTSearchResult { return cmd.val } +func (cmd *FTSearchCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *FTSearchCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} + func (cmd *FTSearchCmd) readReply(rd *proto.Reader) (err error) { data, err := rd.ReadSlice() if err != nil { @@ -1904,6 +1935,14 @@ func (cmd *FTSynDumpCmd) Result() ([]FTSynDumpResult, error) { return cmd.val, cmd.err } +func (cmd *FTSynDumpCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *FTSynDumpCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} + func (cmd *FTSynDumpCmd) readReply(rd *proto.Reader) error { termSynonymPairs, err := rd.ReadSlice() if err != nil { diff --git a/search_test.go b/search_test.go index 0e1a473b..b326b0d7 100644 --- a/search_test.go +++ b/search_test.go @@ -22,12 +22,12 @@ func WaitForIndexing(c *redis.Client, index string) { } } -var _ = Describe("RediSearch commands", Label("search"), func() { +var _ = Describe("RediSearch commands Resp 2", Label("search"), func() { ctx := context.TODO() var client *redis.Client BeforeEach(func() { - client = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 2}) + client = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 3}) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) })