From c357d186240858f2f43bfbc8a0afe019982216ed Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Sat, 5 Sep 2020 10:56:09 +0300 Subject: [PATCH] Faster renew the subscription --- internal/pool/pool.go | 6 ++++-- pubsub.go | 17 +++++++++++------ sentinel_test.go | 10 +++++----- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/internal/pool/pool.go b/internal/pool/pool.go index d01a4e1e..355742bf 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -163,6 +163,7 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) { } } p.connsMu.Unlock() + return cn, nil } @@ -408,8 +409,10 @@ func (p *ConnPool) closed() bool { } func (p *ConnPool) Filter(fn func(*Conn) bool) error { - var firstErr error p.connsMu.Lock() + defer p.connsMu.Unlock() + + var firstErr error for _, cn := range p.conns { if fn(cn) { if err := p.closeConn(cn); err != nil && firstErr == nil { @@ -417,7 +420,6 @@ func (p *ConnPool) Filter(fn func(*Conn) bool) error { } } } - p.connsMu.Unlock() return firstErr } diff --git a/pubsub.go b/pubsub.go index 61735032..1dee8eb2 100644 --- a/pubsub.go +++ b/pubsub.go @@ -13,7 +13,10 @@ import ( "github.com/go-redis/redis/v8/internal/proto" ) -const pingTimeout = 30 * time.Second +const ( + pingTimeout = time.Second + chanSendTimeout = time.Minute +) var errPingTimeout = errors.New("redis: ping timeout") @@ -454,7 +457,6 @@ func (c *PubSub) getContext() context.Context { if c.cmd != nil { return c.cmd.ctx } - return context.Background() } @@ -462,7 +464,7 @@ func (c *PubSub) initPing() { ctx := context.TODO() c.ping = make(chan struct{}, 1) go func() { - timer := time.NewTimer(pingTimeout) + timer := time.NewTimer(time.Minute) timer.Stop() healthy := true @@ -499,7 +501,7 @@ func (c *PubSub) initMsgChan(size int) { ctx := context.TODO() c.msgCh = make(chan *Message, size) go func() { - timer := time.NewTimer(pingTimeout) + timer := time.NewTimer(time.Minute) timer.Stop() var errCount int @@ -531,7 +533,7 @@ func (c *PubSub) initMsgChan(size int) { case *Pong: // Ignore. case *Message: - timer.Reset(pingTimeout) + timer.Reset(chanSendTimeout) select { case c.msgCh <- msg: if !timer.Stop() { @@ -540,7 +542,10 @@ func (c *PubSub) initMsgChan(size int) { case <-timer.C: internal.Logger.Printf( c.getContext(), - "redis: %s channel is full for %s (message is dropped)", c, pingTimeout) + "redis: %s channel is full for %s (message is dropped)", + c, + chanSendTimeout, + ) } default: internal.Logger.Printf(c.getContext(), "redis: unknown message type: %T", msg) diff --git a/sentinel_test.go b/sentinel_test.go index 80395c9e..a4d0fe22 100644 --- a/sentinel_test.go +++ b/sentinel_test.go @@ -70,12 +70,12 @@ var _ = Describe("Sentinel", func() { return client.Get(ctx, "foo").Err() }, "15s", "100ms").ShouldNot(HaveOccurred()) - // Publish message to check if subscription is renewed. - err = client.Publish(ctx, "foo", "hello").Err() - Expect(err).NotTo(HaveOccurred()) - + // Check if subscription is renewed. var msg *redis.Message - Eventually(ch, "15s").Should(Receive(&msg)) + Eventually(func() <-chan *redis.Message { + _ = client.Publish(ctx, "foo", "hello").Err() + return ch + }, "15s").Should(Receive(&msg)) Expect(msg.Channel).To(Equal("foo")) Expect(msg.Payload).To(Equal("hello")) })