diff --git a/pubsub.go b/pubsub.go index 0afb47c..f2a03bc 100644 --- a/pubsub.go +++ b/pubsub.go @@ -3,6 +3,7 @@ package redis import ( "errors" "fmt" + "strings" "sync" "time" @@ -29,8 +30,9 @@ type PubSub struct { cn *pool.Conn channels map[string]struct{} patterns map[string]struct{} - closed bool - exit chan struct{} + + closed bool + exit chan struct{} cmd *Cmd @@ -39,6 +41,12 @@ type PubSub struct { ping chan struct{} } +func (c *PubSub) String() string { + channels := mapKeys(c.channels) + channels = append(channels, mapKeys(c.patterns)...) + return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", ")) +} + func (c *PubSub) init() { c.exit = make(chan struct{}) } @@ -389,16 +397,39 @@ func (c *PubSub) ReceiveMessage() (*Message, error) { // It periodically sends Ping messages to test connection health. // The channel is closed with PubSub. Receive* APIs can not be used // after channel is created. +// +// If the Go channel is full for 30 seconds the message is dropped. func (c *PubSub) Channel() <-chan *Message { - c.chOnce.Do(c.initChannel) + return c.channel(100) +} + +// ChannelSize is like Channel, but creates a Go channel +// with specified buffer size. +func (c *PubSub) ChannelSize(size int) <-chan *Message { + return c.channel(size) +} + +func (c *PubSub) channel(size int) <-chan *Message { + c.chOnce.Do(func() { + c.initChannel(size) + }) + if cap(c.ch) != size { + err := fmt.Errorf("redis: PubSub.Channel is called with different buffer size") + panic(err) + } return c.ch } -func (c *PubSub) initChannel() { - c.ch = make(chan *Message, 100) - c.ping = make(chan struct{}, 10) +func (c *PubSub) initChannel(size int) { + const timeout = 30 * time.Second + + c.ch = make(chan *Message, size) + c.ping = make(chan struct{}, 1) go func() { + timer := time.NewTimer(timeout) + timer.Stop() + var errCount int for { msg, err := c.Receive() @@ -413,6 +444,7 @@ func (c *PubSub) initChannel() { errCount++ continue } + errCount = 0 // Any message is as good as a ping. @@ -427,16 +459,24 @@ func (c *PubSub) initChannel() { case *Pong: // Ignore. case *Message: - c.ch <- msg + timer.Reset(timeout) + select { + case c.ch <- msg: + if !timer.Stop() { + <-timer.C + } + case <-timer.C: + internal.Logf( + "redis: %s channel is full for %s (message is dropped)", + c, timeout) + } default: - internal.Logf("redis: unknown message: %T", msg) + internal.Logf("redis: unknown message type: %T", msg) } } }() go func() { - const timeout = 5 * time.Second - timer := time.NewTimer(timeout) timer.Stop() diff --git a/pubsub_test.go b/pubsub_test.go index 7f0021f..2d072fa 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -27,6 +27,13 @@ var _ = Describe("PubSub", func() { Expect(client.Close()).NotTo(HaveOccurred()) }) + It("implements Stringer", func() { + pubsub := client.PSubscribe("mychannel*") + defer pubsub.Close() + + Expect(pubsub.String()).To(Equal("PubSub(mychannel*)")) + }) + It("should support pattern matching", func() { pubsub := client.PSubscribe("mychannel*") defer pubsub.Close()