From 331e40dc6ccde2297df3e1b3c6b747dc4c6cc83a Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Fri, 14 Oct 2022 15:15:06 +0300 Subject: [PATCH] fix: reset pubsub state when unsubscribing from all channels --- pubsub.go | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/pubsub.go b/pubsub.go index 2bd156d..b64c8a4 100644 --- a/pubsub.go +++ b/pubsub.go @@ -238,9 +238,17 @@ func (c *PubSub) Unsubscribe(ctx context.Context, channels ...string) error { c.mu.Lock() defer c.mu.Unlock() - for _, channel := range channels { - delete(c.channels, channel) + if len(channels) > 0 { + for _, channel := range channels { + delete(c.channels, channel) + } + } else { + // Unsubscribe from all channels. + for channel := range c.channels { + delete(c.channels, channel) + } } + err := c.subscribe(ctx, "unsubscribe", channels...) return err } @@ -251,9 +259,17 @@ func (c *PubSub) PUnsubscribe(ctx context.Context, patterns ...string) error { c.mu.Lock() defer c.mu.Unlock() - for _, pattern := range patterns { - delete(c.patterns, pattern) + if len(patterns) > 0 { + for _, pattern := range patterns { + delete(c.patterns, pattern) + } + } else { + // Unsubscribe from all patterns. + for pattern := range c.patterns { + delete(c.patterns, pattern) + } } + err := c.subscribe(ctx, "punsubscribe", patterns...) return err } @@ -264,9 +280,17 @@ func (c *PubSub) SUnsubscribe(ctx context.Context, channels ...string) error { c.mu.Lock() defer c.mu.Unlock() - for _, channel := range channels { - delete(c.schannels, channel) + if len(channels) > 0 { + for _, channel := range channels { + delete(c.schannels, channel) + } + } else { + // Unsubscribe from all channels. + for channel := range c.schannels { + delete(c.schannels, channel) + } } + err := c.subscribe(ctx, "sunsubscribe", channels...) return err }