From 15f14b83059e8b093b47376e2cd435035bf65970 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Mon, 30 Oct 2017 12:09:57 +0200 Subject: [PATCH] Create PubSub channel once --- pubsub.go | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/pubsub.go b/pubsub.go index e754a16..01f8a61 100644 --- a/pubsub.go +++ b/pubsub.go @@ -29,6 +29,9 @@ type PubSub struct { closed bool cmd *Cmd + + chOnce sync.Once + ch chan *Message } func (c *PubSub) conn() (*pool.Conn, error) { @@ -346,24 +349,27 @@ func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) { } } -// Channel returns a channel for concurrently receiving messages. -// The channel is closed with PubSub. +// Channel returns a Go channel for concurrently receiving messages. +// The channel is closed with PubSub. Receive or ReceiveMessage APIs +// can not be used after channel is created. func (c *PubSub) Channel() <-chan *Message { - ch := make(chan *Message, 100) - go func() { - for { - msg, err := c.ReceiveMessage() - if err != nil { - if err == pool.ErrClosed { - break + c.chOnce.Do(func() { + c.ch = make(chan *Message, 100) + go func() { + for { + msg, err := c.ReceiveMessage() + if err != nil { + if err == pool.ErrClosed { + break + } + continue } - continue + c.ch <- msg } - ch <- msg - } - close(ch) - }() - return ch + close(c.ch) + }() + }) + return c.ch } func appendIfNotExists(ss []string, es ...string) []string {