From b94bde306e64bcbcede4601103f1f272d6159a74 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Fri, 3 Sep 2021 12:57:34 +0300 Subject: [PATCH] Fix/pubsub ping mutex (#1878) * Fix PubSub.Ping to hold the lock * Fix PubSub.Ping to hold the lock * add write cmd data-race test Signed-off-by: monkey92t Co-authored-by: monkey92t --- pubsub.go | 10 ++++++++-- race_test.go | 12 ++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/pubsub.go b/pubsub.go index c6ffb256..c5912407 100644 --- a/pubsub.go +++ b/pubsub.go @@ -252,13 +252,16 @@ func (c *PubSub) Ping(ctx context.Context, payload ...string) error { } cmd := NewCmd(ctx, args...) - cn, err := c.connWithLock(ctx) + c.mu.Lock() + defer c.mu.Unlock() + + cn, err := c.conn(ctx, nil) if err != nil { return err } err = c.writeCmd(ctx, cn, cmd) - c.releaseConnWithLock(ctx, cn, err, false) + c.releaseConn(ctx, cn, err, false) return err } @@ -361,6 +364,8 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int c.cmd = NewCmd(ctx) } + // Don't hold the lock to allow subscriptions and pings. + cn, err := c.connWithLock(ctx) if err != nil { return nil, err @@ -371,6 +376,7 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int }) c.releaseConnWithLock(ctx, cn, err, timeout > 0) + if err != nil { return nil, err } diff --git a/race_test.go b/race_test.go index a1c2a080..c9428d91 100644 --- a/race_test.go +++ b/race_test.go @@ -373,6 +373,18 @@ var _ = Describe("cluster races", func() { Expect(err).NotTo(HaveOccurred()) Expect(val).To(Equal(int64(C * N))) }) + + It("write cmd data-race", func() { + pubsub := client.Subscribe(ctx) + defer pubsub.Close() + + pubsub.Channel(redis.WithChannelHealthCheckInterval(time.Millisecond)) + for i := 0; i < 100; i++ { + key := fmt.Sprintf("channel_%d", i) + pubsub.Subscribe(ctx, key) + pubsub.Unsubscribe(ctx, key) + } + }) }) func bigVal() []byte {