Merge pull request #2252 from go-redis/fix/unsub-all

fix: reset pubsub state when unsubscribing from all channels
This commit is contained in:
Vladimir Mihailenco 2022-10-14 15:17:02 +03:00 committed by GitHub
commit c306a51f21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 30 additions and 6 deletions

View File

@ -238,9 +238,17 @@ func (c *PubSub) Unsubscribe(ctx context.Context, channels ...string) error {
c.mu.Lock()
defer c.mu.Unlock()
for _, channel := range channels {
delete(c.channels, channel)
if len(channels) > 0 {
for _, channel := range channels {
delete(c.channels, channel)
}
} else {
// Unsubscribe from all channels.
for channel := range c.channels {
delete(c.channels, channel)
}
}
err := c.subscribe(ctx, "unsubscribe", channels...)
return err
}
@ -251,9 +259,17 @@ func (c *PubSub) PUnsubscribe(ctx context.Context, patterns ...string) error {
c.mu.Lock()
defer c.mu.Unlock()
for _, pattern := range patterns {
delete(c.patterns, pattern)
if len(patterns) > 0 {
for _, pattern := range patterns {
delete(c.patterns, pattern)
}
} else {
// Unsubscribe from all patterns.
for pattern := range c.patterns {
delete(c.patterns, pattern)
}
}
err := c.subscribe(ctx, "punsubscribe", patterns...)
return err
}
@ -264,9 +280,17 @@ func (c *PubSub) SUnsubscribe(ctx context.Context, channels ...string) error {
c.mu.Lock()
defer c.mu.Unlock()
for _, channel := range channels {
delete(c.schannels, channel)
if len(channels) > 0 {
for _, channel := range channels {
delete(c.schannels, channel)
}
} else {
// Unsubscribe from all channels.
for channel := range c.schannels {
delete(c.schannels, channel)
}
}
err := c.subscribe(ctx, "sunsubscribe", channels...)
return err
}