Merge pull request #544 from go-redis/fix/simplify-pubsub-api

Simplify PubSub API
This commit is contained in:
Vladimir Mihailenco 2017-04-11 16:59:38 +03:00 committed by GitHub
commit 313a8d7fab
7 changed files with 41 additions and 78 deletions

View File

@ -258,13 +258,10 @@ func ExampleClient_Watch() {
}
func ExamplePubSub() {
pubsub, err := client.Subscribe("mychannel1")
if err != nil {
panic(err)
}
pubsub := client.Subscribe("mychannel1")
defer pubsub.Close()
err = client.Publish("mychannel1", "hello").Err()
err := client.Publish("mychannel1", "hello").Err()
if err != nil {
panic(err)
}
@ -279,10 +276,7 @@ func ExamplePubSub() {
}
func ExamplePubSub_Receive() {
pubsub, err := client.Subscribe("mychannel2")
if err != nil {
panic(err)
}
pubsub := client.Subscribe("mychannel2")
defer pubsub.Close()
n, err := client.Publish("mychannel2", "hello").Result()

View File

@ -81,8 +81,7 @@ var _ = Describe("pool", func() {
connPool := client.Pool()
perform(1000, func(id int) {
pubsub, err := client.Subscribe()
Expect(err).NotTo(HaveOccurred())
pubsub := client.Subscribe("test")
Expect(pubsub.Close()).NotTo(HaveOccurred())
})

View File

@ -57,18 +57,14 @@ func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
// Subscribes the client to the specified channels.
func (c *PubSub) Subscribe(channels ...string) error {
err := c.subscribe("SUBSCRIBE", channels...)
if err == nil {
c.channels = appendIfNotExists(c.channels, channels...)
}
c.channels = appendIfNotExists(c.channels, channels...)
return err
}
// Subscribes the client to the given patterns.
func (c *PubSub) PSubscribe(patterns ...string) error {
err := c.subscribe("PSUBSCRIBE", patterns...)
if err == nil {
c.patterns = appendIfNotExists(c.patterns, patterns...)
}
c.patterns = appendIfNotExists(c.patterns, patterns...)
return err
}
@ -76,9 +72,7 @@ func (c *PubSub) PSubscribe(patterns ...string) error {
// them if none is given.
func (c *PubSub) Unsubscribe(channels ...string) error {
err := c.subscribe("UNSUBSCRIBE", channels...)
if err == nil {
c.channels = remove(c.channels, channels...)
}
c.channels = remove(c.channels, channels...)
return err
}
@ -86,9 +80,7 @@ func (c *PubSub) Unsubscribe(channels ...string) error {
// them if none is given.
func (c *PubSub) PUnsubscribe(patterns ...string) error {
err := c.subscribe("PUNSUBSCRIBE", patterns...)
if err == nil {
c.patterns = remove(c.patterns, patterns...)
}
c.patterns = remove(c.patterns, patterns...)
return err
}

View File

@ -25,8 +25,7 @@ var _ = Describe("PubSub", func() {
})
It("should support pattern matching", func() {
pubsub, err := client.PSubscribe("mychannel*")
Expect(err).NotTo(HaveOccurred())
pubsub := client.PSubscribe("mychannel*")
defer pubsub.Close()
{
@ -77,8 +76,7 @@ var _ = Describe("PubSub", func() {
Expect(err).NotTo(HaveOccurred())
Expect(channels).To(BeEmpty())
pubsub, err := client.Subscribe("mychannel", "mychannel2")
Expect(err).NotTo(HaveOccurred())
pubsub := client.Subscribe("mychannel", "mychannel2")
defer pubsub.Close()
channels, err = client.PubSubChannels("mychannel*").Result()
@ -95,8 +93,7 @@ var _ = Describe("PubSub", func() {
})
It("should return the numbers of subscribers", func() {
pubsub, err := client.Subscribe("mychannel", "mychannel2")
Expect(err).NotTo(HaveOccurred())
pubsub := client.Subscribe("mychannel", "mychannel2")
defer pubsub.Close()
channels, err := client.PubSubNumSub("mychannel", "mychannel2", "mychannel3").Result()
@ -113,8 +110,7 @@ var _ = Describe("PubSub", func() {
Expect(err).NotTo(HaveOccurred())
Expect(num).To(Equal(int64(0)))
pubsub, err := client.PSubscribe("*")
Expect(err).NotTo(HaveOccurred())
pubsub := client.PSubscribe("*")
defer pubsub.Close()
num, err = client.PubSubNumPat().Result()
@ -123,8 +119,7 @@ var _ = Describe("PubSub", func() {
})
It("should pub/sub", func() {
pubsub, err := client.Subscribe("mychannel", "mychannel2")
Expect(err).NotTo(HaveOccurred())
pubsub := client.Subscribe("mychannel", "mychannel2")
defer pubsub.Close()
{
@ -200,11 +195,10 @@ var _ = Describe("PubSub", func() {
})
It("should ping/pong", func() {
pubsub, err := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
pubsub := client.Subscribe("mychannel")
defer pubsub.Close()
_, err = pubsub.ReceiveTimeout(time.Second)
_, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
err = pubsub.Ping("")
@ -217,11 +211,10 @@ var _ = Describe("PubSub", func() {
})
It("should ping/pong with payload", func() {
pubsub, err := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
pubsub := client.Subscribe("mychannel")
defer pubsub.Close()
_, err = pubsub.ReceiveTimeout(time.Second)
_, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred())
err = pubsub.Ping("hello")
@ -234,11 +227,10 @@ var _ = Describe("PubSub", func() {
})
It("should multi-ReceiveMessage", func() {
pubsub, err := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
pubsub := client.Subscribe("mychannel")
defer pubsub.Close()
err = client.Publish("mychannel", "hello").Err()
err := client.Publish("mychannel", "hello").Err()
Expect(err).NotTo(HaveOccurred())
err = client.Publish("mychannel", "world").Err()
@ -258,8 +250,7 @@ var _ = Describe("PubSub", func() {
It("should ReceiveMessage after timeout", func() {
timeout := 100 * time.Millisecond
pubsub, err := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
pubsub := client.Subscribe("mychannel")
defer pubsub.Close()
done := make(chan bool, 1)
@ -321,24 +312,21 @@ var _ = Describe("PubSub", func() {
}
It("Subscribe should reconnect on ReceiveMessage error", func() {
pubsub, err := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
pubsub := client.Subscribe("mychannel")
defer pubsub.Close()
expectReceiveMessageOnError(pubsub)
})
It("PSubscribe should reconnect on ReceiveMessage error", func() {
pubsub, err := client.PSubscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
pubsub := client.PSubscribe("mychannel")
defer pubsub.Close()
expectReceiveMessageOnError(pubsub)
})
It("should return on Close", func() {
pubsub, err := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
pubsub := client.Subscribe("mychannel")
defer pubsub.Close()
var wg sync.WaitGroup
@ -360,8 +348,7 @@ var _ = Describe("PubSub", func() {
wg.Wait()
wg.Add(1)
err = pubsub.Close()
Expect(err).NotTo(HaveOccurred())
Expect(pubsub.Close()).NotTo(HaveOccurred())
wg.Wait()
})
@ -369,18 +356,17 @@ var _ = Describe("PubSub", func() {
It("should ReceiveMessage without a subscription", func() {
timeout := 100 * time.Millisecond
pubsub, err := client.Subscribe()
Expect(err).NotTo(HaveOccurred())
pubsub := client.Subscribe()
defer pubsub.Close()
go func() {
defer GinkgoRecover()
time.Sleep(2 * timeout)
err = pubsub.Subscribe("mychannel")
err := pubsub.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
err := client.Publish("mychannel", "hello").Err()
err = client.Publish("mychannel", "hello").Err()
Expect(err).NotTo(HaveOccurred())
}()

View File

@ -141,8 +141,7 @@ var _ = Describe("races", func() {
perform(C, func(id int) {
for i := 0; i < N; i++ {
pubsub, err := client.Subscribe(fmt.Sprintf("mychannel%d", id))
Expect(err).NotTo(HaveOccurred())
pubsub := client.Subscribe(fmt.Sprintf("mychannel%d", id))
go func() {
defer GinkgoRecover()
@ -152,7 +151,7 @@ var _ = Describe("races", func() {
Expect(err).NotTo(HaveOccurred())
}()
_, err = pubsub.ReceiveMessage()
_, err := pubsub.ReceiveMessage()
Expect(err.Error()).To(ContainSubstring("closed"))
val := "echo" + strconv.Itoa(i)

View File

@ -359,25 +359,19 @@ func (c *Client) pubSub() *PubSub {
}
// Subscribe subscribes the client to the specified channels.
func (c *Client) Subscribe(channels ...string) (*PubSub, error) {
func (c *Client) Subscribe(channels ...string) *PubSub {
pubsub := c.pubSub()
if len(channels) > 0 {
if err := pubsub.Subscribe(channels...); err != nil {
pubsub.Close()
return nil, err
}
_ = pubsub.Subscribe(channels...)
}
return pubsub, nil
return pubsub
}
// PSubscribe subscribes the client to the given patterns.
func (c *Client) PSubscribe(channels ...string) (*PubSub, error) {
func (c *Client) PSubscribe(channels ...string) *PubSub {
pubsub := c.pubSub()
if len(channels) > 0 {
if err := pubsub.PSubscribe(channels...); err != nil {
pubsub.Close()
return nil, err
}
_ = pubsub.PSubscribe(channels...)
}
return pubsub, nil
return pubsub
}

View File

@ -58,11 +58,10 @@ var _ = Describe("Client", func() {
})
It("should close pubsub without closing the client", func() {
pubsub, err := client.Subscribe()
Expect(err).NotTo(HaveOccurred())
pubsub := client.Subscribe()
Expect(pubsub.Close()).NotTo(HaveOccurred())
_, err = pubsub.Receive()
_, err := pubsub.Receive()
Expect(err).To(MatchError("redis: client is closed"))
Expect(client.Ping().Err()).NotTo(HaveOccurred())
})
@ -92,11 +91,10 @@ var _ = Describe("Client", func() {
})
It("should close pubsub when client is closed", func() {
pubsub, err := client.Subscribe()
Expect(err).NotTo(HaveOccurred())
pubsub := client.Subscribe()
Expect(client.Close()).NotTo(HaveOccurred())
_, err = pubsub.Receive()
_, err := pubsub.Receive()
Expect(err).To(HaveOccurred())
Expect(pubsub.Close()).NotTo(HaveOccurred())
@ -242,7 +240,8 @@ var _ = Describe("Client timeout", func() {
})
It("Subscribe timeouts", func() {
_, err := client.Subscribe("_")
pubsub := client.Subscribe()
err := pubsub.Subscribe("_")
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})