From 49aac99f9d9cbb83d9db78ec16c2a81b2f48e994 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Mlyn=C3=A1=C5=99?= Date: Fri, 4 Sep 2020 11:54:06 +0200 Subject: [PATCH] FailoverClient with read-only support (#1199) * FailoverClient with read-only support --- options.go | 3 ++ sentinel.go | 137 +++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 139 insertions(+), 1 deletion(-) diff --git a/options.go b/options.go index 18dcf285..7ce0aef5 100644 --- a/options.go +++ b/options.go @@ -104,6 +104,9 @@ type Options struct { // Enables read only queries on slave nodes. readOnly bool + // Enables read only queries on redis replicas in sentinel mode + sentinelReadOnly bool + // TLS Config to use. When set TLS will be negotiated. TLSConfig *tls.Config diff --git a/sentinel.go b/sentinel.go index 34351dc6..dd735dc9 100644 --- a/sentinel.go +++ b/sentinel.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "errors" + "math/rand" "net" "strings" "sync" @@ -50,6 +51,9 @@ type FailoverOptions struct { IdleCheckFrequency time.Duration TLSConfig *tls.Config + + // Enables read-only commands on slave nodes. + ReadOnly bool } func (opt *FailoverOptions) options() *Options { @@ -79,6 +83,8 @@ func (opt *FailoverOptions) options() *Options { MaxConnAge: opt.MaxConnAge, TLSConfig: opt.TLSConfig, + + sentinelReadOnly: opt.ReadOnly, } } @@ -325,7 +331,15 @@ func (c *sentinelFailover) Pool() *pool.ConnPool { } func (c *sentinelFailover) dial(ctx context.Context, network, _ string) (net.Conn, error) { - addr, err := c.MasterAddr(ctx) + var addr string + var err error + + if c.opt.sentinelReadOnly { + addr, err = c.RandomSlaveAddr(ctx) + } else { + addr, err = c.MasterAddr(ctx) + } + if err != nil { return nil, err } @@ -344,6 +358,17 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) { return addr, nil } +func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) { + addresses, err := c.slaveAddresses(ctx) + if err != nil { + return "", err + } + if len(addresses) < 1 { + return c.MasterAddr(ctx) + } + return addresses[rand.Intn(len(addresses))], nil +} + func (c *sentinelFailover) masterAddr(ctx context.Context) (string, error) { c.mu.RLock() sentinel := c.sentinel @@ -408,6 +433,70 @@ func (c *sentinelFailover) masterAddr(ctx context.Context) (string, error) { return "", errors.New("redis: all sentinels are unreachable") } +func (c *sentinelFailover) slaveAddresses(ctx context.Context) ([]string, error) { + c.mu.RLock() + sentinel := c.sentinel + c.mu.RUnlock() + + if sentinel != nil { + addrs := c.getSlaveAddrs(ctx, sentinel) + if len(addrs) > 0 { + return addrs, nil + } + } + + c.mu.Lock() + defer c.mu.Unlock() + + if c.sentinel != nil { + addrs := c.getSlaveAddrs(ctx, c.sentinel) + if len(addrs) > 0 { + return addrs, nil + } + _ = c.closeSentinel() + } + + for i, sentinelAddr := range c.sentinelAddrs { + sentinel := NewSentinelClient(&Options{ + Addr: sentinelAddr, + Dialer: c.opt.Dialer, + + Username: c.opt.Username, + Password: c.opt.Password, + + MaxRetries: c.opt.MaxRetries, + + DialTimeout: c.opt.DialTimeout, + ReadTimeout: c.opt.ReadTimeout, + WriteTimeout: c.opt.WriteTimeout, + + PoolSize: c.opt.PoolSize, + PoolTimeout: c.opt.PoolTimeout, + IdleTimeout: c.opt.IdleTimeout, + IdleCheckFrequency: c.opt.IdleCheckFrequency, + + TLSConfig: c.opt.TLSConfig, + }) + + slaves, err := sentinel.Slaves(ctx, c.masterName).Result() + if err != nil { + internal.Logger.Printf(ctx, "sentinel: Slaves master=%q failed: %s", + c.masterName, err) + _ = sentinel.Close() + continue + } + + // Push working sentinel to the top. + c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0] + c.setSentinel(ctx, sentinel) + + addrs := parseSlaveAddresses(slaves) + return addrs, nil + } + + return []string{}, errors.New("redis: all sentinels are unreachable") +} + func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string { addr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result() if err != nil { @@ -418,6 +507,52 @@ func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *Sentinel return net.JoinHostPort(addr[0], addr[1]) } +func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *SentinelClient) []string { + addrs, err := sentinel.Slaves(ctx, c.masterName).Result() + if err != nil { + internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s", + c.masterName, err) + return []string{} + } + + return parseSlaveAddresses(addrs) +} + +func parseSlaveAddresses(addrs []interface{}) []string { + nodes := []string{} + + for _, node := range addrs { + ip := "" + port := "" + flags := []string{} + lastkey := "" + isDown := false + + for _, key := range node.([]interface{}) { + switch lastkey { + case "ip": + ip = key.(string) + case "port": + port = key.(string) + case "flags": + flags = strings.Split(key.(string), ",") + } + lastkey = key.(string) + } + for _, flag := range flags { + switch flag { + case "s_down", "o_down", "disconnected": + isDown = true + } + } + if !isDown { + nodes = append(nodes, net.JoinHostPort(ip, port)) + } + } + + return nodes +} + func (c *sentinelFailover) switchMaster(ctx context.Context, addr string) { c.mu.RLock() masterAddr := c._masterAddr