From 48e9afe2a7b24c591a8b878d960ea6a1f45d07de Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Mon, 1 Oct 2018 11:38:21 +0300 Subject: [PATCH 1/3] Use PubSub.Channel to not burn CPU on errors --- example_test.go | 1 - pubsub.go | 1 - sentinel.go | 202 ++++++++++++++++++++++++++---------------------- 3 files changed, 108 insertions(+), 96 deletions(-) diff --git a/example_test.go b/example_test.go index 6708e0c..ec8c151 100644 --- a/example_test.go +++ b/example_test.go @@ -20,7 +20,6 @@ func init() { PoolSize: 10, PoolTimeout: 30 * time.Second, }) - // redisdb.FlushDB() } func ExampleNewClient() { diff --git a/pubsub.go b/pubsub.go index b08f34a..b91972a 100644 --- a/pubsub.go +++ b/pubsub.go @@ -51,7 +51,6 @@ func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) { if c.closed { return nil, pool.ErrClosed } - if c.cn != nil { return c.cn, nil } diff --git a/sentinel.go b/sentinel.go index c5f7149..5361a02 100644 --- a/sentinel.go +++ b/sentinel.go @@ -119,7 +119,7 @@ func NewSentinelClient(opt *Options) *SentinelClient { return c } -func (c *SentinelClient) PubSub() *PubSub { +func (c *SentinelClient) pubSub() *PubSub { pubsub := &PubSub{ opt: c.opt, @@ -132,14 +132,34 @@ func (c *SentinelClient) PubSub() *PubSub { return pubsub } +// Subscribe subscribes the client to the specified channels. +// Channels can be omitted to create empty subscription. +func (c *SentinelClient) Subscribe(channels ...string) *PubSub { + pubsub := c.pubSub() + if len(channels) > 0 { + _ = pubsub.Subscribe(channels...) + } + return pubsub +} + +// PSubscribe subscribes the client to the given patterns. +// Patterns can be omitted to create empty subscription. +func (c *SentinelClient) PSubscribe(channels ...string) *PubSub { + pubsub := c.pubSub() + if len(channels) > 0 { + _ = pubsub.PSubscribe(channels...) + } + return pubsub +} + func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd { - cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name) + cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name) c.Process(cmd) return cmd } func (c *SentinelClient) Sentinels(name string) *SliceCmd { - cmd := NewSliceCmd("SENTINEL", "sentinels", name) + cmd := NewSliceCmd("sentinel", "sentinels", name) c.Process(cmd) return cmd } @@ -158,77 +178,70 @@ type sentinelFailover struct { sentinel *SentinelClient } -func (d *sentinelFailover) Close() error { - return d.resetSentinel() +func (c *sentinelFailover) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + return c.closeSentinel() } -func (d *sentinelFailover) Pool() *pool.ConnPool { - d.poolOnce.Do(func() { - d.opt.Dialer = d.dial - d.pool = newConnPool(d.opt) +func (c *sentinelFailover) Pool() *pool.ConnPool { + c.poolOnce.Do(func() { + c.opt.Dialer = c.dial + c.pool = newConnPool(c.opt) }) - return d.pool + return c.pool } -func (d *sentinelFailover) dial() (net.Conn, error) { - addr, err := d.MasterAddr() +func (c *sentinelFailover) dial() (net.Conn, error) { + addr, err := c.MasterAddr() if err != nil { return nil, err } - return net.DialTimeout("tcp", addr, d.opt.DialTimeout) + return net.DialTimeout("tcp", addr, c.opt.DialTimeout) } -func (d *sentinelFailover) MasterAddr() (string, error) { - d.mu.Lock() - defer d.mu.Unlock() - - addr, err := d.masterAddr() +func (c *sentinelFailover) MasterAddr() (string, error) { + addr, err := c.masterAddr() if err != nil { return "", err } - d._switchMaster(addr) - + c.switchMaster(addr) return addr, nil } -func (d *sentinelFailover) masterAddr() (string, error) { - // Try last working sentinel. - if d.sentinel != nil { - addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result() - if err == nil { - addr := net.JoinHostPort(addr[0], addr[1]) - return addr, nil - } - - internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s", - d.masterName, err) - d._resetSentinel() +func (c *sentinelFailover) masterAddr() (string, error) { + addr := c.getMasterAddr() + if addr != "" { + return addr, nil } - for i, sentinelAddr := range d.sentinelAddrs { + c.mu.Lock() + defer c.mu.Unlock() + + for i, sentinelAddr := range c.sentinelAddrs { sentinel := NewSentinelClient(&Options{ Addr: sentinelAddr, - DialTimeout: d.opt.DialTimeout, - ReadTimeout: d.opt.ReadTimeout, - WriteTimeout: d.opt.WriteTimeout, + DialTimeout: c.opt.DialTimeout, + ReadTimeout: c.opt.ReadTimeout, + WriteTimeout: c.opt.WriteTimeout, - PoolSize: d.opt.PoolSize, - PoolTimeout: d.opt.PoolTimeout, - IdleTimeout: d.opt.IdleTimeout, + PoolSize: c.opt.PoolSize, + PoolTimeout: c.opt.PoolTimeout, + IdleTimeout: c.opt.IdleTimeout, }) - masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result() + masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result() if err != nil { internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s", - d.masterName, err) + c.masterName, err) sentinel.Close() continue } // Push working sentinel to the top. - d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0] - d.setSentinel(sentinel) + c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0] + c.setSentinel(sentinel) addr := net.JoinHostPort(masterAddr[0], masterAddr[1]) return addr, nil @@ -237,17 +250,41 @@ func (d *sentinelFailover) masterAddr() (string, error) { return "", errors.New("redis: all sentinels are unreachable") } -func (c *sentinelFailover) switchMaster(addr string) { - c.mu.Lock() - c._switchMaster(addr) - c.mu.Unlock() +func (c *sentinelFailover) getMasterAddr() string { + c.mu.RLock() + sentinel := c.sentinel + c.mu.RUnlock() + + if sentinel == nil { + return "" + } + + addr, err := sentinel.GetMasterAddrByName(c.masterName).Result() + if err != nil { + internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s", + c.masterName, err) + c.mu.Lock() + if c.sentinel == sentinel { + c.closeSentinel() + } + c.mu.Unlock() + return "" + } + + return net.JoinHostPort(addr[0], addr[1]) } -func (c *sentinelFailover) _switchMaster(addr string) { - if c._masterAddr == addr { +func (c *sentinelFailover) switchMaster(addr string) { + c.mu.RLock() + masterAddr := c._masterAddr + c.mu.RUnlock() + if masterAddr == addr { return } + c.mu.Lock() + defer c.mu.Unlock() + internal.Logf("sentinel: new master=%q addr=%q", c.masterName, addr) _ = c.Pool().Filter(func(cn *pool.Conn) bool { @@ -256,32 +293,22 @@ func (c *sentinelFailover) _switchMaster(addr string) { c._masterAddr = addr } -func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) { - d.discoverSentinels(sentinel) - d.sentinel = sentinel - go d.listen(sentinel) +func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) { + c.discoverSentinels(sentinel) + c.sentinel = sentinel + go c.listen(sentinel) } -func (d *sentinelFailover) resetSentinel() error { - var err error - d.mu.Lock() - if d.sentinel != nil { - err = d._resetSentinel() - } - d.mu.Unlock() +func (c *sentinelFailover) closeSentinel() error { + err := c.sentinel.Close() + c.sentinel = nil return err } -func (d *sentinelFailover) _resetSentinel() error { - err := d.sentinel.Close() - d.sentinel = nil - return err -} - -func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { - sentinels, err := sentinel.Sentinels(d.masterName).Result() +func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { + sentinels, err := sentinel.Sentinels(c.masterName).Result() if err != nil { - internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err) + internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err) return } for _, sentinel := range sentinels { @@ -290,49 +317,36 @@ func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { key := vals[i].(string) if key == "name" { sentinelAddr := vals[i+1].(string) - if !contains(d.sentinelAddrs, sentinelAddr) { - internal.Logf( - "sentinel: discovered new sentinel=%q for master=%q", - sentinelAddr, d.masterName, - ) - d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr) + if !contains(c.sentinelAddrs, sentinelAddr) { + internal.Logf("sentinel: discovered new sentinel=%q for master=%q", + sentinelAddr, c.masterName) + c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr) } } } } } -func (d *sentinelFailover) listen(sentinel *SentinelClient) { - pubsub := sentinel.PubSub() +func (c *sentinelFailover) listen(sentinel *SentinelClient) { + pubsub := sentinel.Subscribe("+switch-master") defer pubsub.Close() - err := pubsub.Subscribe("+switch-master") - if err != nil { - internal.Logf("sentinel: Subscribe failed: %s", err) - d.resetSentinel() - return - } - + ch := pubsub.Channel() for { - msg, err := pubsub.ReceiveMessage() - if err != nil { - if err == pool.ErrClosed { - d.resetSentinel() - return - } - internal.Logf("sentinel: ReceiveMessage failed: %s", err) - continue + msg, ok := <-ch + if !ok { + break } switch msg.Channel { case "+switch-master": parts := strings.Split(msg.Payload, " ") - if parts[0] != d.masterName { + if parts[0] != c.masterName { internal.Logf("sentinel: ignore addr for master=%q", parts[0]) continue } addr := net.JoinHostPort(parts[3], parts[4]) - d.switchMaster(addr) + c.switchMaster(addr) } } } From e99b0688d63bce86e514c4327c4be46e5aa05708 Mon Sep 17 00:00:00 2001 From: Issac Trotts Date: Wed, 3 Oct 2018 16:36:21 -0700 Subject: [PATCH 2/3] use more idiomatic for loop in ExamplePubSub --- example_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/example_test.go b/example_test.go index ec8c151..291e5b4 100644 --- a/example_test.go +++ b/example_test.go @@ -343,11 +343,7 @@ func ExamplePubSub() { }) // Consume messages. - for { - msg, ok := <-ch - if !ok { - break - } + for msg := range ch { fmt.Println(msg.Channel, msg.Payload) } From b834145dc7905182198b6525c07300c59d46344e Mon Sep 17 00:00:00 2001 From: andriikushch Date: Fri, 5 Oct 2018 08:46:29 +0200 Subject: [PATCH 3/3] Add test for read timeout option (#877) Add test for ReadTimeout option --- options.go | 2 +- options_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/options.go b/options.go index 2b5bcb5..088f697 100644 --- a/options.go +++ b/options.go @@ -48,7 +48,7 @@ type Options struct { // Default is 5 seconds. DialTimeout time.Duration // Timeout for socket reads. If reached, commands will fail - // with a timeout instead of blocking. + // with a timeout instead of blocking. Use value -1 for no timeout and 0 for default. // Default is 3 seconds. ReadTimeout time.Duration // Timeout for socket writes. If reached, commands will fail diff --git a/options_test.go b/options_test.go index 211f6b1..9b806f4 100644 --- a/options_test.go +++ b/options_test.go @@ -5,6 +5,7 @@ package redis import ( "errors" "testing" + "time" ) func TestParseURL(t *testing.T) { @@ -92,3 +93,27 @@ func TestParseURL(t *testing.T) { }) } } + +// Test ReadTimeout option initialization, including special values -1 and 0. +// And also test behaviour of WriteTimeout option, when it is not explicitly set and use +// ReadTimeout value. +func TestReadTimeoutOptions(t *testing.T) { + testDataInputOutputMap := map[time.Duration]time.Duration{ + -1: 0 * time.Second, + 0: 3 * time.Second, + 1: 1 * time.Nanosecond, + 3: 3 * time.Nanosecond, + } + + for in, out := range testDataInputOutputMap { + o := &Options{ReadTimeout: in} + o.init() + if o.ReadTimeout != out { + t.Errorf("got %d instead of %d as ReadTimeout option", o.ReadTimeout, out) + } + + if o.WriteTimeout != o.ReadTimeout { + t.Errorf("got %d instead of %d as WriteTimeout option", o.WriteTimeout, o.ReadTimeout) + } + } +}