diff --git a/multi.go b/multi.go index 43db5e8f..bb4fafe7 100644 --- a/multi.go +++ b/multi.go @@ -49,7 +49,8 @@ func (c *Client) Multi() *Multi { func (c *Multi) putConn(cn *conn, ei error) { var err error if isBadConn(cn, ei) { - err = c.base.connPool.Remove(nil) // nil to force removal + // Close current connection. + c.base.connPool.(*stickyConnPool).Reset() } else { err = c.base.connPool.Put(cn) } diff --git a/pool.go b/pool.go index 2c387e9c..9968bd7e 100644 --- a/pool.go +++ b/pool.go @@ -137,7 +137,7 @@ func newConnPool(opt *Options) *connPool { p := &connPool{ dialer: newConnDialer(opt), - rl: ratelimit.New(2*opt.getPoolSize(), time.Second), + rl: ratelimit.New(3*opt.getPoolSize(), time.Second), opt: opt, conns: newConnList(opt.getPoolSize()), freeConns: make(chan *conn, opt.getPoolSize()), @@ -458,11 +458,7 @@ func (p *stickyConnPool) Remove(cn *conn) error { if cn != nil && p.cn != cn { panic("p.cn != cn") } - if cn == nil { - return p.remove() - } else { - return nil - } + return nil } func (p *stickyConnPool) Len() int { @@ -483,6 +479,15 @@ func (p *stickyConnPool) FreeLen() int { return 0 } +func (p *stickyConnPool) Reset() (err error) { + p.mx.Lock() + if p.cn != nil { + err = p.remove() + } + p.mx.Unlock() + return err +} + func (p *stickyConnPool) Close() error { defer p.mx.Unlock() p.mx.Lock() diff --git a/pubsub.go b/pubsub.go index 223f8b96..3e20fe72 100644 --- a/pubsub.go +++ b/pubsub.go @@ -235,7 +235,7 @@ func (c *PubSub) Receive() (interface{}, error) { func (c *PubSub) reconnect() { // Close current connection. - c.connPool.Remove(nil) // nil to force removal + c.connPool.(*stickyConnPool).Reset() if len(c.channels) > 0 { if err := c.Subscribe(c.channels...); err != nil { @@ -252,39 +252,41 @@ func (c *PubSub) reconnect() { // ReceiveMessage returns a message or error. It automatically // reconnects to Redis in case of network errors. func (c *PubSub) ReceiveMessage() (*Message, error) { - var badConn bool + var errNum int for { msgi, err := c.ReceiveTimeout(5 * time.Second) if err != nil { - if netErr, ok := err.(net.Error); ok && netErr.Timeout() { - if badConn { - c.reconnect() - badConn = false - continue - } - - err := c.Ping("") - if err != nil { - c.reconnect() - } else { - badConn = true - } - continue + if !isNetworkError(err) { + return nil, err } - if isNetworkError(err) { - c.reconnect() - continue + goodConn := errNum == 0 + errNum++ + + if goodConn { + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + err := c.Ping("") + if err == nil { + continue + } + log.Printf("redis: PubSub.Ping failed: %s", err) + } } - return nil, err + if errNum > 2 { + time.Sleep(time.Second) + } + c.reconnect() + continue } + // Reset error number. + errNum = 0 + switch msg := msgi.(type) { case *Subscription: // Ignore. case *Pong: - badConn = false // Ignore. case *Message: return msg, nil diff --git a/pubsub_test.go b/pubsub_test.go index dd24bc64..64938203 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -260,9 +260,9 @@ var _ = Describe("PubSub", func() { Expect(err).NotTo(HaveOccurred()) defer pubsub.Close() - cn, _, err := pubsub.Pool().Get() + cn1, _, err := pubsub.Pool().Get() Expect(err).NotTo(HaveOccurred()) - cn.SetNetConn(&badConn{ + cn1.SetNetConn(&badConn{ readErr: errTimeout, writeErr: errTimeout, }) @@ -286,7 +286,7 @@ var _ = Describe("PubSub", func() { wg.Wait() }) - It("should not panic on Close", func() { + It("should return on Close", func() { pubsub, err := client.Subscribe("mychannel") Expect(err).NotTo(HaveOccurred()) defer pubsub.Close() @@ -297,13 +297,20 @@ var _ = Describe("PubSub", func() { defer GinkgoRecover() wg.Done() + _, err := pubsub.ReceiveMessage() Expect(err).To(MatchError("redis: client is closed")) + + wg.Done() }() + wg.Wait() + wg.Add(1) err = pubsub.Close() Expect(err).NotTo(HaveOccurred()) + + wg.Wait() }) })