support raw parsing for problematic Redis Search types

This commit is contained in:
ofekshenawa 2024-08-31 19:10:42 +03:00
parent 1b8afbebc8
commit 3408303c94
4 changed files with 77 additions and 11 deletions

View File

@ -40,7 +40,7 @@ type Cmder interface {
readTimeout() *time.Duration readTimeout() *time.Duration
readReply(rd *proto.Reader) error readReply(rd *proto.Reader) error
readRawReply(rd *proto.Reader) error
SetErr(error) SetErr(error)
Err() error Err() error
} }
@ -126,7 +126,7 @@ type baseCmd struct {
args []interface{} args []interface{}
err error err error
keyPos int8 keyPos int8
rawVal interface{}
_readTimeout *time.Duration _readTimeout *time.Duration
} }
@ -197,6 +197,11 @@ func (cmd *baseCmd) setReadTimeout(d time.Duration) {
cmd._readTimeout = &d cmd._readTimeout = &d
} }
func (cmd *baseCmd) readRawReply(rd *proto.Reader) (err error) {
cmd.rawVal, err = rd.ReadReply()
return err
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type Cmd struct { type Cmd struct {
@ -5550,3 +5555,8 @@ func (cmd *MonitorCmd) Stop() {
defer cmd.mu.Unlock() defer cmd.mu.Unlock()
cmd.status = monitorStatusStop cmd.status = monitorStatusStop
} }
type SearchCmd struct {
baseCmd
val interface{}
}

View File

@ -4,7 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"net" "net" // TODO change to import only specific method (Not necesarry in compiling)
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -412,6 +412,20 @@ func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
return lastErr return lastErr
} }
func (c *baseClient) isProblematicMethodsOfSearchResp3(cmd Cmder) bool {
if c.opt.Protocol != 3 {
return false
}
switch cmd.(type) {
case *AggregateCmd, *FTInfoCmd, *FTSpellCheckCmd, *FTSearchCmd, *FTSynDumpCmd:
fmt.Println("Some RESP3 results for Redis Query Engine responses may change. Refer to the readme for guidance")
return true
default:
return false
}
}
func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) { func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) {
if attempt > 0 { if attempt > 0 {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
@ -427,8 +441,11 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
atomic.StoreUint32(&retryTimeout, 1) atomic.StoreUint32(&retryTimeout, 1)
return err return err
} }
readReplyFunc := cmd.readReply
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), cmd.readReply); err != nil { if c.isProblematicMethodsOfSearchResp3(cmd) {
readReplyFunc = cmd.readRawReply
}
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), readReplyFunc); err != nil {
if cmd.readTimeout() == nil { if cmd.readTimeout() == nil {
atomic.StoreUint32(&retryTimeout, 1) atomic.StoreUint32(&retryTimeout, 1)
} else { } else {

View File

@ -638,6 +638,14 @@ func (cmd *AggregateCmd) Result() (*FTAggregateResult, error) {
return cmd.val, cmd.err return cmd.val, cmd.err
} }
func (cmd *AggregateCmd) RawVal() interface{} {
return cmd.rawVal
}
func (cmd *AggregateCmd) RawResult() (interface{}, error) {
return cmd.rawVal, cmd.err
}
func (cmd *AggregateCmd) String() string { func (cmd *AggregateCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
@ -1337,6 +1345,13 @@ func (cmd *FTInfoCmd) Val() FTInfoResult {
return cmd.val return cmd.val
} }
func (cmd *FTInfoCmd) RawVal() interface{} {
return cmd.rawVal
}
func (cmd *FTInfoCmd) RawResult() (interface{}, error) {
return cmd.rawVal, cmd.err
}
func (cmd *FTInfoCmd) readReply(rd *proto.Reader) (err error) { func (cmd *FTInfoCmd) readReply(rd *proto.Reader) (err error) {
n, err := rd.ReadMapLen() n, err := rd.ReadMapLen()
if err != nil { if err != nil {
@ -1447,6 +1462,14 @@ func (cmd *FTSpellCheckCmd) Val() []SpellCheckResult {
return cmd.val return cmd.val
} }
func (cmd *FTSpellCheckCmd) RawVal() interface{} {
return cmd.rawVal
}
func (cmd *FTSpellCheckCmd) RawResult() (interface{}, error) {
return cmd.rawVal, cmd.err
}
func (cmd *FTSpellCheckCmd) readReply(rd *proto.Reader) (err error) { func (cmd *FTSpellCheckCmd) readReply(rd *proto.Reader) (err error) {
data, err := rd.ReadSlice() data, err := rd.ReadSlice()
if err != nil { if err != nil {
@ -1628,6 +1651,14 @@ func (cmd *FTSearchCmd) Val() FTSearchResult {
return cmd.val return cmd.val
} }
func (cmd *FTSearchCmd) RawVal() interface{} {
return cmd.rawVal
}
func (cmd *FTSearchCmd) RawResult() (interface{}, error) {
return cmd.rawVal, cmd.err
}
func (cmd *FTSearchCmd) readReply(rd *proto.Reader) (err error) { func (cmd *FTSearchCmd) readReply(rd *proto.Reader) (err error) {
data, err := rd.ReadSlice() data, err := rd.ReadSlice()
if err != nil { if err != nil {
@ -1904,6 +1935,14 @@ func (cmd *FTSynDumpCmd) Result() ([]FTSynDumpResult, error) {
return cmd.val, cmd.err return cmd.val, cmd.err
} }
func (cmd *FTSynDumpCmd) RawVal() interface{} {
return cmd.rawVal
}
func (cmd *FTSynDumpCmd) RawResult() (interface{}, error) {
return cmd.rawVal, cmd.err
}
func (cmd *FTSynDumpCmd) readReply(rd *proto.Reader) error { func (cmd *FTSynDumpCmd) readReply(rd *proto.Reader) error {
termSynonymPairs, err := rd.ReadSlice() termSynonymPairs, err := rd.ReadSlice()
if err != nil { if err != nil {

View File

@ -22,12 +22,12 @@ func WaitForIndexing(c *redis.Client, index string) {
} }
} }
var _ = Describe("RediSearch commands", Label("search"), func() { var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
ctx := context.TODO() ctx := context.TODO()
var client *redis.Client var client *redis.Client
BeforeEach(func() { BeforeEach(func() {
client = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 2}) client = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 3})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
}) })