diff --git a/pubsub.go b/pubsub.go index b90d7271..e681949b 100644 --- a/pubsub.go +++ b/pubsub.go @@ -280,6 +280,26 @@ func (c *PubSub) resubscribe() { } } +// Channel returns a channel for concurrently receiving messages. +// The channel is closed with PubSub. +func (c *PubSub) Channel() <-chan *Message { + ch := make(chan *Message, 100) + go func() { + for { + msg, err := c.ReceiveMessage() + if err != nil { + if err == pool.ErrClosed { + break + } + continue + } + ch <- msg + } + close(ch) + }() + return ch +} + func remove(ss []string, es ...string) []string { if len(es) == 0 { return ss[:0]