Merge pull request #593 from go-redis/fix/pubsub-example

Fix PubSub example. Fixes #575
This commit is contained in:
Vladimir Mihailenco 2017-07-01 13:54:10 +03:00 committed by GitHub
commit 564772f045
2 changed files with 54 additions and 13 deletions

View File

@ -278,7 +278,14 @@ func ExamplePubSub() {
pubsub := client.Subscribe("mychannel1") pubsub := client.Subscribe("mychannel1")
defer pubsub.Close() defer pubsub.Close()
err := client.Publish("mychannel1", "hello").Err() // Wait for subscription to be created before publishing message.
subscr, err := pubsub.ReceiveTimeout(time.Second)
if err != nil {
panic(err)
}
fmt.Println(subscr)
err = client.Publish("mychannel1", "hello").Err()
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -289,22 +296,17 @@ func ExamplePubSub() {
} }
fmt.Println(msg.Channel, msg.Payload) fmt.Println(msg.Channel, msg.Payload)
// Output: mychannel1 hello // Output: subscribe: mychannel1
// mychannel1 hello
} }
func ExamplePubSub_Receive() { func ExamplePubSub_Receive() {
pubsub := client.Subscribe("mychannel2") pubsub := client.Subscribe("mychannel2")
defer pubsub.Close() defer pubsub.Close()
n, err := client.Publish("mychannel2", "hello").Result()
if err != nil {
panic(err)
}
fmt.Println(n, "clients received message")
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
// ReceiveTimeout is a low level API. Use ReceiveMessage instead. // ReceiveTimeout is a low level API. Use ReceiveMessage instead.
msgi, err := pubsub.ReceiveTimeout(5 * time.Second) msgi, err := pubsub.ReceiveTimeout(time.Second)
if err != nil { if err != nil {
break break
} }
@ -312,15 +314,19 @@ func ExamplePubSub_Receive() {
switch msg := msgi.(type) { switch msg := msgi.(type) {
case *redis.Subscription: case *redis.Subscription:
fmt.Println("subscribed to", msg.Channel) fmt.Println("subscribed to", msg.Channel)
_, err := client.Publish("mychannel2", "hello").Result()
if err != nil {
panic(err)
}
case *redis.Message: case *redis.Message:
fmt.Println("received", msg.Payload, "from", msg.Channel) fmt.Println("received", msg.Payload, "from", msg.Channel)
default: default:
panic(fmt.Errorf("unknown message: %#v", msgi)) panic("unreached")
} }
} }
// Output: 1 clients received message // sent message to 1 client
// subscribed to mychannel2
// received hello from mychannel2 // received hello from mychannel2
} }

View File

@ -230,7 +230,15 @@ var _ = Describe("PubSub", func() {
pubsub := client.Subscribe("mychannel") pubsub := client.Subscribe("mychannel")
defer pubsub.Close() defer pubsub.Close()
err := client.Publish("mychannel", "hello").Err() subscr, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
Expect(subscr).To(Equal(&redis.Subscription{
Kind: "subscribe",
Channel: "mychannel",
Count: 1,
}))
err = client.Publish("mychannel", "hello").Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = client.Publish("mychannel", "world").Err() err = client.Publish("mychannel", "world").Err()
@ -253,6 +261,14 @@ var _ = Describe("PubSub", func() {
pubsub := client.Subscribe("mychannel") pubsub := client.Subscribe("mychannel")
defer pubsub.Close() defer pubsub.Close()
subscr, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
Expect(subscr).To(Equal(&redis.Subscription{
Kind: "subscribe",
Channel: "mychannel",
Count: 1,
}))
done := make(chan bool, 1) done := make(chan bool, 1)
go func() { go func() {
defer GinkgoRecover() defer GinkgoRecover()
@ -308,6 +324,14 @@ var _ = Describe("PubSub", func() {
pubsub := client.Subscribe("mychannel") pubsub := client.Subscribe("mychannel")
defer pubsub.Close() defer pubsub.Close()
subscr, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
Expect(subscr).To(Equal(&redis.Subscription{
Kind: "subscribe",
Channel: "mychannel",
Count: 1,
}))
expectReceiveMessageOnError(pubsub) expectReceiveMessageOnError(pubsub)
}) })
@ -315,6 +339,14 @@ var _ = Describe("PubSub", func() {
pubsub := client.PSubscribe("mychannel") pubsub := client.PSubscribe("mychannel")
defer pubsub.Close() defer pubsub.Close()
subscr, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
Expect(subscr).To(Equal(&redis.Subscription{
Kind: "psubscribe",
Channel: "mychannel",
Count: 1,
}))
expectReceiveMessageOnError(pubsub) expectReceiveMessageOnError(pubsub)
}) })
@ -356,9 +388,12 @@ var _ = Describe("PubSub", func() {
defer GinkgoRecover() defer GinkgoRecover()
time.Sleep(2 * timeout) time.Sleep(2 * timeout)
err := pubsub.Subscribe("mychannel") err := pubsub.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
time.Sleep(timeout)
err = client.Publish("mychannel", "hello").Err() err = client.Publish("mychannel", "hello").Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
}() }()