Add Sentinel RouteBy

This commit is contained in:
Vladimir Mihailenco 2020-09-11 15:52:38 +03:00
parent afb0064872
commit cbce5dbfa2
6 changed files with 65 additions and 57 deletions

View File

@ -4,10 +4,14 @@
## v8 (unreleased) ## v8 (unreleased)
- Documentation at https://redis.uptrace.dev/
- All commands require `context.Context` as a first argument, e.g. `rdb.Ping(ctx)`. If you are not - All commands require `context.Context` as a first argument, e.g. `rdb.Ping(ctx)`. If you are not
using `context.Context` yet, the simplest option is to define global package variable using `context.Context` yet, the simplest option is to define global package variable
`var ctx = context.TODO()` and use it when `ctx` is required. `var ctx = context.TODO()` and use it when `ctx` is required.
- Added `redis.NewFailoverClusterClient` that supports routing read-only commands to a slave node.
- Added `redisext.OpenTemetryHook` that adds - Added `redisext.OpenTemetryHook` that adds
[Redis OpenTelemetry instrumentation](https://redis.uptrace.dev/tracing/). [Redis OpenTelemetry instrumentation](https://redis.uptrace.dev/tracing/).

View File

@ -305,31 +305,6 @@ func BenchmarkClusterSetString(b *testing.B) {
}) })
} }
func BenchmarkClusterReloadState(b *testing.B) {
if testing.Short() {
b.Skip("skipping in short mode")
}
ctx := context.Background()
cluster := newClusterScenario()
if err := startCluster(ctx, cluster); err != nil {
b.Fatal(err)
}
defer cluster.Close()
client := cluster.newClusterClient(ctx, redisClusterOptions())
defer client.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := client.ReloadState(ctx)
if err != nil {
b.Fatal(err)
}
}
}
var clusterSink *redis.ClusterClient var clusterSink *redis.ClusterClient
func BenchmarkClusterWithContext(b *testing.B) { func BenchmarkClusterWithContext(b *testing.B) {

View File

@ -727,9 +727,8 @@ func (c *ClusterClient) Options() *ClusterOptions {
// ReloadState reloads cluster state. If available it calls ClusterSlots func // ReloadState reloads cluster state. If available it calls ClusterSlots func
// to get cluster slots information. // to get cluster slots information.
func (c *ClusterClient) ReloadState(ctx context.Context) error { func (c *ClusterClient) ReloadState(ctx context.Context) {
_, err := c.state.Reload(ctx) c.state.LazyReload(ctx)
return err
} }
// Close closes the cluster client, releasing any open resources. // Close closes the cluster client, releasing any open resources.
@ -1638,7 +1637,7 @@ func (c *ClusterClient) cmdNode(
return nil, err return nil, err
} }
if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly { if (c.opt.RouteByLatency || c.opt.RouteRandomly) && cmdInfo != nil && cmdInfo.ReadOnly {
return c.slotReadOnlyNode(state, slot) return c.slotReadOnlyNode(state, slot)
} }
return state.slotMasterNode(slot) return state.slotMasterNode(slot)

View File

@ -114,10 +114,7 @@ func ExampleNewClusterClient_manualSetup() {
// ReloadState reloads cluster state. It calls ClusterSlots func // ReloadState reloads cluster state. It calls ClusterSlots func
// to get cluster slots information. // to get cluster slots information.
err := rdb.ReloadState(ctx) rdb.ReloadState(ctx)
if err != nil {
panic(err)
}
} }
func ExampleNewRing() { func ExampleNewRing() {

View File

@ -26,6 +26,13 @@ type FailoverOptions struct {
// Sentinel password from "requirepass <password>" (if enabled) in Sentinel configuration // Sentinel password from "requirepass <password>" (if enabled) in Sentinel configuration
SentinelPassword string SentinelPassword string
// Allows routing read-only commands to the closest master or slave node.
// This option only works with NewFailoverClusterClient.
RouteByLatency bool
// Allows routing read-only commands to the random master or slave node.
// This option only works with NewFailoverClusterClient.
RouteRandomly bool
// Route all commands to slave read-only nodes. // Route all commands to slave read-only nodes.
SlaveOnly bool SlaveOnly bool
@ -124,6 +131,10 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
Password: opt.Password, Password: opt.Password,
MaxRedirects: opt.MaxRetries, MaxRedirects: opt.MaxRetries,
RouteByLatency: opt.RouteByLatency,
RouteRandomly: opt.RouteRandomly,
MinRetryBackoff: opt.MinRetryBackoff, MinRetryBackoff: opt.MinRetryBackoff,
MaxRetryBackoff: opt.MaxRetryBackoff, MaxRetryBackoff: opt.MaxRetryBackoff,
@ -146,6 +157,13 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
// for automatic failover. It's safe for concurrent use by multiple // for automatic failover. It's safe for concurrent use by multiple
// goroutines. // goroutines.
func NewFailoverClient(failoverOpt *FailoverOptions) *Client { func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
if failoverOpt.RouteByLatency {
panic("to route commands by latency, use NewFailoverClusterClient")
}
if failoverOpt.RouteRandomly {
panic("to route commands randomly, use NewFailoverClusterClient")
}
sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs)) sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
copy(sentinelAddrs, failoverOpt.SentinelAddrs) copy(sentinelAddrs, failoverOpt.SentinelAddrs)
@ -378,7 +396,9 @@ type sentinelFailover struct {
opt *FailoverOptions opt *FailoverOptions
sentinelAddrs []string sentinelAddrs []string
onFailover func(ctx context.Context, addr string) onFailover func(ctx context.Context, addr string)
onUpdate func(ctx context.Context)
mu sync.RWMutex mu sync.RWMutex
_masterAddr string _masterAddr string
@ -409,7 +429,7 @@ func (c *sentinelFailover) closeSentinel() error {
} }
func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) { func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) {
addresses, err := c.slaveAddresses(ctx) addresses, err := c.slaveAddrs(ctx)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -464,7 +484,7 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
return "", errors.New("redis: all sentinels are unreachable") return "", errors.New("redis: all sentinels are unreachable")
} }
func (c *sentinelFailover) slaveAddresses(ctx context.Context) ([]string, error) { func (c *sentinelFailover) slaveAddrs(ctx context.Context) ([]string, error) {
c.mu.RLock() c.mu.RLock()
sentinel := c.sentinel sentinel := c.sentinel
c.mu.RUnlock() c.mu.RUnlock()
@ -502,7 +522,7 @@ func (c *sentinelFailover) slaveAddresses(ctx context.Context) ([]string, error)
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0] c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
c.setSentinel(ctx, sentinel) c.setSentinel(ctx, sentinel)
addrs := parseSlaveAddresses(slaves) addrs := parseSlaveAddrs(slaves)
return addrs, nil return addrs, nil
} }
@ -526,12 +546,11 @@ func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *Sentinel
c.opt.MasterName, err) c.opt.MasterName, err)
return []string{} return []string{}
} }
return parseSlaveAddrs(addrs)
return parseSlaveAddresses(addrs)
} }
func parseSlaveAddresses(addrs []interface{}) []string { func parseSlaveAddrs(addrs []interface{}) []string {
nodes := []string{} nodes := make([]string, 0, len(addrs))
for _, node := range addrs { for _, node := range addrs {
ip := "" ip := ""
@ -551,12 +570,14 @@ func parseSlaveAddresses(addrs []interface{}) []string {
} }
lastkey = key.(string) lastkey = key.(string)
} }
for _, flag := range flags { for _, flag := range flags {
switch flag { switch flag {
case "s_down", "o_down", "disconnected": case "s_down", "o_down", "disconnected":
isDown = true isDown = true
} }
} }
if !isDown { if !isDown {
nodes = append(nodes, net.JoinHostPort(ip, port)) nodes = append(nodes, net.JoinHostPort(ip, port))
} }
@ -584,7 +605,9 @@ func (c *sentinelFailover) trySwitchMaster(ctx context.Context, addr string) {
internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q", internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q",
c.opt.MasterName, addr) c.opt.MasterName, addr)
go c.onFailover(ctx, addr) if c.onFailover != nil {
c.onFailover(ctx, addr)
}
} }
func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelClient) { func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelClient) {
@ -594,7 +617,7 @@ func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelCl
c.sentinel = sentinel c.sentinel = sentinel
c.discoverSentinels(ctx) c.discoverSentinels(ctx)
c.pubsub = sentinel.Subscribe(ctx, "+switch-master") c.pubsub = sentinel.Subscribe(ctx, "+switch-master", "+slave-reconf-done")
go c.listen(c.pubsub) go c.listen(c.pubsub)
} }
@ -621,13 +644,13 @@ func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
} }
func (c *sentinelFailover) listen(pubsub *PubSub) { func (c *sentinelFailover) listen(pubsub *PubSub) {
ch := pubsub.Channel() ctx := context.TODO()
for { if c.onUpdate != nil {
msg, ok := <-ch c.onUpdate(ctx)
if !ok {
break
} }
ch := pubsub.Channel()
for msg := range ch {
if msg.Channel == "+switch-master" { if msg.Channel == "+switch-master" {
parts := strings.Split(msg.Payload, " ") parts := strings.Split(msg.Payload, " ")
if parts[0] != c.opt.MasterName { if parts[0] != c.opt.MasterName {
@ -637,6 +660,10 @@ func (c *sentinelFailover) listen(pubsub *PubSub) {
addr := net.JoinHostPort(parts[3], parts[4]) addr := net.JoinHostPort(parts[3], parts[4])
c.trySwitchMaster(pubsub.getContext(), addr) c.trySwitchMaster(pubsub.getContext(), addr)
} }
if c.onUpdate != nil {
c.onUpdate(ctx)
}
} }
} }
@ -651,6 +678,8 @@ func contains(slice []string, str string) bool {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// NewFailoverClusterClient returns a client that supports routing read-only commands
// to a slave node.
func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient { func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs)) sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
copy(sentinelAddrs, failoverOpt.SentinelAddrs) copy(sentinelAddrs, failoverOpt.SentinelAddrs)
@ -671,7 +700,7 @@ func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
Addr: masterAddr, Addr: masterAddr,
}} }}
slaveAddrs, err := failover.slaveAddresses(ctx) slaveAddrs, err := failover.slaveAddrs(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -693,8 +722,8 @@ func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
} }
c := NewClusterClient(opt) c := NewClusterClient(opt)
failover.onFailover = func(ctx context.Context, addr string) { failover.onUpdate = func(ctx context.Context) {
_ = c.ReloadState(ctx) c.ReloadState(ctx)
} }
return c return c

View File

@ -113,6 +113,8 @@ var _ = Describe("NewFailoverClusterClient", func() {
client = redis.NewFailoverClusterClient(&redis.FailoverOptions{ client = redis.NewFailoverClusterClient(&redis.FailoverOptions{
MasterName: sentinelName, MasterName: sentinelName,
SentinelAddrs: sentinelAddrs, SentinelAddrs: sentinelAddrs,
RouteRandomly: true,
}) })
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
@ -152,10 +154,12 @@ var _ = Describe("NewFailoverClusterClient", func() {
err := client.Set(ctx, "foo", "master", 0).Err() err := client.Set(ctx, "foo", "master", 0).Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
for i := 0; i < 100; i++ {
// Verify. // Verify.
val, err := client.Get(ctx, "foo").Result() val, err := client.Get(ctx, "foo").Result()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("master")) Expect(val).To(Equal("master"))
}
// Create subscription. // Create subscription.
ch := client.Subscribe(ctx, "foo").Channel() ch := client.Subscribe(ctx, "foo").Channel()