From 34ea5f98ebe4e3e4df85809ecc5818bb9f5e77c3 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Tue, 11 Apr 2017 16:18:35 +0300 Subject: [PATCH] Add Channel helper --- pubsub.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/pubsub.go b/pubsub.go index b90d727..e681949 100644 --- a/pubsub.go +++ b/pubsub.go @@ -280,6 +280,26 @@ func (c *PubSub) resubscribe() { } } +// Channel returns a channel for concurrently receiving messages. +// The channel is closed with PubSub. +func (c *PubSub) Channel() <-chan *Message { + ch := make(chan *Message, 100) + go func() { + for { + msg, err := c.ReceiveMessage() + if err != nil { + if err == pool.ErrClosed { + break + } + continue + } + ch <- msg + } + close(ch) + }() + return ch +} + func remove(ss []string, es ...string) []string { if len(es) == 0 { return ss[:0]