diff --git a/CHANGELOG.md b/CHANGELOG.md index 55b3c3a5..4ad70c8c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,10 +4,14 @@ ## v8 (unreleased) +- Documentation at https://redis.uptrace.dev/ + - All commands require `context.Context` as a first argument, e.g. `rdb.Ping(ctx)`. If you are not using `context.Context` yet, the simplest option is to define global package variable `var ctx = context.TODO()` and use it when `ctx` is required. +- Added `redis.NewFailoverClusterClient` that supports routing read-only commands to a slave node. + - Added `redisext.OpenTemetryHook` that adds [Redis OpenTelemetry instrumentation](https://redis.uptrace.dev/tracing/). diff --git a/bench_test.go b/bench_test.go index ed69b31a..5e532ea8 100644 --- a/bench_test.go +++ b/bench_test.go @@ -305,31 +305,6 @@ func BenchmarkClusterSetString(b *testing.B) { }) } -func BenchmarkClusterReloadState(b *testing.B) { - if testing.Short() { - b.Skip("skipping in short mode") - } - - ctx := context.Background() - cluster := newClusterScenario() - if err := startCluster(ctx, cluster); err != nil { - b.Fatal(err) - } - defer cluster.Close() - - client := cluster.newClusterClient(ctx, redisClusterOptions()) - defer client.Close() - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - err := client.ReloadState(ctx) - if err != nil { - b.Fatal(err) - } - } -} - var clusterSink *redis.ClusterClient func BenchmarkClusterWithContext(b *testing.B) { diff --git a/cluster.go b/cluster.go index e6fc2a3e..b6c4daac 100644 --- a/cluster.go +++ b/cluster.go @@ -727,9 +727,8 @@ func (c *ClusterClient) Options() *ClusterOptions { // ReloadState reloads cluster state. If available it calls ClusterSlots func // to get cluster slots information. -func (c *ClusterClient) ReloadState(ctx context.Context) error { - _, err := c.state.Reload(ctx) - return err +func (c *ClusterClient) ReloadState(ctx context.Context) { + c.state.LazyReload(ctx) } // Close closes the cluster client, releasing any open resources. @@ -1638,7 +1637,7 @@ func (c *ClusterClient) cmdNode( return nil, err } - if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly { + if (c.opt.RouteByLatency || c.opt.RouteRandomly) && cmdInfo != nil && cmdInfo.ReadOnly { return c.slotReadOnlyNode(state, slot) } return state.slotMasterNode(slot) diff --git a/example_test.go b/example_test.go index e24698ea..638baa45 100644 --- a/example_test.go +++ b/example_test.go @@ -114,10 +114,7 @@ func ExampleNewClusterClient_manualSetup() { // ReloadState reloads cluster state. It calls ClusterSlots func // to get cluster slots information. - err := rdb.ReloadState(ctx) - if err != nil { - panic(err) - } + rdb.ReloadState(ctx) } func ExampleNewRing() { diff --git a/sentinel.go b/sentinel.go index b4f82448..f58970f8 100644 --- a/sentinel.go +++ b/sentinel.go @@ -26,6 +26,13 @@ type FailoverOptions struct { // Sentinel password from "requirepass " (if enabled) in Sentinel configuration SentinelPassword string + // Allows routing read-only commands to the closest master or slave node. + // This option only works with NewFailoverClusterClient. + RouteByLatency bool + // Allows routing read-only commands to the random master or slave node. + // This option only works with NewFailoverClusterClient. + RouteRandomly bool + // Route all commands to slave read-only nodes. SlaveOnly bool @@ -123,7 +130,11 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions { Username: opt.Username, Password: opt.Password, - MaxRedirects: opt.MaxRetries, + MaxRedirects: opt.MaxRetries, + + RouteByLatency: opt.RouteByLatency, + RouteRandomly: opt.RouteRandomly, + MinRetryBackoff: opt.MinRetryBackoff, MaxRetryBackoff: opt.MaxRetryBackoff, @@ -146,6 +157,13 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions { // for automatic failover. It's safe for concurrent use by multiple // goroutines. func NewFailoverClient(failoverOpt *FailoverOptions) *Client { + if failoverOpt.RouteByLatency { + panic("to route commands by latency, use NewFailoverClusterClient") + } + if failoverOpt.RouteRandomly { + panic("to route commands randomly, use NewFailoverClusterClient") + } + sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs)) copy(sentinelAddrs, failoverOpt.SentinelAddrs) @@ -378,7 +396,9 @@ type sentinelFailover struct { opt *FailoverOptions sentinelAddrs []string - onFailover func(ctx context.Context, addr string) + + onFailover func(ctx context.Context, addr string) + onUpdate func(ctx context.Context) mu sync.RWMutex _masterAddr string @@ -409,7 +429,7 @@ func (c *sentinelFailover) closeSentinel() error { } func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) { - addresses, err := c.slaveAddresses(ctx) + addresses, err := c.slaveAddrs(ctx) if err != nil { return "", err } @@ -464,7 +484,7 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) { return "", errors.New("redis: all sentinels are unreachable") } -func (c *sentinelFailover) slaveAddresses(ctx context.Context) ([]string, error) { +func (c *sentinelFailover) slaveAddrs(ctx context.Context) ([]string, error) { c.mu.RLock() sentinel := c.sentinel c.mu.RUnlock() @@ -502,7 +522,7 @@ func (c *sentinelFailover) slaveAddresses(ctx context.Context) ([]string, error) c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0] c.setSentinel(ctx, sentinel) - addrs := parseSlaveAddresses(slaves) + addrs := parseSlaveAddrs(slaves) return addrs, nil } @@ -526,12 +546,11 @@ func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *Sentinel c.opt.MasterName, err) return []string{} } - - return parseSlaveAddresses(addrs) + return parseSlaveAddrs(addrs) } -func parseSlaveAddresses(addrs []interface{}) []string { - nodes := []string{} +func parseSlaveAddrs(addrs []interface{}) []string { + nodes := make([]string, 0, len(addrs)) for _, node := range addrs { ip := "" @@ -551,12 +570,14 @@ func parseSlaveAddresses(addrs []interface{}) []string { } lastkey = key.(string) } + for _, flag := range flags { switch flag { case "s_down", "o_down", "disconnected": isDown = true } } + if !isDown { nodes = append(nodes, net.JoinHostPort(ip, port)) } @@ -584,7 +605,9 @@ func (c *sentinelFailover) trySwitchMaster(ctx context.Context, addr string) { internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q", c.opt.MasterName, addr) - go c.onFailover(ctx, addr) + if c.onFailover != nil { + c.onFailover(ctx, addr) + } } func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelClient) { @@ -594,7 +617,7 @@ func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelCl c.sentinel = sentinel c.discoverSentinels(ctx) - c.pubsub = sentinel.Subscribe(ctx, "+switch-master") + c.pubsub = sentinel.Subscribe(ctx, "+switch-master", "+slave-reconf-done") go c.listen(c.pubsub) } @@ -621,13 +644,13 @@ func (c *sentinelFailover) discoverSentinels(ctx context.Context) { } func (c *sentinelFailover) listen(pubsub *PubSub) { - ch := pubsub.Channel() - for { - msg, ok := <-ch - if !ok { - break - } + ctx := context.TODO() + if c.onUpdate != nil { + c.onUpdate(ctx) + } + ch := pubsub.Channel() + for msg := range ch { if msg.Channel == "+switch-master" { parts := strings.Split(msg.Payload, " ") if parts[0] != c.opt.MasterName { @@ -637,6 +660,10 @@ func (c *sentinelFailover) listen(pubsub *PubSub) { addr := net.JoinHostPort(parts[3], parts[4]) c.trySwitchMaster(pubsub.getContext(), addr) } + + if c.onUpdate != nil { + c.onUpdate(ctx) + } } } @@ -651,6 +678,8 @@ func contains(slice []string, str string) bool { //------------------------------------------------------------------------------ +// NewFailoverClusterClient returns a client that supports routing read-only commands +// to a slave node. func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient { sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs)) copy(sentinelAddrs, failoverOpt.SentinelAddrs) @@ -671,7 +700,7 @@ func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient { Addr: masterAddr, }} - slaveAddrs, err := failover.slaveAddresses(ctx) + slaveAddrs, err := failover.slaveAddrs(ctx) if err != nil { return nil, err } @@ -693,8 +722,8 @@ func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient { } c := NewClusterClient(opt) - failover.onFailover = func(ctx context.Context, addr string) { - _ = c.ReloadState(ctx) + failover.onUpdate = func(ctx context.Context) { + c.ReloadState(ctx) } return c diff --git a/sentinel_test.go b/sentinel_test.go index 2645c438..46263a86 100644 --- a/sentinel_test.go +++ b/sentinel_test.go @@ -113,6 +113,8 @@ var _ = Describe("NewFailoverClusterClient", func() { client = redis.NewFailoverClusterClient(&redis.FailoverOptions{ MasterName: sentinelName, SentinelAddrs: sentinelAddrs, + + RouteRandomly: true, }) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) @@ -152,10 +154,12 @@ var _ = Describe("NewFailoverClusterClient", func() { err := client.Set(ctx, "foo", "master", 0).Err() Expect(err).NotTo(HaveOccurred()) - // Verify. - val, err := client.Get(ctx, "foo").Result() - Expect(err).NotTo(HaveOccurred()) - Expect(val).To(Equal("master")) + for i := 0; i < 100; i++ { + // Verify. + val, err := client.Get(ctx, "foo").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("master")) + } // Create subscription. ch := client.Subscribe(ctx, "foo").Channel()