diff --git a/commands_test.go b/commands_test.go index 7aca589..0d19b2e 100644 --- a/commands_test.go +++ b/commands_test.go @@ -83,7 +83,7 @@ var _ = Describe("Commands", func() { Consistently(func() error { return client.Ping().Err() - }, "900ms").Should(HaveOccurred()) + }, "400ms").Should(HaveOccurred()) // pause time - read timeout Eventually(func() error { return client.Ping().Err() diff --git a/example_test.go b/example_test.go index 859b645..fb63ecb 100644 --- a/example_test.go +++ b/example_test.go @@ -2,7 +2,9 @@ package redis_test import ( "fmt" + "net" "strconv" + "time" "gopkg.in/redis.v3" ) @@ -125,19 +127,37 @@ func ExamplePubSub() { defer pubsub.Close() err := pubsub.Subscribe("mychannel") - _ = err + if err != nil { + panic(err) + } - msg, err := pubsub.Receive() - fmt.Println(msg, err) + err = client.Publish("mychannel", "hello").Err() + if err != nil { + panic(err) + } - pub := client.Publish("mychannel", "hello") - _ = pub.Err() + for { + msgi, err := pubsub.ReceiveTimeout(100 * time.Millisecond) + if err != nil { + if neterr, ok := err.(net.Error); ok && neterr.Timeout() { + // There are no more messages to process. Stop. + break + } + panic(err) + } - msg, err = pubsub.Receive() - fmt.Println(msg, err) + switch msg := msgi.(type) { + case *redis.Subscription: + fmt.Println(msg.Kind, msg.Channel) + case *redis.Message: + fmt.Println(msg.Channel, msg.Payload) + default: + panic(fmt.Sprintf("unknown message: %#v", msgi)) + } + } - // Output: subscribe: mychannel - // Message + // Output: subscribe mychannel + // mychannel hello } func ExampleScript() { diff --git a/pubsub.go b/pubsub.go index 5a32220..9d63e6b 100644 --- a/pubsub.go +++ b/pubsub.go @@ -5,7 +5,8 @@ import ( "time" ) -// Not thread-safe. +// PubSub implements Pub/Sub commands as described in +// http://redis.io/topics/pubsub. type PubSub struct { *baseClient }