Compare commits

...

8 Commits

Author SHA1 Message Date
Monkey 9a24a888b3
Merge e9158aaee5 into 930d904205 2024-11-20 14:05:25 +08:00
ofekshenawa 930d904205
Add guidance on unstable RESP3 support for RediSearch commands to README (#3177)
* Add UnstableResp3 to docs

* Add RawVal and RawResult to wordlist

* Explain more about SetVal

* Add UnstableResp to wordlist
2024-11-13 13:20:59 +02:00
ofekshenawa 8b1073d2d6
Support Probabilistic commands with RESP 2 protocol (#3176)
* Support bloom resp 2

* Support Resp2 for BF.Info

* simplify BFInfoCmd field assignment using map-based key-to-field references
2024-11-13 11:15:19 +02:00
ofekshenawa d1b4eaed41
Support TimeSeries commands with RESP 2 protocol (#3184)
* Support Timeseries resp 2

* Change to resp 2

* Support Resp2 for TimeSeries commands
2024-11-13 10:27:00 +02:00
andy-stark-redis 80c9f5bb77
DOC-4345 added JSON samples for home page (#3183) 2024-11-06 17:25:46 +02:00
andy-stark-redis 1ed936eb09
DOC-4232 stream code examples (#3128)
* DOC-4232 added first stream example

* DOC-4232 examples up to xadd_7

* DOC-4232 examples up to xread

* DOC-4232 examples up to xclaim

* DOC-4232 added remaining examples

* DOC-4232 more fixes

* DOC-4232 fix for test fail on CI build

---------

Co-authored-by: Vladyslav Vildanov <117659936+vladvildanov@users.noreply.github.com>
2024-10-23 17:47:25 +03:00
monkey92t e9158aaee5 add Broadcasting listen and read push
Signed-off-by: monkey92t <golang@88.com>
2023-04-24 22:16:43 +08:00
monkey92t 0761b27368 feat: cache init
Signed-off-by: monkey92t <golang@88.com>
2023-04-05 23:46:58 +08:00
9 changed files with 3606 additions and 1805 deletions


@@ -54,6 +54,7 @@ stunnel
SynDump
TCP
TLS
UnstableResp
uri
URI
url
@@ -62,3 +63,5 @@ RedisStack
RedisGears
RedisTimeseries
RediSearch
RawResult
RawVal


@@ -186,6 +186,21 @@ rdb := redis.NewClient(&redis.Options{
#### Unstable RESP3 Structures for RediSearch Commands
When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when working with the search and query capabilities, but we plan to stabilize the RESP3-based APIs in the coming versions. You can find more guidance in the upcoming release notes.
To enable unstable RESP3, set the option in your client configuration:
```go
redis.NewClient(&redis.Options{
UnstableResp3: true,
})
```
**Note:** When UnstableResp3 mode is enabled, you must use RawResult() and RawVal() to retrieve the raw data. Because the raw response is the only option for unstable search commands, Val() and Result() have no effect on them:
```go
res1, err := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawResult()
val1 := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawVal()
```
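Since the raw reply is untyped, here is a minimal sketch of inspecting it before use (assuming `ctx` and `client` are set up as above; the concrete Go type of the raw value depends on the command and may still change):
```go
raw, err := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{}).RawResult()
if err != nil {
    panic(err)
}
// The raw RESP3 structure is not final yet; check its type before relying on it.
switch v := raw.(type) {
case map[interface{}]interface{}:
    fmt.Printf("map reply with %d entries\n", len(v))
default:
    fmt.Printf("reply of type %T\n", v)
}
```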
## Contributing
Please see [our contributing guidelines](CONTRIBUTING.md) to help us improve this library!

cache.go (new file, 263 lines added)

@@ -0,0 +1,263 @@
package redis
import (
"context"
"fmt"
"net"
"strconv"
"sync/atomic"
"time"
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto"
)
type Cache interface{}
type cache struct {
client *Client
// cluster? sentinel?
conn *Conn
prefix []string
closed int32 // atomic
}
func newCache() Cache {
// ?
return &cache{}
}
// ------------------------------------------------------------------------------------------
// extension method
func (c *Conn) readReply(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
return c.withConn(ctx, func(ctx context.Context, conn *pool.Conn) error {
return conn.WithReader(ctx, timeout, fn)
})
}
// ------------------------------------------------------------------------------------------
// Client-side caching command.
type trackingArgs struct {
redirect int
prefixes []string
broadcast bool
optIn bool
optOut bool
noLoop bool
}
func (c *cache) clientTracking(ctx context.Context, t *trackingArgs) *StringCmd {
args := make([]any, 0, 7+len(t.prefixes))
args = append(args, "CLIENT", "TRACKING", "ON")
if t.redirect > 0 {
args = append(args, "REDIRECT", t.redirect)
}
if len(t.prefixes) > 0 {
for _, prefix := range t.prefixes {
args = append(args, "PREFIX", prefix)
}
}
if t.optIn {
args = append(args, "OPTIN")
}
if t.optOut {
args = append(args, "OPTOUT")
}
if t.noLoop {
args = append(args, "NOLOOP")
}
cmd := NewStringCmd(ctx, args...)
_ = c.conn.Process(ctx, cmd)
return cmd
}
func (c *cache) trackingClose(ctx context.Context) error {
return c.conn.Process(ctx, NewStringCmd(ctx, "CLIENT", "TRACKING", "OFF"))
}
func (c *cache) cachingYes(ctx context.Context) error {
return c.conn.Process(ctx, NewStringCmd(ctx, "CLIENT", "CACHING", "YES"))
}
func (c *cache) cachingNo(ctx context.Context) error {
return c.conn.Process(ctx, NewStringCmd(ctx, "CLIENT", "CACHING", "NO"))
}
// ------------------------------------------------------------------------------------
// readInvalidate reads an invalidation push message from redis-server.
// We only accept `invalidate` messages and treat any other data that is read as an error.
func (c *cache) readInvalidate(rd *proto.Reader) ([]string, error) {
line, err := rd.ReadLine()
if err != nil {
return nil, err
}
if line[0] != proto.RespPush {
return nil, fmt.Errorf("invalid data-%s", string(line))
}
n, err := strconv.Atoi(string(line[1:]))
if err != nil {
return nil, err
}
if n != 2 {
return nil, fmt.Errorf("got %d elements in the map, wanted %d", n, 2)
}
// read `invalidate`
s, err := rd.ReadString()
if err != nil {
return nil, err
}
if s != "invalidate" {
return nil, fmt.Errorf("not a client-side caching push message, data-%s", s)
}
n, err = rd.ReadArrayLen()
if err != nil {
return nil, err
}
keys := make([]string, 0, n)
for i := 0; i < n; i++ {
key, err := rd.ReadString()
if err != nil {
return nil, err
}
keys = append(keys, key)
}
return keys, nil
}
// ------------------------------------- Broadcasting -------------------------------------
func (c *cache) listen(timeout time.Duration) {
ctx := context.Background()
defer func() {
if err := recover(); err != nil {
internal.Logger.Printf(ctx, "redis cache: panic - %v", err)
}
}()
if timeout == 0 {
timeout = 30 * time.Second
}
internal.Logger.Printf(ctx, "redis cache: listen working, read timeout-%d second", int(timeout/time.Second))
// state, 0-normal, 1-need init track
const (
normal = 0
bad = 1
)
var state = normal
for {
if atomic.LoadInt32(&c.closed) == 1 {
_ = c.conn.Close()
internal.Logger.Printf(ctx, "redis cache: close, quit listen")
return
}
if state == bad {
internal.Logger.Printf(ctx, "redis cache: state bad")
if err := c.initTrack(ctx); err != nil {
internal.Logger.Printf(ctx, "redis cache: listen init track error-%s", err.Error())
time.Sleep(1 * time.Second)
continue
}
}
if err := c.conn.Ping(ctx).Err(); err != nil {
internal.Logger.Printf(ctx, "redis cache: listen ping error-%s", err.Error())
state = bad
continue
}
state = normal
var keys []string
err := c.conn.withConn(ctx, func(ctx context.Context, conn *pool.Conn) error {
return conn.WithReader(ctx, timeout, func(rd *proto.Reader) (err error) {
keys, err = c.readInvalidate(rd)
if err == nil {
return nil
}
// The timeout error is considered normal, and it is triggered when we fail
// to receive a notification. We handle it as nil.
// We cannot return the timeout error, as go-redis would consider it a network
// problem and close the network connection.
if isNetTimeout(err) {
err = nil
return err
}
// We only listen for redis-push notifications, so under normal circumstances,
// we should not receive any redis-error notifications.
// If we do, we need to handle them as errors; otherwise,
// go-redis may consider redis errors as normal occurrences.
if isRedisError(err) {
err = fmt.Errorf("redis cache: unexpected response redis-error-msg-%s", err.Error())
}
return err
})
})
// under normal circumstances, we should not receive any errors, including redis errors.
if err != nil {
state = bad
internal.Logger.Printf(ctx, "redis cache: read push data error-%s", err.Error())
continue
}
// it's possible that we may not receive any notifications for keys.
if len(keys) > 0 {
// handle keys
}
}
}
func (c *cache) initTrack(ctx context.Context) error {
internal.Logger.Printf(ctx, "redis cache: init track")
if c.conn != nil {
_ = c.conn.Close()
}
c.conn = c.client.Conn()
args := make([]any, 0, 3+2*len(c.prefix)+1)
args = append(args, "CLIENT", "TRACKING", "ON")
for _, prefix := range c.prefix {
args = append(args, "PREFIX", prefix)
}
args = append(args, "BCAST")
cmd := NewStringCmd(ctx, args...)
if err := c.conn.Process(ctx, cmd); err != nil {
_ = c.conn.Close()
return err
}
return nil
}
// isNetTimeout reports whether err is a network timeout error.
func isNetTimeout(err error) bool {
if err == nil {
return false
}
netErr, ok := err.(net.Error)
if !ok {
return false
}
return netErr.Timeout()
}
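
The draft cache above builds on Redis server-assisted client-side caching: `CLIENT TRACKING` in `BCAST` mode makes the server push `invalidate` messages for keys matching a prefix, which `listen`/`readInvalidate` consume. Below is a minimal sketch of enabling that mode with the public go-redis v9 API; the address and the `user:` prefix are illustrative assumptions.
```go
package main

import (
    "context"
    "fmt"

    "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()

    // RESP3 is required so the server can deliver `invalidate` push frames
    // on the same connection (the frames that readInvalidate parses).
    rdb := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379", // assumed local instance
        Protocol: 3,
    })

    // Tracking state is per connection, so pin a dedicated one.
    conn := rdb.Conn()
    defer conn.Close()

    // Broadcast mode: the server pushes an invalidation for every change to a
    // matching key, without per-key bookkeeping on the server side.
    cmd := redis.NewStringCmd(ctx, "CLIENT", "TRACKING", "ON", "BCAST", "PREFIX", "user:")
    if err := conn.Process(ctx, cmd); err != nil {
        panic(err)
    }
    fmt.Println("broadcast tracking enabled:", cmd.Val())
}
```
Keeping the tracking state on a dedicated `*Conn`, as the draft cache does, ensures the invalidation pushes arrive on a connection whose reads the cache itself controls.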


@@ -1403,27 +1403,63 @@ func (cmd *MapStringSliceInterfaceCmd) Val() map[string][]interface{} {
}
func (cmd *MapStringSliceInterfaceCmd) readReply(rd *proto.Reader) (err error) {
    readType, err := rd.PeekReplyType()
    if err != nil {
        return err
    }
    cmd.val = make(map[string][]interface{})
    if readType == proto.RespMap {
        n, err := rd.ReadMapLen()
        if err != nil {
            return err
        }
        for i := 0; i < n; i++ {
            k, err := rd.ReadString()
            if err != nil {
                return err
            }
            nn, err := rd.ReadArrayLen()
            if err != nil {
                return err
            }
            cmd.val[k] = make([]interface{}, nn)
            for j := 0; j < nn; j++ {
                value, err := rd.ReadReply()
                if err != nil {
                    return err
                }
                cmd.val[k][j] = value
            }
        }
    } else if readType == proto.RespArray {
        // RESP2 response
        n, err := rd.ReadArrayLen()
        if err != nil {
            return err
        }
        for i := 0; i < n; i++ {
            // Each entry in this array is itself an array with key details
            itemLen, err := rd.ReadArrayLen()
            if err != nil {
                return err
            }
            key, err := rd.ReadString()
            if err != nil {
                return err
            }
            cmd.val[key] = make([]interface{}, 0, itemLen-1)
            for j := 1; j < itemLen; j++ {
                // Read the inner array for timestamp-value pairs
                data, err := rd.ReadReply()
                if err != nil {
                    return err
                }
                cmd.val[key] = append(cmd.val[key], data)
            }
        }
    }
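
With this change, TimeSeries multi-key replies decode into the same `map[string][]interface{}` under both protocols. A rough sketch of exercising the RESP2 path follows; the example name, address, key, and label are illustrative, and the `TSMRange` signature is assumed from the v9 TimeSeries API.
```go
package example_commands_test

import (
    "context"
    "fmt"

    "github.com/redis/go-redis/v9"
)

func ExampleClient_timeseries_resp2() {
    ctx := context.Background()

    // With Protocol: 2, the flat RESP2 array reply is decoded into the same
    // map[string][]interface{} shape as the RESP3 map reply.
    rdb := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379", // assumed local instance with RedisTimeSeries loaded
        Protocol: 2,
    })

    // Add a labelled sample via the generic Do API to keep the sketch
    // independent of the typed TS.CREATE/TS.ADD option helpers.
    if err := rdb.Do(ctx, "TS.ADD", "sensor:1", 1, 27.5, "LABELS", "type", "temp").Err(); err != nil {
        panic(err)
    }

    // Typed multi-range read; the result map is keyed by series name.
    res, err := rdb.TSMRange(ctx, 0, 10, []string{"type=temp"}).Result()
    if err != nil {
        panic(err)
    }
    for key, samples := range res {
        fmt.Println(key, len(samples))
    }
}
```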


@@ -0,0 +1,199 @@
// EXAMPLE: go_home_json
// HIDE_START
package example_commands_test
// HIDE_END
// STEP_START import
import (
"context"
"fmt"
"sort"
"github.com/redis/go-redis/v9"
)
// STEP_END
func ExampleClient_search_json() {
// STEP_START connect
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
Protocol: 2,
})
// STEP_END
// REMOVE_START
rdb.Del(ctx, "user:1", "user:2", "user:3")
rdb.FTDropIndex(ctx, "idx:users")
// REMOVE_END
// STEP_START create_data
user1 := map[string]interface{}{
"name": "Paul John",
"email": "paul.john@example.com",
"age": 42,
"city": "London",
}
user2 := map[string]interface{}{
"name": "Eden Zamir",
"email": "eden.zamir@example.com",
"age": 29,
"city": "Tel Aviv",
}
user3 := map[string]interface{}{
"name": "Paul Zamir",
"email": "paul.zamir@example.com",
"age": 35,
"city": "Tel Aviv",
}
// STEP_END
// STEP_START make_index
_, err := rdb.FTCreate(
ctx,
"idx:users",
// Options:
&redis.FTCreateOptions{
OnJSON: true,
Prefix: []interface{}{"user:"},
},
// Index schema fields:
&redis.FieldSchema{
FieldName: "$.name",
As: "name",
FieldType: redis.SearchFieldTypeText,
},
&redis.FieldSchema{
FieldName: "$.city",
As: "city",
FieldType: redis.SearchFieldTypeTag,
},
&redis.FieldSchema{
FieldName: "$.age",
As: "age",
FieldType: redis.SearchFieldTypeNumeric,
},
).Result()
if err != nil {
panic(err)
}
// STEP_END
// STEP_START add_data
_, err = rdb.JSONSet(ctx, "user:1", "$", user1).Result()
if err != nil {
panic(err)
}
_, err = rdb.JSONSet(ctx, "user:2", "$", user2).Result()
if err != nil {
panic(err)
}
_, err = rdb.JSONSet(ctx, "user:3", "$", user3).Result()
if err != nil {
panic(err)
}
// STEP_END
// STEP_START query1
findPaulResult, err := rdb.FTSearch(
ctx,
"idx:users",
"Paul @age:[30 40]",
).Result()
if err != nil {
panic(err)
}
fmt.Println(findPaulResult)
// >>> {1 [{user:3 <nil> <nil> <nil> map[$:{"age":35,"city":"Tel Aviv"...
// STEP_END
// STEP_START query2
citiesResult, err := rdb.FTSearchWithArgs(
ctx,
"idx:users",
"Paul",
&redis.FTSearchOptions{
Return: []redis.FTSearchReturn{
{
FieldName: "$.city",
As: "city",
},
},
},
).Result()
if err != nil {
panic(err)
}
sort.Slice(citiesResult.Docs, func(i, j int) bool {
return citiesResult.Docs[i].Fields["city"] < citiesResult.Docs[j].Fields["city"]
})
for _, result := range citiesResult.Docs {
fmt.Println(result.Fields["city"])
}
// >>> London
// >>> Tel Aviv
// STEP_END
// STEP_START query3
aggOptions := redis.FTAggregateOptions{
GroupBy: []redis.FTAggregateGroupBy{
{
Fields: []interface{}{"@city"},
Reduce: []redis.FTAggregateReducer{
{
Reducer: redis.SearchCount,
As: "count",
},
},
},
},
}
aggResult, err := rdb.FTAggregateWithArgs(
ctx,
"idx:users",
"*",
&aggOptions,
).Result()
if err != nil {
panic(err)
}
sort.Slice(aggResult.Rows, func(i, j int) bool {
return aggResult.Rows[i].Fields["city"].(string) <
aggResult.Rows[j].Fields["city"].(string)
})
for _, row := range aggResult.Rows {
fmt.Printf("%v - %v\n",
row.Fields["city"], row.Fields["count"],
)
}
// >>> London - 1
// >>> Tel Aviv - 2
// STEP_END
// Output:
// {1 [{user:3 <nil> <nil> <nil> map[$:{"age":35,"city":"Tel Aviv","email":"paul.zamir@example.com","name":"Paul Zamir"}]}]}
// London
// Tel Aviv
// London - 1
// Tel Aviv - 2
}

File diff suppressed because it is too large


@@ -319,37 +319,69 @@ func (cmd *BFInfoCmd) Result() (BFInfo, error) {
}
func (cmd *BFInfoCmd) readReply(rd *proto.Reader) (err error) {
    result := BFInfo{}
    // Create a mapping from key names to pointers of struct fields
    respMapping := map[string]*int64{
        "Capacity":                 &result.Capacity,
        "CAPACITY":                 &result.Capacity,
        "Size":                     &result.Size,
        "SIZE":                     &result.Size,
        "Number of filters":        &result.Filters,
        "FILTERS":                  &result.Filters,
        "Number of items inserted": &result.ItemsInserted,
        "ITEMS":                    &result.ItemsInserted,
        "Expansion rate":           &result.ExpansionRate,
        "EXPANSION":                &result.ExpansionRate,
    }
    // Helper function to read and assign a value based on the key
    readAndAssignValue := func(key string) error {
        fieldPtr, exists := respMapping[key]
        if !exists {
            return fmt.Errorf("redis: BLOOM.INFO unexpected key %s", key)
        }
        // Read the integer and assign to the field via pointer dereferencing
        val, err := rd.ReadInt()
        if err != nil {
            return err
        }
        *fieldPtr = val
        return nil
    }
    readType, err := rd.PeekReplyType()
    if err != nil {
        return err
    }
    if len(cmd.args) > 2 && readType == proto.RespArray {
        n, err := rd.ReadArrayLen()
        if err != nil {
            return err
        }
        if key, ok := cmd.args[2].(string); ok && n == 1 {
            if err := readAndAssignValue(key); err != nil {
                return err
            }
        } else {
            return fmt.Errorf("redis: BLOOM.INFO invalid argument key type")
        }
    } else {
        n, err := rd.ReadMapLen()
        if err != nil {
            return err
        }
        for i := 0; i < n; i++ {
            key, err := rd.ReadString()
            if err != nil {
                return err
            }
            if err := readAndAssignValue(key); err != nil {
                return err
            }
        }
    }
    cmd.val = result
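
With this mapping in place, `BF.INFO` decodes into the same `BFInfo` struct whether the reply arrives as a RESP3 map or a RESP2 field/value array. Below is a minimal sketch of the RESP2 path using the existing typed BFAdd/BFInfo helpers; the example name, address, and key are illustrative.
```go
package example_commands_test

import (
    "context"
    "fmt"

    "github.com/redis/go-redis/v9"
)

func ExampleClient_bfinfo_resp2() {
    ctx := context.Background()

    // With Protocol: 2, the flat BF.INFO field/value array is mapped onto
    // the same BFInfo struct that the RESP3 map reply produces.
    rdb := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379", // assumed local instance with RedisBloom loaded
        Protocol: 2,
    })

    if err := rdb.BFAdd(ctx, "bf:users", "user-1").Err(); err != nil {
        panic(err)
    }

    info, err := rdb.BFInfo(ctx, "bf:users").Result()
    if err != nil {
        panic(err)
    }
    fmt.Println(info.ItemsInserted) // number of items added so far
}
```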

File diff suppressed because it is too large

File diff suppressed because it is too large