FailoverClient with read-only support (#1199)

* FailoverClient with read-only support
This commit is contained in:
Martin Mlynář 2020-09-04 11:54:06 +02:00 committed by GitHub
parent 1e8d28243e
commit 49aac99f9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 139 additions and 1 deletions

View File

@ -104,6 +104,9 @@ type Options struct {
// Enables read only queries on slave nodes. // Enables read only queries on slave nodes.
readOnly bool readOnly bool
// Enables read only queries on redis replicas in sentinel mode
sentinelReadOnly bool
// TLS Config to use. When set TLS will be negotiated. // TLS Config to use. When set TLS will be negotiated.
TLSConfig *tls.Config TLSConfig *tls.Config

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"errors" "errors"
"math/rand"
"net" "net"
"strings" "strings"
"sync" "sync"
@ -50,6 +51,9 @@ type FailoverOptions struct {
IdleCheckFrequency time.Duration IdleCheckFrequency time.Duration
TLSConfig *tls.Config TLSConfig *tls.Config
// Enables read-only commands on slave nodes.
ReadOnly bool
} }
func (opt *FailoverOptions) options() *Options { func (opt *FailoverOptions) options() *Options {
@ -79,6 +83,8 @@ func (opt *FailoverOptions) options() *Options {
MaxConnAge: opt.MaxConnAge, MaxConnAge: opt.MaxConnAge,
TLSConfig: opt.TLSConfig, TLSConfig: opt.TLSConfig,
sentinelReadOnly: opt.ReadOnly,
} }
} }
@ -325,7 +331,15 @@ func (c *sentinelFailover) Pool() *pool.ConnPool {
} }
func (c *sentinelFailover) dial(ctx context.Context, network, _ string) (net.Conn, error) { func (c *sentinelFailover) dial(ctx context.Context, network, _ string) (net.Conn, error) {
addr, err := c.MasterAddr(ctx) var addr string
var err error
if c.opt.sentinelReadOnly {
addr, err = c.RandomSlaveAddr(ctx)
} else {
addr, err = c.MasterAddr(ctx)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -344,6 +358,17 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
return addr, nil return addr, nil
} }
func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) {
addresses, err := c.slaveAddresses(ctx)
if err != nil {
return "", err
}
if len(addresses) < 1 {
return c.MasterAddr(ctx)
}
return addresses[rand.Intn(len(addresses))], nil
}
func (c *sentinelFailover) masterAddr(ctx context.Context) (string, error) { func (c *sentinelFailover) masterAddr(ctx context.Context) (string, error) {
c.mu.RLock() c.mu.RLock()
sentinel := c.sentinel sentinel := c.sentinel
@ -408,6 +433,70 @@ func (c *sentinelFailover) masterAddr(ctx context.Context) (string, error) {
return "", errors.New("redis: all sentinels are unreachable") return "", errors.New("redis: all sentinels are unreachable")
} }
func (c *sentinelFailover) slaveAddresses(ctx context.Context) ([]string, error) {
c.mu.RLock()
sentinel := c.sentinel
c.mu.RUnlock()
if sentinel != nil {
addrs := c.getSlaveAddrs(ctx, sentinel)
if len(addrs) > 0 {
return addrs, nil
}
}
c.mu.Lock()
defer c.mu.Unlock()
if c.sentinel != nil {
addrs := c.getSlaveAddrs(ctx, c.sentinel)
if len(addrs) > 0 {
return addrs, nil
}
_ = c.closeSentinel()
}
for i, sentinelAddr := range c.sentinelAddrs {
sentinel := NewSentinelClient(&Options{
Addr: sentinelAddr,
Dialer: c.opt.Dialer,
Username: c.opt.Username,
Password: c.opt.Password,
MaxRetries: c.opt.MaxRetries,
DialTimeout: c.opt.DialTimeout,
ReadTimeout: c.opt.ReadTimeout,
WriteTimeout: c.opt.WriteTimeout,
PoolSize: c.opt.PoolSize,
PoolTimeout: c.opt.PoolTimeout,
IdleTimeout: c.opt.IdleTimeout,
IdleCheckFrequency: c.opt.IdleCheckFrequency,
TLSConfig: c.opt.TLSConfig,
})
slaves, err := sentinel.Slaves(ctx, c.masterName).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: Slaves master=%q failed: %s",
c.masterName, err)
_ = sentinel.Close()
continue
}
// Push working sentinel to the top.
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
c.setSentinel(ctx, sentinel)
addrs := parseSlaveAddresses(slaves)
return addrs, nil
}
return []string{}, errors.New("redis: all sentinels are unreachable")
}
func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string { func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string {
addr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result() addr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result()
if err != nil { if err != nil {
@ -418,6 +507,52 @@ func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *Sentinel
return net.JoinHostPort(addr[0], addr[1]) return net.JoinHostPort(addr[0], addr[1])
} }
func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *SentinelClient) []string {
addrs, err := sentinel.Slaves(ctx, c.masterName).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s",
c.masterName, err)
return []string{}
}
return parseSlaveAddresses(addrs)
}
func parseSlaveAddresses(addrs []interface{}) []string {
nodes := []string{}
for _, node := range addrs {
ip := ""
port := ""
flags := []string{}
lastkey := ""
isDown := false
for _, key := range node.([]interface{}) {
switch lastkey {
case "ip":
ip = key.(string)
case "port":
port = key.(string)
case "flags":
flags = strings.Split(key.(string), ",")
}
lastkey = key.(string)
}
for _, flag := range flags {
switch flag {
case "s_down", "o_down", "disconnected":
isDown = true
}
}
if !isDown {
nodes = append(nodes, net.JoinHostPort(ip, port))
}
}
return nodes
}
func (c *sentinelFailover) switchMaster(ctx context.Context, addr string) { func (c *sentinelFailover) switchMaster(ctx context.Context, addr string) {
c.mu.RLock() c.mu.RLock()
masterAddr := c._masterAddr masterAddr := c._masterAddr