diff --git a/example_test.go b/example_test.go index 94090416..319ea0ca 100644 --- a/example_test.go +++ b/example_test.go @@ -278,7 +278,14 @@ func ExamplePubSub() { pubsub := client.Subscribe("mychannel1") defer pubsub.Close() - err := client.Publish("mychannel1", "hello").Err() + // Wait for subscription to be created before publishing message. + subscr, err := pubsub.ReceiveTimeout(time.Second) + if err != nil { + panic(err) + } + fmt.Println(subscr) + + err = client.Publish("mychannel1", "hello").Err() if err != nil { panic(err) } @@ -289,22 +296,17 @@ func ExamplePubSub() { } fmt.Println(msg.Channel, msg.Payload) - // Output: mychannel1 hello + // Output: subscribe: mychannel1 + // mychannel1 hello } func ExamplePubSub_Receive() { pubsub := client.Subscribe("mychannel2") defer pubsub.Close() - n, err := client.Publish("mychannel2", "hello").Result() - if err != nil { - panic(err) - } - fmt.Println(n, "clients received message") - for i := 0; i < 2; i++ { // ReceiveTimeout is a low level API. Use ReceiveMessage instead. - msgi, err := pubsub.ReceiveTimeout(5 * time.Second) + msgi, err := pubsub.ReceiveTimeout(time.Second) if err != nil { break } @@ -312,15 +314,19 @@ func ExamplePubSub_Receive() { switch msg := msgi.(type) { case *redis.Subscription: fmt.Println("subscribed to", msg.Channel) + + _, err := client.Publish("mychannel2", "hello").Result() + if err != nil { + panic(err) + } case *redis.Message: fmt.Println("received", msg.Payload, "from", msg.Channel) default: - panic(fmt.Errorf("unknown message: %#v", msgi)) + panic("unreached") } } - // Output: 1 clients received message - // subscribed to mychannel2 + // sent message to 1 client // received hello from mychannel2 } diff --git a/pubsub_test.go b/pubsub_test.go index d03270c3..e8589f46 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -230,7 +230,15 @@ var _ = Describe("PubSub", func() { pubsub := client.Subscribe("mychannel") defer pubsub.Close() - err := client.Publish("mychannel", "hello").Err() + subscr, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + Expect(subscr).To(Equal(&redis.Subscription{ + Kind: "subscribe", + Channel: "mychannel", + Count: 1, + })) + + err = client.Publish("mychannel", "hello").Err() Expect(err).NotTo(HaveOccurred()) err = client.Publish("mychannel", "world").Err() @@ -253,6 +261,14 @@ var _ = Describe("PubSub", func() { pubsub := client.Subscribe("mychannel") defer pubsub.Close() + subscr, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + Expect(subscr).To(Equal(&redis.Subscription{ + Kind: "subscribe", + Channel: "mychannel", + Count: 1, + })) + done := make(chan bool, 1) go func() { defer GinkgoRecover() @@ -308,6 +324,14 @@ var _ = Describe("PubSub", func() { pubsub := client.Subscribe("mychannel") defer pubsub.Close() + subscr, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + Expect(subscr).To(Equal(&redis.Subscription{ + Kind: "subscribe", + Channel: "mychannel", + Count: 1, + })) + expectReceiveMessageOnError(pubsub) }) @@ -315,6 +339,14 @@ var _ = Describe("PubSub", func() { pubsub := client.PSubscribe("mychannel") defer pubsub.Close() + subscr, err := pubsub.ReceiveTimeout(time.Second) + Expect(err).NotTo(HaveOccurred()) + Expect(subscr).To(Equal(&redis.Subscription{ + Kind: "psubscribe", + Channel: "mychannel", + Count: 1, + })) + expectReceiveMessageOnError(pubsub) }) @@ -356,9 +388,12 @@ var _ = Describe("PubSub", func() { defer GinkgoRecover() time.Sleep(2 * timeout) + err := pubsub.Subscribe("mychannel") Expect(err).NotTo(HaveOccurred()) + time.Sleep(timeout) + err = client.Publish("mychannel", "hello").Err() Expect(err).NotTo(HaveOccurred()) }()