forked from mirror/redis
pubsub: log an error on reconnect
This commit is contained in:
parent
96d1b85009
commit
316917d99f
42
pubsub.go
42
pubsub.go
|
@ -131,24 +131,27 @@ func (c *PubSub) _releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if internal.IsBadConn(err, allowTimeout) {
|
if internal.IsBadConn(err, allowTimeout) {
|
||||||
c._reconnect()
|
c._reconnect(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *PubSub) _closeTheCn() error {
|
func (c *PubSub) _reconnect(reason error) {
|
||||||
var err error
|
_ = c._closeTheCn(reason)
|
||||||
if c.cn != nil {
|
|
||||||
err = c.closeConn(c.cn)
|
|
||||||
c.cn = nil
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *PubSub) _reconnect() {
|
|
||||||
_ = c._closeTheCn()
|
|
||||||
_, _ = c._conn(nil)
|
_, _ = c._conn(nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *PubSub) _closeTheCn(reason error) error {
|
||||||
|
if c.cn == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if !c.closed {
|
||||||
|
internal.Logf("redis: discarding bad PubSub connection: %s", reason)
|
||||||
|
}
|
||||||
|
err := c.closeConn(c.cn)
|
||||||
|
c.cn = nil
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (c *PubSub) Close() error {
|
func (c *PubSub) Close() error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
@ -159,7 +162,7 @@ func (c *PubSub) Close() error {
|
||||||
c.closed = true
|
c.closed = true
|
||||||
close(c.exit)
|
close(c.exit)
|
||||||
|
|
||||||
err := c._closeTheCn()
|
err := c._closeTheCn(pool.ErrClosed)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -432,22 +435,23 @@ func (c *PubSub) initChannel() {
|
||||||
timer := time.NewTimer(timeout)
|
timer := time.NewTimer(timeout)
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
|
|
||||||
var hasPing bool
|
var healthy bool
|
||||||
|
var pingErr error
|
||||||
for {
|
for {
|
||||||
timer.Reset(timeout)
|
timer.Reset(timeout)
|
||||||
select {
|
select {
|
||||||
case <-c.ping:
|
case <-c.ping:
|
||||||
hasPing = true
|
healthy = true
|
||||||
if !timer.Stop() {
|
if !timer.Stop() {
|
||||||
<-timer.C
|
<-timer.C
|
||||||
}
|
}
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
if hasPing {
|
if healthy {
|
||||||
hasPing = false
|
healthy = false
|
||||||
_ = c.Ping()
|
pingErr = c.Ping()
|
||||||
} else {
|
} else {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
c._reconnect()
|
c._reconnect(pingErr)
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
}
|
}
|
||||||
case <-c.exit:
|
case <-c.exit:
|
||||||
|
|
Loading…
Reference in New Issue