Add PubSub.ChannelSize

This commit is contained in:
Vladimir Mihailenco 2019-03-12 12:48:32 +02:00
parent 21913a8304
commit b6fdeca648
1 changed files with 19 additions and 3 deletions

View File

@ -400,14 +400,30 @@ func (c *PubSub) ReceiveMessage() (*Message, error) {
//
// If the Go channel is full for 30 seconds the message is dropped.
func (c *PubSub) Channel() <-chan *Message {
c.chOnce.Do(c.initChannel)
return c.channel(100)
}
// ChannelSize is like Channel, but creates a Go channel
// with specified buffer size.
func (c *PubSub) ChannelSize(size int) <-chan *Message {
return c.channel(size)
}
func (c *PubSub) channel(size int) <-chan *Message {
c.chOnce.Do(func() {
c.initChannel(size)
})
if cap(c.ch) != size {
err := fmt.Errorf("redis: PubSub.Channel is called with different buffer size")
panic(err)
}
return c.ch
}
func (c *PubSub) initChannel() {
func (c *PubSub) initChannel(size int) {
const timeout = 30 * time.Second
c.ch = make(chan *Message, 100)
c.ch = make(chan *Message, size)
c.ping = make(chan struct{}, 1)
go func() {