Fix FailoverClient to close connection to Sentinel. Fixes races build.

This commit is contained in:
Vladimir Mihailenco 2016-03-09 15:14:01 +02:00
parent 0db1d730c8
commit 27635bbe4e
4 changed files with 80 additions and 54 deletions

View File

@ -2,7 +2,6 @@ package redis
import ( import (
"net" "net"
"sync"
"time" "time"
) )
@ -20,18 +19,12 @@ func (cn *conn) SetNetConn(netcn net.Conn) {
cn.netcn = netcn cn.netcn = netcn
} }
var timeMu sync.Mutex
func SetTime(tm time.Time) { func SetTime(tm time.Time) {
timeMu.Lock()
now = func() time.Time { now = func() time.Time {
return tm return tm
} }
timeMu.Unlock()
} }
func RestoreTime() { func RestoreTime() {
timeMu.Lock()
now = time.Now now = time.Now
timeMu.Unlock()
} }

View File

@ -98,9 +98,10 @@ func TestGinkgoSuite(t *testing.T) {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func eventually(fn func() error, timeout time.Duration) (err error) { func eventually(fn func() error, timeout time.Duration) error {
done := make(chan struct{}) done := make(chan struct{})
var exit int32 var exit int32
var err error
go func() { go func() {
for atomic.LoadInt32(&exit) == 0 { for atomic.LoadInt32(&exit) == 0 {
err = fn() err = fn()

View File

@ -13,6 +13,8 @@ var Logger = log.New(os.Stderr, "redis: ", log.LstdFlags)
type baseClient struct { type baseClient struct {
connPool pool connPool pool
opt *Options opt *Options
onClose func() error // hook called when client is closed
} }
func (c *baseClient) String() string { func (c *baseClient) String() string {
@ -83,7 +85,16 @@ func (c *baseClient) process(cmd Cmder) {
// It is rare to Close a Client, as the Client is meant to be // It is rare to Close a Client, as the Client is meant to be
// long-lived and shared between many goroutines. // long-lived and shared between many goroutines.
func (c *baseClient) Close() error { func (c *baseClient) Close() error {
return c.connPool.Close() var retErr error
if c.onClose != nil {
if err := c.onClose(); err != nil && retErr == nil {
retErr = err
}
}
if err := c.connPool.Close(); err != nil && retErr == nil {
retErr = err
}
return retErr
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -187,7 +198,9 @@ func newClient(opt *Options, pool pool) *Client {
base := baseClient{opt: opt, connPool: pool} base := baseClient{opt: opt, connPool: pool}
return &Client{ return &Client{
baseClient: base, baseClient: base,
commandable: commandable{process: base.process}, commandable: commandable{
process: base.process,
},
} }
} }

View File

@ -65,18 +65,31 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
opt: opt, opt: opt,
} }
return newClient(opt, failover.Pool()) base := baseClient{
opt: opt,
connPool: failover.Pool(),
onClose: func() error {
return failover.Close()
},
}
return &Client{
baseClient: base,
commandable: commandable{
process: base.process,
},
}
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type sentinelClient struct { type sentinelClient struct {
baseClient
commandable commandable
*baseClient
} }
func newSentinel(opt *Options) *sentinelClient { func newSentinel(opt *Options) *sentinelClient {
base := &baseClient{ base := baseClient{
opt: opt, opt: opt,
connPool: newConnPool(opt), connPool: newConnPool(opt),
} }
@ -116,8 +129,12 @@ type sentinelFailover struct {
pool pool pool pool
poolOnce sync.Once poolOnce sync.Once
lock sync.RWMutex mu sync.RWMutex
_sentinel *sentinelClient sentinel *sentinelClient
}
func (d *sentinelFailover) Close() error {
return d.resetSentinel()
} }
func (d *sentinelFailover) dial() (net.Conn, error) { func (d *sentinelFailover) dial() (net.Conn, error) {
@ -137,15 +154,15 @@ func (d *sentinelFailover) Pool() pool {
} }
func (d *sentinelFailover) MasterAddr() (string, error) { func (d *sentinelFailover) MasterAddr() (string, error) {
defer d.lock.Unlock() defer d.mu.Unlock()
d.lock.Lock() d.mu.Lock()
// Try last working sentinel. // Try last working sentinel.
if d._sentinel != nil { if d.sentinel != nil {
addr, err := d._sentinel.GetMasterAddrByName(d.masterName).Result() addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result()
if err != nil { if err != nil {
Logger.Printf("sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err) Logger.Printf("sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err)
d.resetSentinel() d._resetSentinel()
} else { } else {
addr := net.JoinHostPort(addr[0], addr[1]) addr := net.JoinHostPort(addr[0], addr[1])
Logger.Printf("sentinel: %q addr is %s", d.masterName, addr) Logger.Printf("sentinel: %q addr is %s", d.masterName, addr)
@ -186,10 +203,26 @@ func (d *sentinelFailover) MasterAddr() (string, error) {
func (d *sentinelFailover) setSentinel(sentinel *sentinelClient) { func (d *sentinelFailover) setSentinel(sentinel *sentinelClient) {
d.discoverSentinels(sentinel) d.discoverSentinels(sentinel)
d._sentinel = sentinel d.sentinel = sentinel
go d.listen() go d.listen()
} }
func (d *sentinelFailover) resetSentinel() error {
d.mu.Lock()
err := d._resetSentinel()
d.mu.Unlock()
return err
}
func (d *sentinelFailover) _resetSentinel() error {
var err error
if d.sentinel != nil {
err = d.sentinel.Close()
d.sentinel = nil
}
return err
}
func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) { func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) {
sentinels, err := sentinel.Sentinels(d.masterName).Result() sentinels, err := sentinel.Sentinels(d.masterName).Result()
if err != nil { if err != nil {
@ -247,25 +280,22 @@ func (d *sentinelFailover) listen() {
var pubsub *PubSub var pubsub *PubSub
for { for {
if pubsub == nil { if pubsub == nil {
pubsub = d._sentinel.PubSub() pubsub = d.sentinel.PubSub()
if err := pubsub.Subscribe("+switch-master"); err != nil { if err := pubsub.Subscribe("+switch-master"); err != nil {
Logger.Printf("sentinel: Subscribe failed: %s", err) Logger.Printf("sentinel: Subscribe failed: %s", err)
d.lock.Lock()
d.resetSentinel() d.resetSentinel()
d.lock.Unlock()
return return
} }
} }
msg, err := pubsub.Receive() msg, err := pubsub.ReceiveMessage()
if err != nil { if err != nil {
Logger.Printf("sentinel: Receive failed: %s", err) Logger.Printf("sentinel: ReceiveMessage failed: %s", err)
pubsub.Close() pubsub.Close()
d.resetSentinel()
return return
} }
switch msg := msg.(type) {
case *Message:
switch msg.Channel { switch msg.Channel {
case "+switch-master": case "+switch-master":
parts := strings.Split(msg.Payload, " ") parts := strings.Split(msg.Payload, " ")
@ -273,6 +303,7 @@ func (d *sentinelFailover) listen() {
Logger.Printf("sentinel: ignore new %s addr", parts[0]) Logger.Printf("sentinel: ignore new %s addr", parts[0])
continue continue
} }
addr := net.JoinHostPort(parts[3], parts[4]) addr := net.JoinHostPort(parts[3], parts[4])
Logger.Printf( Logger.Printf(
"sentinel: new %q addr is %s", "sentinel: new %q addr is %s",
@ -280,22 +311,10 @@ func (d *sentinelFailover) listen() {
) )
d.closeOldConns(addr) d.closeOldConns(addr)
default:
Logger.Printf("sentinel: unsupported message: %s", msg)
}
case *Subscription:
// Ignore.
default:
Logger.Printf("sentinel: unsupported message: %s", msg)
} }
} }
} }
func (d *sentinelFailover) resetSentinel() {
d._sentinel.Close()
d._sentinel = nil
}
func contains(slice []string, str string) bool { func contains(slice []string, str string) bool {
for _, s := range slice { for _, s := range slice {
if s == str { if s == str {