forked from mirror/redis
fix: reset pubsub state when unsubscribing from all channels
This commit is contained in:
parent
6a8cf3e9d8
commit
331e40dc6c
36
pubsub.go
36
pubsub.go
|
@ -238,9 +238,17 @@ func (c *PubSub) Unsubscribe(ctx context.Context, channels ...string) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
for _, channel := range channels {
|
if len(channels) > 0 {
|
||||||
delete(c.channels, channel)
|
for _, channel := range channels {
|
||||||
|
delete(c.channels, channel)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Unsubscribe from all channels.
|
||||||
|
for channel := range c.channels {
|
||||||
|
delete(c.channels, channel)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.subscribe(ctx, "unsubscribe", channels...)
|
err := c.subscribe(ctx, "unsubscribe", channels...)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -251,9 +259,17 @@ func (c *PubSub) PUnsubscribe(ctx context.Context, patterns ...string) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
for _, pattern := range patterns {
|
if len(patterns) > 0 {
|
||||||
delete(c.patterns, pattern)
|
for _, pattern := range patterns {
|
||||||
|
delete(c.patterns, pattern)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Unsubscribe from all patterns.
|
||||||
|
for pattern := range c.patterns {
|
||||||
|
delete(c.patterns, pattern)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.subscribe(ctx, "punsubscribe", patterns...)
|
err := c.subscribe(ctx, "punsubscribe", patterns...)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -264,9 +280,17 @@ func (c *PubSub) SUnsubscribe(ctx context.Context, channels ...string) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
for _, channel := range channels {
|
if len(channels) > 0 {
|
||||||
delete(c.schannels, channel)
|
for _, channel := range channels {
|
||||||
|
delete(c.schannels, channel)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Unsubscribe from all channels.
|
||||||
|
for channel := range c.schannels {
|
||||||
|
delete(c.schannels, channel)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.subscribe(ctx, "sunsubscribe", channels...)
|
err := c.subscribe(ctx, "sunsubscribe", channels...)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue