diff --git a/pubsub.go b/pubsub.go index 23f66616..93d5b388 100644 --- a/pubsub.go +++ b/pubsub.go @@ -28,14 +28,15 @@ func newPubSubClient(client *Client) (*PubSubClient, error) { } type Message struct { - Name, Channel, Message string - Number int64 + Name, Channel, ChannelPattern, Message string + Number int64 Err error } func (c *PubSubClient) consumeMessages() { conn, err := c.conn() + // SignleConnPool never returns error. if err != nil { panic(err) } @@ -55,7 +56,7 @@ func (c *PubSubClient) consumeMessages() { msgName := reply[0].(string) switch msgName { - case "subscribe", "unsubscribe": + case "subscribe", "unsubscribe", "psubscribe", "punsubscribe": msg.Name = msgName msg.Channel = reply[1].(string) msg.Number = reply[2].(int64) @@ -63,6 +64,11 @@ func (c *PubSubClient) consumeMessages() { msg.Name = msgName msg.Channel = reply[1].(string) msg.Message = reply[2].(string) + case "pmessage": + msg.Name = msgName + msg.ChannelPattern = reply[1].(string) + msg.Channel = reply[2].(string) + msg.Message = reply[3].(string) default: msg.Err = fmt.Errorf("Unsupported message name: %q.", msgName) } @@ -75,8 +81,8 @@ func (c *PubSubClient) consumeMessages() { } } -func (c *PubSubClient) Subscribe(channels ...string) (chan *Message, error) { - args := append([]string{"SUBSCRIBE"}, channels...) +func (c *PubSubClient) subscribe(cmd string, channels ...string) (chan *Message, error) { + args := append([]string{cmd}, channels...) req := NewMultiBulkReq(args...) conn, err := c.conn() @@ -95,8 +101,16 @@ func (c *PubSubClient) Subscribe(channels ...string) (chan *Message, error) { return c.ch, nil } -func (c *PubSubClient) Unsubscribe(channels ...string) error { - args := append([]string{"UNSUBSCRIBE"}, channels...) +func (c *PubSubClient) Subscribe(channels ...string) (chan *Message, error) { + return c.subscribe("SUBSCRIBE", channels...) +} + +func (c *PubSubClient) PSubscribe(patterns ...string) (chan *Message, error) { + return c.subscribe("PSUBSCRIBE", patterns...) +} + +func (c *PubSubClient) unsubscribe(cmd string, channels ...string) error { + args := append([]string{cmd}, channels...) req := NewMultiBulkReq(args...) conn, err := c.conn() @@ -106,3 +120,11 @@ func (c *PubSubClient) Unsubscribe(channels ...string) error { return c.WriteReq(req.Req(), conn) } + +func (c *PubSubClient) Unsubscribe(channels ...string) error { + return c.unsubscribe("UNSUBSCRIBE", channels...) +} + +func (c *PubSubClient) PUnsubscribe(patterns ...string) error { + return c.unsubscribe("PUNSUBSCRIBE", patterns...) +} diff --git a/redis_test.go b/redis_test.go index 26985b0a..90abd961 100644 --- a/redis_test.go +++ b/redis_test.go @@ -13,7 +13,7 @@ import ( "github.com/vmihailenco/redis" ) -const redisAddr = ":6379" +const redisAddr = ":8888" //------------------------------------------------------------------------------ @@ -1608,17 +1608,64 @@ func (t *RedisTest) TestZUnionStore(c *C) { //------------------------------------------------------------------------------ +func (t *RedisTest) TestPatternPubSub(c *C) { + pubsub, err := t.client.PubSubClient() + c.Check(err, IsNil) + + ch, err := pubsub.PSubscribe("mychannel*") + c.Check(err, IsNil) + c.Check(ch, Not(IsNil)) + + pub := t.client.Publish("mychannel1", "hello") + c.Check(pub.Err(), IsNil) + c.Check(pub.Val(), Equals, int64(1)) + + err = pubsub.PUnsubscribe("mychannel*") + c.Check(err, IsNil) + + select { + case msg := <-ch: + c.Check(msg.Err, Equals, nil) + c.Check(msg.Name, Equals, "psubscribe") + c.Check(msg.Channel, Equals, "mychannel*") + c.Check(msg.Number, Equals, int64(1)) + case <-time.After(time.Second): + c.Error("Channel is empty.") + } + + select { + case msg := <-ch: + c.Check(msg.Err, Equals, nil) + c.Check(msg.Name, Equals, "pmessage") + c.Check(msg.ChannelPattern, Equals, "mychannel*") + c.Check(msg.Channel, Equals, "mychannel1") + c.Check(msg.Message, Equals, "hello") + case <-time.After(time.Second): + c.Error("Channel is empty.") + } + + select { + case msg := <-ch: + c.Check(msg.Err, Equals, nil) + c.Check(msg.Name, Equals, "punsubscribe") + c.Check(msg.Channel, Equals, "mychannel*") + c.Check(msg.Number, Equals, int64(0)) + case <-time.After(time.Second): + c.Error("Channel is empty.") + } +} + func (t *RedisTest) TestPubSub(c *C) { pubsub, err := t.client.PubSubClient() c.Check(err, IsNil) ch, err := pubsub.Subscribe("mychannel") c.Check(err, IsNil) - c.Check(ch, Not(Equals), nil) + c.Check(ch, Not(IsNil)) - ch, err = pubsub.Subscribe("mychannel2") + ch2, err := pubsub.Subscribe("mychannel2") c.Check(err, IsNil) - c.Check(ch, Not(Equals), nil) + c.Check(ch2, Equals, ch) pub := t.client.Publish("mychannel", "hello") c.Check(pub.Err(), IsNil)