Merge pull request #132 from go-redis/feature/pubsub-ping

pubsub: add PING support.
This commit is contained in:
Vladimir Mihailenco 2015-07-11 13:18:58 +03:00
commit 2cb6b30541
3 changed files with 91 additions and 40 deletions

View File

@ -2,7 +2,6 @@ package redis_test
import ( import (
"fmt" "fmt"
"net"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@ -179,14 +178,14 @@ func ExamplePubSub() {
panic(err) panic(err)
} }
for { for i := 0; i < 4; i++ {
msgi, err := pubsub.ReceiveTimeout(100 * time.Millisecond) msgi, err := pubsub.ReceiveTimeout(100 * time.Millisecond)
if err != nil { if err != nil {
if neterr, ok := err.(net.Error); ok && neterr.Timeout() { err := pubsub.Ping("")
// There are no more messages to process. Stop. if err != nil {
break panic(err)
} }
panic(err) continue
} }
switch msg := msgi.(type) { switch msg := msgi.(type) {
@ -194,6 +193,8 @@ func ExamplePubSub() {
fmt.Println(msg.Kind, msg.Channel) fmt.Println(msg.Kind, msg.Channel)
case *redis.Message: case *redis.Message:
fmt.Println(msg.Channel, msg.Payload) fmt.Println(msg.Channel, msg.Payload)
case *redis.Pong:
fmt.Println(msg)
default: default:
panic(fmt.Sprintf("unknown message: %#v", msgi)) panic(fmt.Sprintf("unknown message: %#v", msgi))
} }
@ -201,6 +202,7 @@ func ExamplePubSub() {
// Output: subscribe mychannel // Output: subscribe mychannel
// mychannel hello // mychannel hello
// Pong
} }
func ExampleScript() { func ExampleScript() {

View File

@ -26,6 +26,20 @@ func (c *Client) Publish(channel, message string) *IntCmd {
return req return req
} }
func (c *PubSub) Ping(payload string) error {
cn, err := c.conn()
if err != nil {
return err
}
args := []interface{}{"PING"}
if payload != "" {
args = append(args, payload)
}
cmd := NewCmd(args...)
return cn.writeCmds(cmd)
}
// Message received as result of a PUBLISH command issued by another client. // Message received as result of a PUBLISH command issued by another client.
type Message struct { type Message struct {
Channel string Channel string
@ -48,6 +62,18 @@ func (m *PMessage) String() string {
return fmt.Sprintf("PMessage<%s: %s>", m.Channel, m.Payload) return fmt.Sprintf("PMessage<%s: %s>", m.Channel, m.Payload)
} }
// Pong received as result of a PING command issued by another client.
type Pong struct {
Payload string
}
func (p *Pong) String() string {
if p.Payload != "" {
return fmt.Sprintf("Pong<%s>", p.Payload)
}
return "Pong"
}
// Message received after a successful subscription to channel. // Message received after a successful subscription to channel.
type Subscription struct { type Subscription struct {
// Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe". // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe".
@ -66,22 +92,8 @@ func (c *PubSub) Receive() (interface{}, error) {
return c.ReceiveTimeout(0) return c.ReceiveTimeout(0)
} }
func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { func newMessage(reply []interface{}) (interface{}, error) {
cn, err := c.conn() switch kind := reply[0].(string); kind {
if err != nil {
return nil, err
}
cn.ReadTimeout = timeout
cmd := NewSliceCmd()
if err := cmd.parseReply(cn.rd); err != nil {
return nil, err
}
reply := cmd.Val()
kind := reply[0].(string)
switch kind {
case "subscribe", "unsubscribe", "psubscribe", "punsubscribe": case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
return &Subscription{ return &Subscription{
Kind: kind, Kind: kind,
@ -99,9 +111,27 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
Channel: reply[2].(string), Channel: reply[2].(string),
Payload: reply[3].(string), Payload: reply[3].(string),
}, nil }, nil
case "pong":
return &Pong{
Payload: reply[1].(string),
}, nil
default:
return nil, fmt.Errorf("redis: unsupported pubsub notification: %q", kind)
} }
}
return nil, fmt.Errorf("redis: unsupported pubsub notification: %q", kind) func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
cn, err := c.conn()
if err != nil {
return nil, err
}
cn.ReadTimeout = timeout
cmd := NewSliceCmd()
if err := cmd.parseReply(cn.rd); err != nil {
return nil, err
}
return newMessage(cmd.Val())
} }
func (c *PubSub) subscribe(cmd string, channels ...string) error { func (c *PubSub) subscribe(cmd string, channels ...string) error {

View File

@ -12,24 +12,22 @@ import (
var _ = Describe("PubSub", func() { var _ = Describe("PubSub", func() {
var client *redis.Client var client *redis.Client
var pubsub *redis.PubSub
BeforeEach(func() { BeforeEach(func() {
client = redis.NewClient(&redis.Options{ client = redis.NewClient(&redis.Options{
Addr: redisAddr, Addr: redisAddr,
}) })
Expect(client.FlushDb().Err()).NotTo(HaveOccurred())
pubsub = client.PubSub()
}) })
AfterEach(func() { AfterEach(func() {
Expect(client.FlushDb().Err()).NotTo(HaveOccurred()) Expect(pubsub.Close()).NotTo(HaveOccurred())
Expect(client.Close()).NotTo(HaveOccurred()) Expect(client.Close()).NotTo(HaveOccurred())
}) })
It("should support pattern matching", func() { It("should support pattern matching", func() {
pubsub := client.PubSub()
defer func() {
Expect(pubsub.Close()).NotTo(HaveOccurred())
}()
Expect(pubsub.PSubscribe("mychannel*")).NotTo(HaveOccurred()) Expect(pubsub.PSubscribe("mychannel*")).NotTo(HaveOccurred())
pub := client.Publish("mychannel1", "hello") pub := client.Publish("mychannel1", "hello")
@ -77,8 +75,6 @@ var _ = Describe("PubSub", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(channels).To(BeEmpty()) Expect(channels).To(BeEmpty())
pubsub := client.PubSub()
defer pubsub.Close()
Expect(pubsub.Subscribe("mychannel", "mychannel2")).NotTo(HaveOccurred()) Expect(pubsub.Subscribe("mychannel", "mychannel2")).NotTo(HaveOccurred())
channels, err = client.PubSubChannels("mychannel*").Result() channels, err = client.PubSubChannels("mychannel*").Result()
@ -95,8 +91,6 @@ var _ = Describe("PubSub", func() {
}) })
It("should return the numbers of subscribers", func() { It("should return the numbers of subscribers", func() {
pubsub := client.PubSub()
defer pubsub.Close()
Expect(pubsub.Subscribe("mychannel", "mychannel2")).NotTo(HaveOccurred()) Expect(pubsub.Subscribe("mychannel", "mychannel2")).NotTo(HaveOccurred())
channels, err := client.PubSubNumSub("mychannel", "mychannel2", "mychannel3").Result() channels, err := client.PubSubNumSub("mychannel", "mychannel2", "mychannel3").Result()
@ -113,8 +107,6 @@ var _ = Describe("PubSub", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(num).To(Equal(int64(0))) Expect(num).To(Equal(int64(0)))
pubsub := client.PubSub()
defer pubsub.Close()
Expect(pubsub.PSubscribe("*")).NotTo(HaveOccurred()) Expect(pubsub.PSubscribe("*")).NotTo(HaveOccurred())
num, err = client.PubSubNumPat().Result() num, err = client.PubSubNumPat().Result()
@ -123,11 +115,6 @@ var _ = Describe("PubSub", func() {
}) })
It("should pub/sub", func() { It("should pub/sub", func() {
pubsub := client.PubSub()
defer func() {
Expect(pubsub.Close()).NotTo(HaveOccurred())
}()
Expect(pubsub.Subscribe("mychannel", "mychannel2")).NotTo(HaveOccurred()) Expect(pubsub.Subscribe("mychannel", "mychannel2")).NotTo(HaveOccurred())
pub := client.Publish("mychannel", "hello") pub := client.Publish("mychannel", "hello")
@ -199,4 +186,36 @@ var _ = Describe("PubSub", func() {
} }
}) })
It("should ping/pong", func() {
err := pubsub.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
_, err = pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
err = pubsub.Ping("")
Expect(err).NotTo(HaveOccurred())
msgi, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
pong := msgi.(*redis.Pong)
Expect(pong.Payload).To(Equal(""))
})
It("should ping/pong with payload", func() {
err := pubsub.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
_, err = pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
err = pubsub.Ping("hello")
Expect(err).NotTo(HaveOccurred())
msgi, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
pong := msgi.(*redis.Pong)
Expect(pong.Payload).To(Equal("hello"))
})
}) })