Merge pull request #658 from go-redis/fix/chan-once

Create PubSub channel once
This commit is contained in:
Vladimir Mihailenco 2017-10-30 13:22:25 +03:00 committed by GitHub
commit 75ca398534
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 21 additions and 15 deletions

View File

@ -29,6 +29,9 @@ type PubSub struct {
closed bool closed bool
cmd *Cmd cmd *Cmd
chOnce sync.Once
ch chan *Message
} }
func (c *PubSub) conn() (*pool.Conn, error) { func (c *PubSub) conn() (*pool.Conn, error) {
@ -346,24 +349,27 @@ func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) {
} }
} }
// Channel returns a channel for concurrently receiving messages. // Channel returns a Go channel for concurrently receiving messages.
// The channel is closed with PubSub. // The channel is closed with PubSub. Receive or ReceiveMessage APIs
// can not be used after channel is created.
func (c *PubSub) Channel() <-chan *Message { func (c *PubSub) Channel() <-chan *Message {
ch := make(chan *Message, 100) c.chOnce.Do(func() {
go func() { c.ch = make(chan *Message, 100)
for { go func() {
msg, err := c.ReceiveMessage() for {
if err != nil { msg, err := c.ReceiveMessage()
if err == pool.ErrClosed { if err != nil {
break if err == pool.ErrClosed {
break
}
continue
} }
continue c.ch <- msg
} }
ch <- msg close(c.ch)
} }()
close(ch) })
}() return c.ch
return ch
} }
func appendIfNotExists(ss []string, es ...string) []string { func appendIfNotExists(ss []string, es ...string) []string {