diff --git a/pubsub.go b/pubsub.go index 0afb47cd..e1855119 100644 --- a/pubsub.go +++ b/pubsub.go @@ -3,6 +3,7 @@ package redis import ( "errors" "fmt" + "strings" "sync" "time" @@ -29,8 +30,9 @@ type PubSub struct { cn *pool.Conn channels map[string]struct{} patterns map[string]struct{} - closed bool - exit chan struct{} + + closed bool + exit chan struct{} cmd *Cmd @@ -39,6 +41,12 @@ type PubSub struct { ping chan struct{} } +func (c *PubSub) String() string { + channels := mapKeys(c.channels) + channels = append(channels, mapKeys(c.patterns)...) + return fmt.Sprintf("PubSub(%s)", strings.Join(channels, ", ")) +} + func (c *PubSub) init() { c.exit = make(chan struct{}) } @@ -389,16 +397,23 @@ func (c *PubSub) ReceiveMessage() (*Message, error) { // It periodically sends Ping messages to test connection health. // The channel is closed with PubSub. Receive* APIs can not be used // after channel is created. +// +// If the Go channel is full for 30 seconds the message is dropped. func (c *PubSub) Channel() <-chan *Message { c.chOnce.Do(c.initChannel) return c.ch } func (c *PubSub) initChannel() { + const timeout = 30 * time.Second + c.ch = make(chan *Message, 100) - c.ping = make(chan struct{}, 10) + c.ping = make(chan struct{}, 1) go func() { + timer := time.NewTimer(timeout) + timer.Stop() + var errCount int for { msg, err := c.Receive() @@ -413,6 +428,7 @@ func (c *PubSub) initChannel() { errCount++ continue } + errCount = 0 // Any message is as good as a ping. @@ -427,16 +443,24 @@ func (c *PubSub) initChannel() { case *Pong: // Ignore. case *Message: - c.ch <- msg + timer.Reset(timeout) + select { + case c.ch <- msg: + if !timer.Stop() { + <-timer.C + } + case <-timer.C: + internal.Logf( + "redis: %s channel is full for %s (message is dropped)", + c, timeout) + } default: - internal.Logf("redis: unknown message: %T", msg) + internal.Logf("redis: unknown message type: %T", msg) } } }() go func() { - const timeout = 5 * time.Second - timer := time.NewTimer(timeout) timer.Stop() diff --git a/pubsub_test.go b/pubsub_test.go index 7f0021fd..2d072fa2 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -27,6 +27,13 @@ var _ = Describe("PubSub", func() { Expect(client.Close()).NotTo(HaveOccurred()) }) + It("implements Stringer", func() { + pubsub := client.PSubscribe("mychannel*") + defer pubsub.Close() + + Expect(pubsub.String()).To(Equal("PubSub(mychannel*)")) + }) + It("should support pattern matching", func() { pubsub := client.PSubscribe("mychannel*") defer pubsub.Close()