diff --git a/example_test.go b/example_test.go index b54e043a..25869bea 100644 --- a/example_test.go +++ b/example_test.go @@ -165,13 +165,11 @@ func ExampleMulti() { } func ExamplePubSub() { - pubsub := client.PubSub() - defer pubsub.Close() - - err := pubsub.Subscribe("mychannel") + pubsub, err := client.Subscribe("mychannel") if err != nil { panic(err) } + defer pubsub.Close() err = client.Publish("mychannel", "hello").Err() if err != nil { diff --git a/pubsub.go b/pubsub.go index e143448b..1f4f5b63 100644 --- a/pubsub.go +++ b/pubsub.go @@ -5,12 +5,20 @@ import ( "time" ) +// Posts a message to the given channel. +func (c *Client) Publish(channel, message string) *IntCmd { + req := NewIntCmd("PUBLISH", channel, message) + c.Process(req) + return req +} + // PubSub implements Pub/Sub commands as described in // http://redis.io/topics/pubsub. type PubSub struct { *baseClient } +// Deprecated. Use Subscribe/PSubscribe instead. func (c *Client) PubSub() *PubSub { return &PubSub{ baseClient: &baseClient{ @@ -20,10 +28,16 @@ func (c *Client) PubSub() *PubSub { } } -func (c *Client) Publish(channel, message string) *IntCmd { - req := NewIntCmd("PUBLISH", channel, message) - c.Process(req) - return req +// Subscribes the client to the specified channels. +func (c *Client) Subscribe(channels ...string) (*PubSub, error) { + pubsub := c.PubSub() + return pubsub, pubsub.Subscribe(channels...) +} + +// Subscribes the client to the given patterns. +func (c *Client) PSubscribe(channels ...string) (*PubSub, error) { + pubsub := c.PubSub() + return pubsub, pubsub.PSubscribe(channels...) } func (c *PubSub) Ping(payload string) error { @@ -40,6 +54,20 @@ func (c *PubSub) Ping(payload string) error { return cn.writeCmds(cmd) } +// Message received after a successful subscription to channel. +type Subscription struct { + // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe". + Kind string + // Channel name we have subscribed to. + Channel string + // Number of channels we are currently subscribed to. + Count int +} + +func (m *Subscription) String() string { + return fmt.Sprintf("%s: %s", m.Kind, m.Channel) +} + // Message received as result of a PUBLISH command issued by another client. type Message struct { Channel string @@ -74,20 +102,8 @@ func (p *Pong) String() string { return "Pong" } -// Message received after a successful subscription to channel. -type Subscription struct { - // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe". - Kind string - // Channel name we have subscribed to. - Channel string - // Number of channels we are currently subscribed to. - Count int -} - -func (m *Subscription) String() string { - return fmt.Sprintf("%s: %s", m.Kind, m.Channel) -} - +// Returns a message as a Subscription, Message, PMessage, Pong or +// error. See PubSub example for details. func (c *PubSub) Receive() (interface{}, error) { return c.ReceiveTimeout(0) } @@ -120,6 +136,8 @@ func newMessage(reply []interface{}) (interface{}, error) { } } +// ReceiveTimeout acts like Receive but returns an error if message +// is not received in time. func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { cn, err := c.conn() if err != nil { @@ -149,18 +167,24 @@ func (c *PubSub) subscribe(cmd string, channels ...string) error { return cn.writeCmds(req) } +// Subscribes the client to the specified channels. func (c *PubSub) Subscribe(channels ...string) error { return c.subscribe("SUBSCRIBE", channels...) } +// Subscribes the client to the given patterns. func (c *PubSub) PSubscribe(patterns ...string) error { return c.subscribe("PSUBSCRIBE", patterns...) } +// Unsubscribes the client from the given channels, or from all of +// them if none is given. func (c *PubSub) Unsubscribe(channels ...string) error { return c.subscribe("UNSUBSCRIBE", channels...) } +// Unsubscribes the client from the given patterns, or from all of +// them if none is given. func (c *PubSub) PUnsubscribe(patterns ...string) error { return c.subscribe("PUNSUBSCRIBE", patterns...) } diff --git a/pubsub_test.go b/pubsub_test.go index bf59a2cd..ac1d629b 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -12,27 +12,26 @@ import ( var _ = Describe("PubSub", func() { var client *redis.Client - var pubsub *redis.PubSub BeforeEach(func() { client = redis.NewClient(&redis.Options{ Addr: redisAddr, }) Expect(client.FlushDb().Err()).NotTo(HaveOccurred()) - pubsub = client.PubSub() }) AfterEach(func() { - Expect(pubsub.Close()).NotTo(HaveOccurred()) Expect(client.Close()).NotTo(HaveOccurred()) }) It("should support pattern matching", func() { - Expect(pubsub.PSubscribe("mychannel*")).NotTo(HaveOccurred()) + pubsub, err := client.PSubscribe("mychannel*") + Expect(err).NotTo(HaveOccurred()) + defer pubsub.Close() - pub := client.Publish("mychannel1", "hello") - Expect(pub.Err()).NotTo(HaveOccurred()) - Expect(pub.Val()).To(Equal(int64(1))) + n, err := client.Publish("mychannel1", "hello").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) Expect(pubsub.PUnsubscribe("mychannel*")).NotTo(HaveOccurred()) @@ -75,7 +74,9 @@ var _ = Describe("PubSub", func() { Expect(err).NotTo(HaveOccurred()) Expect(channels).To(BeEmpty()) - Expect(pubsub.Subscribe("mychannel", "mychannel2")).NotTo(HaveOccurred()) + pubsub, err := client.Subscribe("mychannel", "mychannel2") + Expect(err).NotTo(HaveOccurred()) + defer pubsub.Close() channels, err = client.PubSubChannels("mychannel*").Result() Expect(err).NotTo(HaveOccurred()) @@ -91,7 +92,9 @@ var _ = Describe("PubSub", func() { }) It("should return the numbers of subscribers", func() { - Expect(pubsub.Subscribe("mychannel", "mychannel2")).NotTo(HaveOccurred()) + pubsub, err := client.Subscribe("mychannel", "mychannel2") + Expect(err).NotTo(HaveOccurred()) + defer pubsub.Close() channels, err := client.PubSubNumSub("mychannel", "mychannel2", "mychannel3").Result() Expect(err).NotTo(HaveOccurred()) @@ -107,7 +110,9 @@ var _ = Describe("PubSub", func() { Expect(err).NotTo(HaveOccurred()) Expect(num).To(Equal(int64(0))) - Expect(pubsub.PSubscribe("*")).NotTo(HaveOccurred()) + pubsub, err := client.PSubscribe("*") + Expect(err).NotTo(HaveOccurred()) + defer pubsub.Close() num, err = client.PubSubNumPat().Result() Expect(err).NotTo(HaveOccurred()) @@ -115,15 +120,17 @@ var _ = Describe("PubSub", func() { }) It("should pub/sub", func() { - Expect(pubsub.Subscribe("mychannel", "mychannel2")).NotTo(HaveOccurred()) + pubsub, err := client.Subscribe("mychannel", "mychannel2") + Expect(err).NotTo(HaveOccurred()) + defer pubsub.Close() - pub := client.Publish("mychannel", "hello") - Expect(pub.Err()).NotTo(HaveOccurred()) - Expect(pub.Val()).To(Equal(int64(1))) + n, err := client.Publish("mychannel", "hello").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) - pub = client.Publish("mychannel2", "hello2") - Expect(pub.Err()).NotTo(HaveOccurred()) - Expect(pub.Val()).To(Equal(int64(1))) + n, err = client.Publish("mychannel2", "hello2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(1))) Expect(pubsub.Unsubscribe("mychannel", "mychannel2")).NotTo(HaveOccurred()) @@ -187,8 +194,9 @@ var _ = Describe("PubSub", func() { }) It("should ping/pong", func() { - err := pubsub.Subscribe("mychannel") + pubsub, err := client.Subscribe("mychannel") Expect(err).NotTo(HaveOccurred()) + defer pubsub.Close() _, err = pubsub.ReceiveTimeout(time.Second) Expect(err).NotTo(HaveOccurred()) @@ -203,8 +211,9 @@ var _ = Describe("PubSub", func() { }) It("should ping/pong with payload", func() { - err := pubsub.Subscribe("mychannel") + pubsub, err := client.Subscribe("mychannel") Expect(err).NotTo(HaveOccurred()) + defer pubsub.Close() _, err = pubsub.ReceiveTimeout(time.Second) Expect(err).NotTo(HaveOccurred())