Merge branch 'master' into search-support

This commit is contained in:
Chayim 2023-12-17 15:37:49 +02:00 committed by GitHub
commit a5fad36ae7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 288 additions and 51 deletions

View File

@ -28,7 +28,7 @@ jobs:
steps:
- name: Set up ${{ matrix.go-version }}
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}

View File

@ -29,7 +29,7 @@ jobs:
steps:
- name: Set up ${{ matrix.go-version }}
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}

View File

@ -8,7 +8,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Check Spelling
uses: rojopolis/spellcheck-github-actions@0.34.0
uses: rojopolis/spellcheck-github-actions@0.35.0
with:
config_path: .github/spellcheck-settings.yml
task_name: Markdown

View File

@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/stale@v8
- uses: actions/stale@v9
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
stale-issue-message: 'This issue is marked stale. It will be closed in 30 days if it is not updated.'

View File

@ -152,7 +152,7 @@ import (
var ctx = context.Background()
func ExampleClient() {
url := "redis://localhost:6379?password=hello&protocol=3"
url := "redis://user:password@localhost:6379/0?protocol=3"
opts, err := redis.ParseURL(url)
if err != nil {
panic(err)

View File

@ -59,14 +59,6 @@ func NewClusterClientStub(resp []byte) *ClientStub {
},
})
// init command.
tmpClient := NewClient(&Options{Addr: ":6379"})
cmdsInfo, err := tmpClient.Command(ctx).Result()
_ = tmpClient.Close()
client.cmdsInfoCache = newCmdsInfoCache(func(_ context.Context) (map[string]*CommandInfo, error) {
return cmdsInfo, err
})
stub.Cmdable = client
return stub
}

View File

@ -1,6 +1,8 @@
package redis
import "context"
import (
"context"
)
type BitMapCmdable interface {
GetBit(ctx context.Context, key string, offset int64) *IntCmd
@ -127,3 +129,21 @@ func (c cmdable) BitField(ctx context.Context, key string, values ...interface{}
_ = c(ctx, cmd)
return cmd
}
// BitFieldRO - Read-only variant of the BITFIELD command.
// It is like the original BITFIELD but only accepts GET subcommand and can safely be used in read-only replicas.
// - BitFieldRO(ctx, key, "<Encoding0>", "<Offset0>", "<Encoding1>","<Offset1>")
func (c cmdable) BitFieldRO(ctx context.Context, key string, values ...interface{}) *IntSliceCmd {
args := make([]interface{}, 2, 2+len(values))
args[0] = "BITFIELD_RO"
args[1] = key
if len(values)%2 != 0 {
panic("BitFieldRO: invalid number of arguments, must be even")
}
for i := 0; i < len(values); i += 2 {
args = append(args, "GET", values[i], values[i+1])
}
cmd := NewIntSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

View File

@ -8,6 +8,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/redis/go-redis/v9/internal"
@ -17,10 +18,22 @@ import (
)
type Cmder interface {
// command name.
// e.g. "set k v ex 10" -> "set", "cluster info" -> "cluster".
Name() string
// full command name.
// e.g. "set k v ex 10" -> "set", "cluster info" -> "cluster info".
FullName() string
// all args of the command.
// e.g. "set k v ex 10" -> "[set k v ex 10]".
Args() []interface{}
// format request and response string.
// e.g. "set k v ex 10" -> "set k v ex 10: OK", "get k" -> "get k: v".
String() string
stringArg(int) string
firstKeyPos() int8
SetFirstKeyPos(int8)
@ -62,7 +75,7 @@ func writeCmd(wr *proto.Writer, cmd Cmder) error {
return wr.WriteArgs(cmd.Args())
}
func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int {
func cmdFirstKeyPos(cmd Cmder) int {
if pos := cmd.firstKeyPos(); pos != 0 {
return int(pos)
}
@ -82,10 +95,6 @@ func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int {
return 2
}
}
if info != nil {
return int(info.FirstKeyPos)
}
return 1
}
@ -5381,3 +5390,85 @@ func (cmd *InfoCmd) Item(section, key string) string {
return cmd.val[section][key]
}
}
type MonitorStatus int
const (
monitorStatusIdle MonitorStatus = iota
monitorStatusStart
monitorStatusStop
)
type MonitorCmd struct {
baseCmd
ch chan string
status MonitorStatus
mu sync.Mutex
}
func newMonitorCmd(ctx context.Context, ch chan string) *MonitorCmd {
return &MonitorCmd{
baseCmd: baseCmd{
ctx: ctx,
args: []interface{}{"monitor"},
},
ch: ch,
status: monitorStatusIdle,
mu: sync.Mutex{},
}
}
func (cmd *MonitorCmd) String() string {
return cmdString(cmd, nil)
}
func (cmd *MonitorCmd) readReply(rd *proto.Reader) error {
ctx, cancel := context.WithCancel(cmd.ctx)
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
err := cmd.readMonitor(rd, cancel)
if err != nil {
cmd.err = err
return
}
}
}
}(ctx)
return nil
}
func (cmd *MonitorCmd) readMonitor(rd *proto.Reader, cancel context.CancelFunc) error {
for {
cmd.mu.Lock()
st := cmd.status
cmd.mu.Unlock()
if pk, _ := rd.Peek(1); len(pk) != 0 && st == monitorStatusStart {
line, err := rd.ReadString()
if err != nil {
return err
}
cmd.ch <- line
}
if st == monitorStatusStop {
cancel()
break
}
}
return nil
}
func (cmd *MonitorCmd) Start() {
cmd.mu.Lock()
defer cmd.mu.Unlock()
cmd.status = monitorStatusStart
}
func (cmd *MonitorCmd) Stop() {
cmd.mu.Lock()
defer cmd.mu.Unlock()
cmd.status = monitorStatusStop
}

View File

@ -204,7 +204,6 @@ type Cmdable interface {
SlowLogGet(ctx context.Context, num int64) *SlowLogCmd
Time(ctx context.Context) *TimeCmd
DebugObject(ctx context.Context, key string) *StringCmd
MemoryUsage(ctx context.Context, key string, samples ...int) *IntCmd
ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd
@ -701,3 +700,20 @@ func (c cmdable) ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *St
_ = c(ctx, cmd)
return cmd
}
/*
Monitor - represents a Redis MONITOR command, allowing the user to capture
and process all commands sent to a Redis server. This mimics the behavior of
MONITOR in the redis-cli.
Notes:
- Using MONITOR blocks the connection to the server for itself. It needs a dedicated connection
- The user should create a channel of type string
- This runs concurrently in the background. Trigger via the Start and Stop functions
See further: Redis MONITOR command: https://redis.io/commands/monitor
*/
func (c cmdable) Monitor(ctx context.Context, ch chan string) *MonitorCmd {
cmd := newMonitorCmd(ctx, ch)
_ = c(ctx, cmd)
return cmd
}

View File

@ -1279,6 +1279,20 @@ var _ = Describe("Commands", func() {
Expect(nn).To(Equal([]int64{0, 4}))
})
It("should BitFieldRO", func() {
nn, err := client.BitField(ctx, "mykey", "SET", "u8", 8, 255).Result()
Expect(err).NotTo(HaveOccurred())
Expect(nn).To(Equal([]int64{0}))
nn, err = client.BitFieldRO(ctx, "mykey", "u8", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(nn).To(Equal([]int64{0}))
nn, err = client.BitFieldRO(ctx, "mykey", "u8", 0, "u4", 8, "u4", 12, "u4", 13).Result()
Expect(err).NotTo(HaveOccurred())
Expect(nn).To(Equal([]int64{0, 15, 15, 14}))
})
It("should Decr", func() {
set := client.Set(ctx, "key", "10", 0)
Expect(set.Err()).NotTo(HaveOccurred())
@ -3708,28 +3722,28 @@ var _ = Describe("Commands", func() {
It("should ZAdd bytes", func() {
added, err := client.ZAdd(ctx, "zset", redis.Z{
Score: 1,
Member: []byte("one"),
Member: "one",
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(added).To(Equal(int64(1)))
added, err = client.ZAdd(ctx, "zset", redis.Z{
Score: 1,
Member: []byte("uno"),
Member: "uno",
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(added).To(Equal(int64(1)))
added, err = client.ZAdd(ctx, "zset", redis.Z{
Score: 2,
Member: []byte("two"),
Member: "two",
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(added).To(Equal(int64(1)))
added, err = client.ZAdd(ctx, "zset", redis.Z{
Score: 3,
Member: []byte("two"),
Member: "two",
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(added).To(Equal(int64(0)))

View File

@ -112,7 +112,7 @@ var _ = Describe("WriteArg", func() {
})
args := map[any]string{
"hello": "$1\r\nhello\r\n",
"hello": "$5\r\nhello\r\n",
int(10): "$2\r\n10\r\n",
util.ToPtr(int(10)): "$2\r\n10\r\n",
int8(10): "$2\r\n10\r\n",
@ -133,8 +133,8 @@ var _ = Describe("WriteArg", func() {
util.ToPtr(uint32(10)): "$2\r\n10\r\n",
uint64(10): "$2\r\n10\r\n",
util.ToPtr(uint64(10)): "$2\r\n10\r\n",
float32(10.3): "$4\r\n10.3\r\n",
util.ToPtr(float32(10.3)): "$4\r\n10.3\r\n",
float32(10.3): "$18\r\n10.300000190734863\r\n",
util.ToPtr(float32(10.3)): "$18\r\n10.300000190734863\r\n",
float64(10.3): "$4\r\n10.3\r\n",
util.ToPtr(float64(10.3)): "$4\r\n10.3\r\n",
bool(true): "$1\r\n1\r\n",
@ -144,6 +144,7 @@ var _ = Describe("WriteArg", func() {
}
for arg, expect := range args {
arg, expect := arg, expect
It(fmt.Sprintf("should write arg of type %T", arg), func() {
err := wr.WriteArg(arg)
Expect(err).NotTo(HaveOccurred())

View File

@ -41,6 +41,11 @@ var (
redisAddr = ":" + redisPort
)
var (
rediStackPort = "6379"
rediStackAddr = ":" + rediStackPort
)
var (
sentinelAddrs = []string{":" + sentinelPort1, ":" + sentinelPort2, ":" + sentinelPort3}

48
monitor_test.go Normal file
View File

@ -0,0 +1,48 @@
package redis_test
import (
"context"
"time"
. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"
"github.com/redis/go-redis/v9"
)
var _ = Describe("Monitor command", Label("monitor"), func() {
ctx := context.TODO()
var client *redis.Client
BeforeEach(func() {
client = redis.NewClient(&redis.Options{Addr: ":6379"})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should monitor", Label("monitor"), func() {
ress := make(chan string)
client1 := redis.NewClient(&redis.Options{Addr: rediStackAddr})
mn := client1.Monitor(ctx, ress)
mn.Start()
// Wait for the Redis server to be in monitoring mode.
time.Sleep(100 * time.Millisecond)
client.Set(ctx, "foo", "bar", 0)
client.Set(ctx, "bar", "baz", 0)
client.Set(ctx, "bap", 8, 0)
client.Get(ctx, "bap")
lst := []string{}
for i := 0; i < 5; i++ {
s := <-ress
lst = append(lst, s)
}
mn.Stop()
Expect(lst[0]).To(ContainSubstring("OK"))
Expect(lst[1]).To(ContainSubstring(`"set" "foo" "bar"`))
Expect(lst[2]).To(ContainSubstring(`"set" "bar" "baz"`))
Expect(lst[3]).To(ContainSubstring(`"set" "bap" "8"`))
})
})

View File

@ -907,7 +907,6 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
}
func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
cmdInfo := c.cmdInfo(ctx, cmd.Name())
slot := c.cmdSlot(ctx, cmd)
var node *clusterNode
var ask bool
@ -921,7 +920,7 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
if node == nil {
var err error
node, err = c.cmdNode(ctx, cmdInfo, slot)
node, err = c.cmdNode(ctx, cmd.Name(), slot)
if err != nil {
return err
}
@ -1783,8 +1782,7 @@ func (c *ClusterClient) cmdSlot(ctx context.Context, cmd Cmder) int {
return args[2].(int)
}
cmdInfo := c.cmdInfo(ctx, cmd.Name())
return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
return cmdSlot(cmd, cmdFirstKeyPos(cmd))
}
func cmdSlot(cmd Cmder, pos int) int {
@ -1797,7 +1795,7 @@ func cmdSlot(cmd Cmder, pos int) int {
func (c *ClusterClient) cmdNode(
ctx context.Context,
cmdInfo *CommandInfo,
cmdName string,
slot int,
) (*clusterNode, error) {
state, err := c.state.Get(ctx)
@ -1805,9 +1803,12 @@ func (c *ClusterClient) cmdNode(
return nil, err
}
if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
if c.opt.ReadOnly {
cmdInfo := c.cmdInfo(ctx, cmdName)
if cmdInfo != nil && cmdInfo.ReadOnly {
return c.slotReadOnlyNode(state, slot)
}
}
return state.slotMasterNode(slot)
}

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
@ -40,12 +41,15 @@ type (
)
type hooksMixin struct {
hooksMu *sync.Mutex
slice []Hook
initial hooks
current hooks
}
func (hs *hooksMixin) initHooks(hooks hooks) {
hs.hooksMu = new(sync.Mutex)
hs.initial = hooks
hs.chain()
}
@ -116,6 +120,9 @@ func (hs *hooksMixin) AddHook(hook Hook) {
func (hs *hooksMixin) chain() {
hs.initial.setDefaults()
hs.hooksMu.Lock()
defer hs.hooksMu.Unlock()
hs.current.dial = hs.initial.dial
hs.current.process = hs.initial.process
hs.current.pipeline = hs.initial.pipeline
@ -138,9 +145,13 @@ func (hs *hooksMixin) chain() {
}
func (hs *hooksMixin) clone() hooksMixin {
hs.hooksMu.Lock()
defer hs.hooksMu.Unlock()
clone := *hs
l := len(clone.slice)
clone.slice = clone.slice[:l:l]
clone.hooksMu = new(sync.Mutex)
return clone
}
@ -165,6 +176,8 @@ func (hs *hooksMixin) withProcessPipelineHook(
}
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
hs.hooksMu.Lock()
defer hs.hooksMu.Unlock()
return hs.current.dial(ctx, network, addr)
}

View File

@ -579,3 +579,53 @@ var _ = Describe("Hook", func() {
Expect(cmd.Val()).To(Equal("Script and hook"))
})
})
var _ = Describe("Hook with MinIdleConns", func() {
var client *redis.Client
BeforeEach(func() {
options := redisOptions()
options.MinIdleConns = 1
client = redis.NewClient(options)
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})
AfterEach(func() {
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
It("fifo", func() {
var res []string
client.AddHook(&hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
res = append(res, "hook-1-process-start")
err := hook(ctx, cmd)
res = append(res, "hook-1-process-end")
return err
}
},
})
client.AddHook(&hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
res = append(res, "hook-2-process-start")
err := hook(ctx, cmd)
res = append(res, "hook-2-process-end")
return err
}
},
})
err := client.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]string{
"hook-1-process-start",
"hook-2-process-start",
"hook-2-process-end",
"hook-1-process-end",
}))
})
})

18
ring.go
View File

@ -678,21 +678,8 @@ func (c *Ring) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) {
return nil, firstErr
}
func (c *Ring) cmdInfo(ctx context.Context, name string) *CommandInfo {
cmdsInfo, err := c.cmdsInfoCache.Get(ctx)
if err != nil {
return nil
}
info := cmdsInfo[name]
if info == nil {
internal.Logger.Printf(ctx, "info for cmd=%s not found", name)
}
return info
}
func (c *Ring) cmdShard(ctx context.Context, cmd Cmder) (*ringShard, error) {
cmdInfo := c.cmdInfo(ctx, cmd.Name())
pos := cmdFirstKeyPos(cmd, cmdInfo)
pos := cmdFirstKeyPos(cmd)
if pos == 0 {
return c.sharding.Random()
}
@ -760,8 +747,7 @@ func (c *Ring) generalProcessPipeline(
cmdsMap := make(map[string][]Cmder)
for _, cmd := range cmds {
cmdInfo := c.cmdInfo(ctx, cmd.Name())
hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
hash := cmd.stringArg(cmdFirstKeyPos(cmd))
if hash != "" {
hash = c.sharding.Hash(hash)
}

View File

@ -727,7 +727,7 @@ func (c cmdable) ZScan(ctx context.Context, key string, cursor uint64, match str
// Z represents sorted set member.
type Z struct {
Score float64
Member interface{}
Member string
}
// ZWithKey represents sorted set member including the name of the key where it was popped.