Merge pull request #1463 from go-redis/fix/sub-renewal

Faster renew the subscription
This commit is contained in:
Vladimir Mihailenco 2020-09-05 11:16:18 +03:00 committed by GitHub
commit fe55405454
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 13 deletions

View File

@ -163,6 +163,7 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
} }
} }
p.connsMu.Unlock() p.connsMu.Unlock()
return cn, nil return cn, nil
} }
@ -408,8 +409,10 @@ func (p *ConnPool) closed() bool {
} }
func (p *ConnPool) Filter(fn func(*Conn) bool) error { func (p *ConnPool) Filter(fn func(*Conn) bool) error {
var firstErr error
p.connsMu.Lock() p.connsMu.Lock()
defer p.connsMu.Unlock()
var firstErr error
for _, cn := range p.conns { for _, cn := range p.conns {
if fn(cn) { if fn(cn) {
if err := p.closeConn(cn); err != nil && firstErr == nil { if err := p.closeConn(cn); err != nil && firstErr == nil {
@ -417,7 +420,6 @@ func (p *ConnPool) Filter(fn func(*Conn) bool) error {
} }
} }
} }
p.connsMu.Unlock()
return firstErr return firstErr
} }

View File

@ -13,7 +13,10 @@ import (
"github.com/go-redis/redis/v8/internal/proto" "github.com/go-redis/redis/v8/internal/proto"
) )
const pingTimeout = 30 * time.Second const (
pingTimeout = time.Second
chanSendTimeout = time.Minute
)
var errPingTimeout = errors.New("redis: ping timeout") var errPingTimeout = errors.New("redis: ping timeout")
@ -454,7 +457,6 @@ func (c *PubSub) getContext() context.Context {
if c.cmd != nil { if c.cmd != nil {
return c.cmd.ctx return c.cmd.ctx
} }
return context.Background() return context.Background()
} }
@ -462,7 +464,7 @@ func (c *PubSub) initPing() {
ctx := context.TODO() ctx := context.TODO()
c.ping = make(chan struct{}, 1) c.ping = make(chan struct{}, 1)
go func() { go func() {
timer := time.NewTimer(pingTimeout) timer := time.NewTimer(time.Minute)
timer.Stop() timer.Stop()
healthy := true healthy := true
@ -499,7 +501,7 @@ func (c *PubSub) initMsgChan(size int) {
ctx := context.TODO() ctx := context.TODO()
c.msgCh = make(chan *Message, size) c.msgCh = make(chan *Message, size)
go func() { go func() {
timer := time.NewTimer(pingTimeout) timer := time.NewTimer(time.Minute)
timer.Stop() timer.Stop()
var errCount int var errCount int
@ -531,7 +533,7 @@ func (c *PubSub) initMsgChan(size int) {
case *Pong: case *Pong:
// Ignore. // Ignore.
case *Message: case *Message:
timer.Reset(pingTimeout) timer.Reset(chanSendTimeout)
select { select {
case c.msgCh <- msg: case c.msgCh <- msg:
if !timer.Stop() { if !timer.Stop() {
@ -540,7 +542,10 @@ func (c *PubSub) initMsgChan(size int) {
case <-timer.C: case <-timer.C:
internal.Logger.Printf( internal.Logger.Printf(
c.getContext(), c.getContext(),
"redis: %s channel is full for %s (message is dropped)", c, pingTimeout) "redis: %s channel is full for %s (message is dropped)",
c,
chanSendTimeout,
)
} }
default: default:
internal.Logger.Printf(c.getContext(), "redis: unknown message type: %T", msg) internal.Logger.Printf(c.getContext(), "redis: unknown message type: %T", msg)

View File

@ -70,12 +70,12 @@ var _ = Describe("Sentinel", func() {
return client.Get(ctx, "foo").Err() return client.Get(ctx, "foo").Err()
}, "15s", "100ms").ShouldNot(HaveOccurred()) }, "15s", "100ms").ShouldNot(HaveOccurred())
// Publish message to check if subscription is renewed. // Check if subscription is renewed.
err = client.Publish(ctx, "foo", "hello").Err()
Expect(err).NotTo(HaveOccurred())
var msg *redis.Message var msg *redis.Message
Eventually(ch, "15s").Should(Receive(&msg)) Eventually(func() <-chan *redis.Message {
_ = client.Publish(ctx, "foo", "hello").Err()
return ch
}, "15s").Should(Receive(&msg))
Expect(msg.Channel).To(Equal("foo")) Expect(msg.Channel).To(Equal("foo"))
Expect(msg.Payload).To(Equal("hello")) Expect(msg.Payload).To(Equal("hello"))
}) })