mirror of https://github.com/go-redis/redis.git
Merge pull request #1485 from go-redis/fix/sentinel-route-by
Add Sentinel RouteBy
commit 30e120f33e
@@ -4,10 +4,14 @@
 ## v8 (unreleased)
 
+- Documentation at https://redis.uptrace.dev/
 - All commands require `context.Context` as a first argument, e.g. `rdb.Ping(ctx)`. If you are not
   using `context.Context` yet, the simplest option is to define global package variable
   `var ctx = context.TODO()` and use it when `ctx` is required.
+- Added `redis.NewFailoverClusterClient` that supports routing read-only commands to a slave node.
 - Added `redisext.OpenTelemetryHook` that adds
   [Redis OpenTelemetry instrumentation](https://redis.uptrace.dev/tracing/).
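The changelog entry above describes the v8 context requirement. A minimal sketch of the suggested global-variable pattern follows; the address is a placeholder:

package main

import (
    "context"

    "github.com/go-redis/redis/v8"
)

// One package-level context, used wherever ctx is required, as the entry suggests.
var ctx = context.TODO()

func main() {
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    if err := rdb.Ping(ctx).Err(); err != nil {
        panic(err)
    }
}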
@@ -305,31 +305,6 @@ func BenchmarkClusterSetString(b *testing.B) {
 	})
 }
 
-func BenchmarkClusterReloadState(b *testing.B) {
-	if testing.Short() {
-		b.Skip("skipping in short mode")
-	}
-
-	ctx := context.Background()
-	cluster := newClusterScenario()
-	if err := startCluster(ctx, cluster); err != nil {
-		b.Fatal(err)
-	}
-	defer cluster.Close()
-
-	client := cluster.newClusterClient(ctx, redisClusterOptions())
-	defer client.Close()
-
-	b.ResetTimer()
-
-	for i := 0; i < b.N; i++ {
-		err := client.ReloadState(ctx)
-		if err != nil {
-			b.Fatal(err)
-		}
-	}
-}
-
 var clusterSink *redis.ClusterClient
 
 func BenchmarkClusterWithContext(b *testing.B) {
@@ -727,9 +727,8 @@ func (c *ClusterClient) Options() *ClusterOptions {
 
 // ReloadState reloads cluster state. If available it calls ClusterSlots func
 // to get cluster slots information.
-func (c *ClusterClient) ReloadState(ctx context.Context) error {
-	_, err := c.state.Reload(ctx)
-	return err
+func (c *ClusterClient) ReloadState(ctx context.Context) {
+	c.state.LazyReload(ctx)
 }
 
 // Close closes the cluster client, releasing any open resources.
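The hunk above turns ReloadState from a synchronous, error-returning reload into a fire-and-forget LazyReload. A minimal sketch of the new call shape; the cluster address is a placeholder:

package main

import (
    "context"

    "github.com/go-redis/redis/v8"
)

func main() {
    ctx := context.Background()
    rdb := redis.NewClusterClient(&redis.ClusterOptions{
        Addrs: []string{"localhost:7000"}, // placeholder address
    })
    defer rdb.Close()

    // The reload is now scheduled lazily in the background,
    // so there is no error to check at the call site.
    rdb.ReloadState(ctx)
}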
@@ -1638,7 +1637,7 @@ func (c *ClusterClient) cmdNode(
 		return nil, err
 	}
 
-	if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
+	if (c.opt.RouteByLatency || c.opt.RouteRandomly) && cmdInfo != nil && cmdInfo.ReadOnly {
 		return c.slotReadOnlyNode(state, slot)
 	}
 	return state.slotMasterNode(slot)
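This change narrows read-only routing: previously any read-only command went to a replica whenever opt.ReadOnly was set; now a replica is chosen only when RouteByLatency or RouteRandomly is enabled. A stand-alone sketch of the condition, using simplified stand-in types rather than go-redis internals:

package main

import "fmt"

type routeOpts struct {
    RouteByLatency bool
    RouteRandomly  bool
}

// useReadOnlyNode mirrors the condition in cmdNode after this commit:
// read-only commands reach a replica only when a routing option is set.
func useReadOnlyNode(opt routeOpts, cmdIsReadOnly bool) bool {
    return (opt.RouteByLatency || opt.RouteRandomly) && cmdIsReadOnly
}

func main() {
    fmt.Println(useReadOnlyNode(routeOpts{RouteRandomly: true}, true)) // true: replica
    fmt.Println(useReadOnlyNode(routeOpts{}, true))                    // false: master
}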
@@ -114,10 +114,7 @@ func ExampleNewClusterClient_manualSetup() {
 
 	// ReloadState reloads cluster state. It calls ClusterSlots func
 	// to get cluster slots information.
-	err := rdb.ReloadState(ctx)
-	if err != nil {
-		panic(err)
-	}
+	rdb.ReloadState(ctx)
 }
 
 func ExampleNewRing() {
sentinel.go (69 lines changed)
@@ -26,6 +26,13 @@ type FailoverOptions struct {
 	// Sentinel password from "requirepass <password>" (if enabled) in Sentinel configuration
 	SentinelPassword string
 
+	// Allows routing read-only commands to the closest master or slave node.
+	// This option only works with NewFailoverClusterClient.
+	RouteByLatency bool
+	// Allows routing read-only commands to the random master or slave node.
+	// This option only works with NewFailoverClusterClient.
+	RouteRandomly bool
+
 	// Route all commands to slave read-only nodes.
 	SlaveOnly bool
@@ -123,7 +130,11 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
 		Username: opt.Username,
 		Password: opt.Password,
 
 		MaxRedirects: opt.MaxRetries,
+
+		RouteByLatency: opt.RouteByLatency,
+		RouteRandomly:  opt.RouteRandomly,
 
 		MinRetryBackoff: opt.MinRetryBackoff,
 		MaxRetryBackoff: opt.MaxRetryBackoff,
@@ -146,6 +157,13 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
 // for automatic failover. It's safe for concurrent use by multiple
 // goroutines.
 func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
+	if failoverOpt.RouteByLatency {
+		panic("to route commands by latency, use NewFailoverClusterClient")
+	}
+	if failoverOpt.RouteRandomly {
+		panic("to route commands randomly, use NewFailoverClusterClient")
+	}
+
 	sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
 	copy(sentinelAddrs, failoverOpt.SentinelAddrs)
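Given the guard added above, routed reads have to go through the cluster-style client. A hedged usage sketch; the master name and sentinel address are placeholders:

package main

import "github.com/go-redis/redis/v8"

func main() {
    opt := &redis.FailoverOptions{
        MasterName:    "mymaster",                  // placeholder
        SentinelAddrs: []string{"localhost:26379"}, // placeholder
        RouteRandomly: true,
    }

    // redis.NewFailoverClient(opt) would panic here because RouteRandomly is set;
    // NewFailoverClusterClient is the supported entry point for routed reads.
    rdb := redis.NewFailoverClusterClient(opt)
    defer rdb.Close()
}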
@@ -378,7 +396,9 @@ type sentinelFailover struct {
 	opt *FailoverOptions
 
 	sentinelAddrs []string
-	onFailover    func(ctx context.Context, addr string)
+
+	onFailover func(ctx context.Context, addr string)
+	onUpdate   func(ctx context.Context)
 
 	mu          sync.RWMutex
 	_masterAddr string
@@ -409,7 +429,7 @@ func (c *sentinelFailover) closeSentinel() error {
 	}
 }
 
 func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) {
-	addresses, err := c.slaveAddresses(ctx)
+	addresses, err := c.slaveAddrs(ctx)
 	if err != nil {
 		return "", err
 	}
@@ -464,7 +484,7 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
 	return "", errors.New("redis: all sentinels are unreachable")
 }
 
-func (c *sentinelFailover) slaveAddresses(ctx context.Context) ([]string, error) {
+func (c *sentinelFailover) slaveAddrs(ctx context.Context) ([]string, error) {
 	c.mu.RLock()
 	sentinel := c.sentinel
 	c.mu.RUnlock()
@@ -502,7 +522,7 @@ func (c *sentinelFailover) slaveAddresses(ctx context.Context) ([]string, error)
 		c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
 		c.setSentinel(ctx, sentinel)
 
-		addrs := parseSlaveAddresses(slaves)
+		addrs := parseSlaveAddrs(slaves)
 		return addrs, nil
 	}
 
@@ -526,12 +546,11 @@ func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *Sentinel
 			c.opt.MasterName, err)
 		return []string{}
 	}
-
-	return parseSlaveAddresses(addrs)
+	return parseSlaveAddrs(addrs)
 }
 
-func parseSlaveAddresses(addrs []interface{}) []string {
-	nodes := []string{}
+func parseSlaveAddrs(addrs []interface{}) []string {
+	nodes := make([]string, 0, len(addrs))
 
 	for _, node := range addrs {
 		ip := ""
@@ -551,12 +570,14 @@ func parseSlaveAddresses(addrs []interface{}) []string {
 		}
 		lastkey = key.(string)
 	}
 
 	for _, flag := range flags {
 		switch flag {
 		case "s_down", "o_down", "disconnected":
 			isDown = true
 		}
 	}
 
 	if !isDown {
 		nodes = append(nodes, net.JoinHostPort(ip, port))
 	}
@@ -584,7 +605,9 @@ func (c *sentinelFailover) trySwitchMaster(ctx context.Context, addr string) {
 
 	internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q",
 		c.opt.MasterName, addr)
-	go c.onFailover(ctx, addr)
+	if c.onFailover != nil {
+		c.onFailover(ctx, addr)
+	}
 }
 
 func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelClient) {
@@ -594,7 +617,7 @@ func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelCl
 	c.sentinel = sentinel
 	c.discoverSentinels(ctx)
 
-	c.pubsub = sentinel.Subscribe(ctx, "+switch-master")
+	c.pubsub = sentinel.Subscribe(ctx, "+switch-master", "+slave-reconf-done")
 	go c.listen(c.pubsub)
 }
 
@@ -621,13 +644,13 @@ func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
 }
 
 func (c *sentinelFailover) listen(pubsub *PubSub) {
-	ch := pubsub.Channel()
-	for {
-		msg, ok := <-ch
-		if !ok {
-			break
-		}
+	ctx := context.TODO()
+
+	if c.onUpdate != nil {
+		c.onUpdate(ctx)
+	}
+
+	ch := pubsub.Channel()
+	for msg := range ch {
 		if msg.Channel == "+switch-master" {
 			parts := strings.Split(msg.Payload, " ")
 			if parts[0] != c.opt.MasterName {
@@ -637,6 +660,10 @@ func (c *sentinelFailover) listen(pubsub *PubSub) {
 			addr := net.JoinHostPort(parts[3], parts[4])
 			c.trySwitchMaster(pubsub.getContext(), addr)
 		}
+
+		if c.onUpdate != nil {
+			c.onUpdate(ctx)
+		}
 	}
 }
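The rewritten listen both simplifies the receive loop (for ... range ends when the channel closes) and invokes the new onUpdate hook once up front and again after every sentinel message. A self-contained sketch of that shape, with plain strings standing in for pub/sub messages:

package main

import "fmt"

// listen mimics the new control flow: refresh once on start, then once per event.
func listen(ch <-chan string, onUpdate func()) {
    if onUpdate != nil {
        onUpdate()
    }
    for msg := range ch { // exits when the channel is closed
        fmt.Println("sentinel event:", msg)
        if onUpdate != nil {
            onUpdate()
        }
    }
}

func main() {
    ch := make(chan string, 2)
    ch <- "+switch-master"
    ch <- "+slave-reconf-done"
    close(ch)
    listen(ch, func() { fmt.Println("refresh cluster state") })
}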
@@ -651,6 +678,8 @@ func contains(slice []string, str string) bool {
 
 //------------------------------------------------------------------------------
 
+// NewFailoverClusterClient returns a client that supports routing read-only commands
+// to a slave node.
 func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
 	sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
 	copy(sentinelAddrs, failoverOpt.SentinelAddrs)
@@ -671,7 +700,7 @@ func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
 			Addr: masterAddr,
 		}}
 
-		slaveAddrs, err := failover.slaveAddresses(ctx)
+		slaveAddrs, err := failover.slaveAddrs(ctx)
 		if err != nil {
 			return nil, err
 		}
@@ -693,8 +722,8 @@ func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
 	}
 
 	c := NewClusterClient(opt)
-	failover.onFailover = func(ctx context.Context, addr string) {
-		_ = c.ReloadState(ctx)
+	failover.onUpdate = func(ctx context.Context) {
+		c.ReloadState(ctx)
 	}
 
 	return c
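The wiring above closes the loop: every sentinel notification (including +slave-reconf-done, subscribed earlier in this diff) now schedules a lazy cluster-state reload, rather than reacting only to a master failover. A simplified sketch of the callback pattern, with stand-in types for sentinelFailover and *ClusterClient:

package main

import (
    "context"
    "fmt"
)

// Stand-in types; the real ones are sentinelFailover and *ClusterClient.
type failover struct {
    onUpdate func(ctx context.Context)
}

type clusterClient struct{}

func (c *clusterClient) ReloadState(ctx context.Context) {
    fmt.Println("lazy state reload scheduled")
}

func main() {
    f := &failover{}
    c := &clusterClient{}

    // Mirrors the wiring above: any sentinel update schedules a state reload.
    f.onUpdate = func(ctx context.Context) { c.ReloadState(ctx) }

    f.onUpdate(context.TODO()) // listen() would call this per sentinel event
}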
@@ -113,6 +113,8 @@ var _ = Describe("NewFailoverClusterClient", func() {
 		client = redis.NewFailoverClusterClient(&redis.FailoverOptions{
 			MasterName:    sentinelName,
 			SentinelAddrs: sentinelAddrs,
+
+			RouteRandomly: true,
 		})
 		Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
@@ -152,10 +154,12 @@ var _ = Describe("NewFailoverClusterClient", func() {
 		err := client.Set(ctx, "foo", "master", 0).Err()
 		Expect(err).NotTo(HaveOccurred())
 
-		// Verify.
-		val, err := client.Get(ctx, "foo").Result()
-		Expect(err).NotTo(HaveOccurred())
-		Expect(val).To(Equal("master"))
+		for i := 0; i < 100; i++ {
+			// Verify.
+			val, err := client.Get(ctx, "foo").Result()
+			Expect(err).NotTo(HaveOccurred())
+			Expect(val).To(Equal("master"))
+		}
 
 		// Create subscription.
 		ch := client.Subscribe(ctx, "foo").Channel()