Merge pull request #208 from go-redis/fix/import-receive-message

Improve ReceiveMessage.
This commit is contained in:
Vladimir Mihailenco 2015-12-02 16:23:50 +02:00
commit ba9dda7567
4 changed files with 46 additions and 31 deletions

View File

@ -49,7 +49,8 @@ func (c *Client) Multi() *Multi {
func (c *Multi) putConn(cn *conn, ei error) {
var err error
if isBadConn(cn, ei) {
err = c.base.connPool.Remove(nil) // nil to force removal
// Close current connection.
c.base.connPool.(*stickyConnPool).Reset()
} else {
err = c.base.connPool.Put(cn)
}

17
pool.go
View File

@ -137,7 +137,7 @@ func newConnPool(opt *Options) *connPool {
p := &connPool{
dialer: newConnDialer(opt),
rl: ratelimit.New(2*opt.getPoolSize(), time.Second),
rl: ratelimit.New(3*opt.getPoolSize(), time.Second),
opt: opt,
conns: newConnList(opt.getPoolSize()),
freeConns: make(chan *conn, opt.getPoolSize()),
@ -458,11 +458,7 @@ func (p *stickyConnPool) Remove(cn *conn) error {
if cn != nil && p.cn != cn {
panic("p.cn != cn")
}
if cn == nil {
return p.remove()
} else {
return nil
}
return nil
}
func (p *stickyConnPool) Len() int {
@ -483,6 +479,15 @@ func (p *stickyConnPool) FreeLen() int {
return 0
}
func (p *stickyConnPool) Reset() (err error) {
p.mx.Lock()
if p.cn != nil {
err = p.remove()
}
p.mx.Unlock()
return err
}
func (p *stickyConnPool) Close() error {
defer p.mx.Unlock()
p.mx.Lock()

View File

@ -235,7 +235,7 @@ func (c *PubSub) Receive() (interface{}, error) {
func (c *PubSub) reconnect() {
// Close current connection.
c.connPool.Remove(nil) // nil to force removal
c.connPool.(*stickyConnPool).Reset()
if len(c.channels) > 0 {
if err := c.Subscribe(c.channels...); err != nil {
@ -252,39 +252,41 @@ func (c *PubSub) reconnect() {
// ReceiveMessage returns a message or error. It automatically
// reconnects to Redis in case of network errors.
func (c *PubSub) ReceiveMessage() (*Message, error) {
var badConn bool
var errNum int
for {
msgi, err := c.ReceiveTimeout(5 * time.Second)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
if badConn {
c.reconnect()
badConn = false
continue
}
err := c.Ping("")
if err != nil {
c.reconnect()
} else {
badConn = true
}
continue
if !isNetworkError(err) {
return nil, err
}
if isNetworkError(err) {
c.reconnect()
continue
goodConn := errNum == 0
errNum++
if goodConn {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
err := c.Ping("")
if err == nil {
continue
}
log.Printf("redis: PubSub.Ping failed: %s", err)
}
}
return nil, err
if errNum > 2 {
time.Sleep(time.Second)
}
c.reconnect()
continue
}
// Reset error number.
errNum = 0
switch msg := msgi.(type) {
case *Subscription:
// Ignore.
case *Pong:
badConn = false
// Ignore.
case *Message:
return msg, nil

View File

@ -260,9 +260,9 @@ var _ = Describe("PubSub", func() {
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()
cn, _, err := pubsub.Pool().Get()
cn1, _, err := pubsub.Pool().Get()
Expect(err).NotTo(HaveOccurred())
cn.SetNetConn(&badConn{
cn1.SetNetConn(&badConn{
readErr: errTimeout,
writeErr: errTimeout,
})
@ -286,7 +286,7 @@ var _ = Describe("PubSub", func() {
wg.Wait()
})
It("should not panic on Close", func() {
It("should return on Close", func() {
pubsub, err := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()
@ -297,13 +297,20 @@ var _ = Describe("PubSub", func() {
defer GinkgoRecover()
wg.Done()
_, err := pubsub.ReceiveMessage()
Expect(err).To(MatchError("redis: client is closed"))
wg.Done()
}()
wg.Wait()
wg.Add(1)
err = pubsub.Close()
Expect(err).NotTo(HaveOccurred())
wg.Wait()
})
})