From 8d52a95269db9577017b2827d8ca238388ed202f Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Tue, 11 Apr 2017 16:53:55 +0300 Subject: [PATCH] Simplify PubSub API --- example_test.go | 12 +++--------- pool_test.go | 3 +-- pubsub.go | 16 ++++----------- pubsub_test.go | 52 ++++++++++++++++++------------------------------- race_test.go | 5 ++--- redis.go | 18 ++++++----------- redis_test.go | 13 ++++++------- 7 files changed, 41 insertions(+), 78 deletions(-) diff --git a/example_test.go b/example_test.go index 7899ab0..98ee2dd 100644 --- a/example_test.go +++ b/example_test.go @@ -258,13 +258,10 @@ func ExampleClient_Watch() { } func ExamplePubSub() { - pubsub, err := client.Subscribe("mychannel1") - if err != nil { - panic(err) - } + pubsub := client.Subscribe("mychannel1") defer pubsub.Close() - err = client.Publish("mychannel1", "hello").Err() + err := client.Publish("mychannel1", "hello").Err() if err != nil { panic(err) } @@ -279,10 +276,7 @@ func ExamplePubSub() { } func ExamplePubSub_Receive() { - pubsub, err := client.Subscribe("mychannel2") - if err != nil { - panic(err) - } + pubsub := client.Subscribe("mychannel2") defer pubsub.Close() n, err := client.Publish("mychannel2", "hello").Result() diff --git a/pool_test.go b/pool_test.go index 3cd5585..c6731e4 100644 --- a/pool_test.go +++ b/pool_test.go @@ -81,8 +81,7 @@ var _ = Describe("pool", func() { connPool := client.Pool() perform(1000, func(id int) { - pubsub, err := client.Subscribe() - Expect(err).NotTo(HaveOccurred()) + pubsub := client.Subscribe("test") Expect(pubsub.Close()).NotTo(HaveOccurred()) }) diff --git a/pubsub.go b/pubsub.go index e681949..8705a13 100644 --- a/pubsub.go +++ b/pubsub.go @@ -57,18 +57,14 @@ func (c *PubSub) subscribe(redisCmd string, channels ...string) error { // Subscribes the client to the specified channels. func (c *PubSub) Subscribe(channels ...string) error { err := c.subscribe("SUBSCRIBE", channels...) - if err == nil { - c.channels = appendIfNotExists(c.channels, channels...) - } + c.channels = appendIfNotExists(c.channels, channels...) return err } // Subscribes the client to the given patterns. func (c *PubSub) PSubscribe(patterns ...string) error { err := c.subscribe("PSUBSCRIBE", patterns...) - if err == nil { - c.patterns = appendIfNotExists(c.patterns, patterns...) - } + c.patterns = appendIfNotExists(c.patterns, patterns...) return err } @@ -76,9 +72,7 @@ func (c *PubSub) PSubscribe(patterns ...string) error { // them if none is given. func (c *PubSub) Unsubscribe(channels ...string) error { err := c.subscribe("UNSUBSCRIBE", channels...) - if err == nil { - c.channels = remove(c.channels, channels...) - } + c.channels = remove(c.channels, channels...) return err } @@ -86,9 +80,7 @@ func (c *PubSub) Unsubscribe(channels ...string) error { // them if none is given. func (c *PubSub) PUnsubscribe(patterns ...string) error { err := c.subscribe("PUNSUBSCRIBE", patterns...) - if err == nil { - c.patterns = remove(c.patterns, patterns...) - } + c.patterns = remove(c.patterns, patterns...) return err } diff --git a/pubsub_test.go b/pubsub_test.go index 4c74845..0164805 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -25,8 +25,7 @@ var _ = Describe("PubSub", func() { }) It("should support pattern matching", func() { - pubsub, err := client.PSubscribe("mychannel*") - Expect(err).NotTo(HaveOccurred()) + pubsub := client.PSubscribe("mychannel*") defer pubsub.Close() { @@ -77,8 +76,7 @@ var _ = Describe("PubSub", func() { Expect(err).NotTo(HaveOccurred()) Expect(channels).To(BeEmpty()) - pubsub, err := client.Subscribe("mychannel", "mychannel2") - Expect(err).NotTo(HaveOccurred()) + pubsub := client.Subscribe("mychannel", "mychannel2") defer pubsub.Close() channels, err = client.PubSubChannels("mychannel*").Result() @@ -95,8 +93,7 @@ var _ = Describe("PubSub", func() { }) It("should return the numbers of subscribers", func() { - pubsub, err := client.Subscribe("mychannel", "mychannel2") - Expect(err).NotTo(HaveOccurred()) + pubsub := client.Subscribe("mychannel", "mychannel2") defer pubsub.Close() channels, err := client.PubSubNumSub("mychannel", "mychannel2", "mychannel3").Result() @@ -113,8 +110,7 @@ var _ = Describe("PubSub", func() { Expect(err).NotTo(HaveOccurred()) Expect(num).To(Equal(int64(0))) - pubsub, err := client.PSubscribe("*") - Expect(err).NotTo(HaveOccurred()) + pubsub := client.PSubscribe("*") defer pubsub.Close() num, err = client.PubSubNumPat().Result() @@ -123,8 +119,7 @@ var _ = Describe("PubSub", func() { }) It("should pub/sub", func() { - pubsub, err := client.Subscribe("mychannel", "mychannel2") - Expect(err).NotTo(HaveOccurred()) + pubsub := client.Subscribe("mychannel", "mychannel2") defer pubsub.Close() { @@ -200,11 +195,10 @@ var _ = Describe("PubSub", func() { }) It("should ping/pong", func() { - pubsub, err := client.Subscribe("mychannel") - Expect(err).NotTo(HaveOccurred()) + pubsub := client.Subscribe("mychannel") defer pubsub.Close() - _, err = pubsub.ReceiveTimeout(time.Second) + _, err := pubsub.ReceiveTimeout(time.Second) Expect(err).NotTo(HaveOccurred()) err = pubsub.Ping("") @@ -217,11 +211,10 @@ var _ = Describe("PubSub", func() { }) It("should ping/pong with payload", func() { - pubsub, err := client.Subscribe("mychannel") - Expect(err).NotTo(HaveOccurred()) + pubsub := client.Subscribe("mychannel") defer pubsub.Close() - _, err = pubsub.ReceiveTimeout(time.Second) + _, err := pubsub.ReceiveTimeout(time.Second) Expect(err).NotTo(HaveOccurred()) err = pubsub.Ping("hello") @@ -234,11 +227,10 @@ var _ = Describe("PubSub", func() { }) It("should multi-ReceiveMessage", func() { - pubsub, err := client.Subscribe("mychannel") - Expect(err).NotTo(HaveOccurred()) + pubsub := client.Subscribe("mychannel") defer pubsub.Close() - err = client.Publish("mychannel", "hello").Err() + err := client.Publish("mychannel", "hello").Err() Expect(err).NotTo(HaveOccurred()) err = client.Publish("mychannel", "world").Err() @@ -258,8 +250,7 @@ var _ = Describe("PubSub", func() { It("should ReceiveMessage after timeout", func() { timeout := 100 * time.Millisecond - pubsub, err := client.Subscribe("mychannel") - Expect(err).NotTo(HaveOccurred()) + pubsub := client.Subscribe("mychannel") defer pubsub.Close() done := make(chan bool, 1) @@ -321,24 +312,21 @@ var _ = Describe("PubSub", func() { } It("Subscribe should reconnect on ReceiveMessage error", func() { - pubsub, err := client.Subscribe("mychannel") - Expect(err).NotTo(HaveOccurred()) + pubsub := client.Subscribe("mychannel") defer pubsub.Close() expectReceiveMessageOnError(pubsub) }) It("PSubscribe should reconnect on ReceiveMessage error", func() { - pubsub, err := client.PSubscribe("mychannel") - Expect(err).NotTo(HaveOccurred()) + pubsub := client.PSubscribe("mychannel") defer pubsub.Close() expectReceiveMessageOnError(pubsub) }) It("should return on Close", func() { - pubsub, err := client.Subscribe("mychannel") - Expect(err).NotTo(HaveOccurred()) + pubsub := client.Subscribe("mychannel") defer pubsub.Close() var wg sync.WaitGroup @@ -360,8 +348,7 @@ var _ = Describe("PubSub", func() { wg.Wait() wg.Add(1) - err = pubsub.Close() - Expect(err).NotTo(HaveOccurred()) + Expect(pubsub.Close()).NotTo(HaveOccurred()) wg.Wait() }) @@ -369,18 +356,17 @@ var _ = Describe("PubSub", func() { It("should ReceiveMessage without a subscription", func() { timeout := 100 * time.Millisecond - pubsub, err := client.Subscribe() - Expect(err).NotTo(HaveOccurred()) + pubsub := client.Subscribe() defer pubsub.Close() go func() { defer GinkgoRecover() time.Sleep(2 * timeout) - err = pubsub.Subscribe("mychannel") + err := pubsub.Subscribe("mychannel") Expect(err).NotTo(HaveOccurred()) - err := client.Publish("mychannel", "hello").Err() + err = client.Publish("mychannel", "hello").Err() Expect(err).NotTo(HaveOccurred()) }() diff --git a/race_test.go b/race_test.go index 4529cf4..3af7226 100644 --- a/race_test.go +++ b/race_test.go @@ -141,8 +141,7 @@ var _ = Describe("races", func() { perform(C, func(id int) { for i := 0; i < N; i++ { - pubsub, err := client.Subscribe(fmt.Sprintf("mychannel%d", id)) - Expect(err).NotTo(HaveOccurred()) + pubsub := client.Subscribe(fmt.Sprintf("mychannel%d", id)) go func() { defer GinkgoRecover() @@ -152,7 +151,7 @@ var _ = Describe("races", func() { Expect(err).NotTo(HaveOccurred()) }() - _, err = pubsub.ReceiveMessage() + _, err := pubsub.ReceiveMessage() Expect(err.Error()).To(ContainSubstring("closed")) val := "echo" + strconv.Itoa(i) diff --git a/redis.go b/redis.go index 262e676..b20b34b 100644 --- a/redis.go +++ b/redis.go @@ -359,25 +359,19 @@ func (c *Client) pubSub() *PubSub { } // Subscribe subscribes the client to the specified channels. -func (c *Client) Subscribe(channels ...string) (*PubSub, error) { +func (c *Client) Subscribe(channels ...string) *PubSub { pubsub := c.pubSub() if len(channels) > 0 { - if err := pubsub.Subscribe(channels...); err != nil { - pubsub.Close() - return nil, err - } + _ = pubsub.Subscribe(channels...) } - return pubsub, nil + return pubsub } // PSubscribe subscribes the client to the given patterns. -func (c *Client) PSubscribe(channels ...string) (*PubSub, error) { +func (c *Client) PSubscribe(channels ...string) *PubSub { pubsub := c.pubSub() if len(channels) > 0 { - if err := pubsub.PSubscribe(channels...); err != nil { - pubsub.Close() - return nil, err - } + _ = pubsub.PSubscribe(channels...) } - return pubsub, nil + return pubsub } diff --git a/redis_test.go b/redis_test.go index 64cb2a4..f044722 100644 --- a/redis_test.go +++ b/redis_test.go @@ -58,11 +58,10 @@ var _ = Describe("Client", func() { }) It("should close pubsub without closing the client", func() { - pubsub, err := client.Subscribe() - Expect(err).NotTo(HaveOccurred()) + pubsub := client.Subscribe() Expect(pubsub.Close()).NotTo(HaveOccurred()) - _, err = pubsub.Receive() + _, err := pubsub.Receive() Expect(err).To(MatchError("redis: client is closed")) Expect(client.Ping().Err()).NotTo(HaveOccurred()) }) @@ -92,11 +91,10 @@ var _ = Describe("Client", func() { }) It("should close pubsub when client is closed", func() { - pubsub, err := client.Subscribe() - Expect(err).NotTo(HaveOccurred()) + pubsub := client.Subscribe() Expect(client.Close()).NotTo(HaveOccurred()) - _, err = pubsub.Receive() + _, err := pubsub.Receive() Expect(err).To(HaveOccurred()) Expect(pubsub.Close()).NotTo(HaveOccurred()) @@ -242,7 +240,8 @@ var _ = Describe("Client timeout", func() { }) It("Subscribe timeouts", func() { - _, err := client.Subscribe("_") + pubsub := client.Subscribe() + err := pubsub.Subscribe("_") Expect(err).To(HaveOccurred()) Expect(err.(net.Error).Timeout()).To(BeTrue()) })