Simplify PubSub API

This commit is contained in:
Vladimir Mihailenco 2017-04-11 16:53:55 +03:00
parent 3b1a641e2c
commit 8d52a95269
7 changed files with 41 additions and 78 deletions

View File

@ -258,13 +258,10 @@ func ExampleClient_Watch() {
} }
func ExamplePubSub() { func ExamplePubSub() {
pubsub, err := client.Subscribe("mychannel1") pubsub := client.Subscribe("mychannel1")
if err != nil {
panic(err)
}
defer pubsub.Close() defer pubsub.Close()
err = client.Publish("mychannel1", "hello").Err() err := client.Publish("mychannel1", "hello").Err()
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -279,10 +276,7 @@ func ExamplePubSub() {
} }
func ExamplePubSub_Receive() { func ExamplePubSub_Receive() {
pubsub, err := client.Subscribe("mychannel2") pubsub := client.Subscribe("mychannel2")
if err != nil {
panic(err)
}
defer pubsub.Close() defer pubsub.Close()
n, err := client.Publish("mychannel2", "hello").Result() n, err := client.Publish("mychannel2", "hello").Result()

View File

@ -81,8 +81,7 @@ var _ = Describe("pool", func() {
connPool := client.Pool() connPool := client.Pool()
perform(1000, func(id int) { perform(1000, func(id int) {
pubsub, err := client.Subscribe() pubsub := client.Subscribe("test")
Expect(err).NotTo(HaveOccurred())
Expect(pubsub.Close()).NotTo(HaveOccurred()) Expect(pubsub.Close()).NotTo(HaveOccurred())
}) })

View File

@ -57,18 +57,14 @@ func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
// Subscribes the client to the specified channels. // Subscribes the client to the specified channels.
func (c *PubSub) Subscribe(channels ...string) error { func (c *PubSub) Subscribe(channels ...string) error {
err := c.subscribe("SUBSCRIBE", channels...) err := c.subscribe("SUBSCRIBE", channels...)
if err == nil { c.channels = appendIfNotExists(c.channels, channels...)
c.channels = appendIfNotExists(c.channels, channels...)
}
return err return err
} }
// Subscribes the client to the given patterns. // Subscribes the client to the given patterns.
func (c *PubSub) PSubscribe(patterns ...string) error { func (c *PubSub) PSubscribe(patterns ...string) error {
err := c.subscribe("PSUBSCRIBE", patterns...) err := c.subscribe("PSUBSCRIBE", patterns...)
if err == nil { c.patterns = appendIfNotExists(c.patterns, patterns...)
c.patterns = appendIfNotExists(c.patterns, patterns...)
}
return err return err
} }
@ -76,9 +72,7 @@ func (c *PubSub) PSubscribe(patterns ...string) error {
// them if none is given. // them if none is given.
func (c *PubSub) Unsubscribe(channels ...string) error { func (c *PubSub) Unsubscribe(channels ...string) error {
err := c.subscribe("UNSUBSCRIBE", channels...) err := c.subscribe("UNSUBSCRIBE", channels...)
if err == nil { c.channels = remove(c.channels, channels...)
c.channels = remove(c.channels, channels...)
}
return err return err
} }
@ -86,9 +80,7 @@ func (c *PubSub) Unsubscribe(channels ...string) error {
// them if none is given. // them if none is given.
func (c *PubSub) PUnsubscribe(patterns ...string) error { func (c *PubSub) PUnsubscribe(patterns ...string) error {
err := c.subscribe("PUNSUBSCRIBE", patterns...) err := c.subscribe("PUNSUBSCRIBE", patterns...)
if err == nil { c.patterns = remove(c.patterns, patterns...)
c.patterns = remove(c.patterns, patterns...)
}
return err return err
} }

View File

@ -25,8 +25,7 @@ var _ = Describe("PubSub", func() {
}) })
It("should support pattern matching", func() { It("should support pattern matching", func() {
pubsub, err := client.PSubscribe("mychannel*") pubsub := client.PSubscribe("mychannel*")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close() defer pubsub.Close()
{ {
@ -77,8 +76,7 @@ var _ = Describe("PubSub", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(channels).To(BeEmpty()) Expect(channels).To(BeEmpty())
pubsub, err := client.Subscribe("mychannel", "mychannel2") pubsub := client.Subscribe("mychannel", "mychannel2")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close() defer pubsub.Close()
channels, err = client.PubSubChannels("mychannel*").Result() channels, err = client.PubSubChannels("mychannel*").Result()
@ -95,8 +93,7 @@ var _ = Describe("PubSub", func() {
}) })
It("should return the numbers of subscribers", func() { It("should return the numbers of subscribers", func() {
pubsub, err := client.Subscribe("mychannel", "mychannel2") pubsub := client.Subscribe("mychannel", "mychannel2")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close() defer pubsub.Close()
channels, err := client.PubSubNumSub("mychannel", "mychannel2", "mychannel3").Result() channels, err := client.PubSubNumSub("mychannel", "mychannel2", "mychannel3").Result()
@ -113,8 +110,7 @@ var _ = Describe("PubSub", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(num).To(Equal(int64(0))) Expect(num).To(Equal(int64(0)))
pubsub, err := client.PSubscribe("*") pubsub := client.PSubscribe("*")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close() defer pubsub.Close()
num, err = client.PubSubNumPat().Result() num, err = client.PubSubNumPat().Result()
@ -123,8 +119,7 @@ var _ = Describe("PubSub", func() {
}) })
It("should pub/sub", func() { It("should pub/sub", func() {
pubsub, err := client.Subscribe("mychannel", "mychannel2") pubsub := client.Subscribe("mychannel", "mychannel2")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close() defer pubsub.Close()
{ {
@ -200,11 +195,10 @@ var _ = Describe("PubSub", func() {
}) })
It("should ping/pong", func() { It("should ping/pong", func() {
pubsub, err := client.Subscribe("mychannel") pubsub := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close() defer pubsub.Close()
_, err = pubsub.ReceiveTimeout(time.Second) _, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = pubsub.Ping("") err = pubsub.Ping("")
@ -217,11 +211,10 @@ var _ = Describe("PubSub", func() {
}) })
It("should ping/pong with payload", func() { It("should ping/pong with payload", func() {
pubsub, err := client.Subscribe("mychannel") pubsub := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close() defer pubsub.Close()
_, err = pubsub.ReceiveTimeout(time.Second) _, err := pubsub.ReceiveTimeout(time.Second)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = pubsub.Ping("hello") err = pubsub.Ping("hello")
@ -234,11 +227,10 @@ var _ = Describe("PubSub", func() {
}) })
It("should multi-ReceiveMessage", func() { It("should multi-ReceiveMessage", func() {
pubsub, err := client.Subscribe("mychannel") pubsub := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close() defer pubsub.Close()
err = client.Publish("mychannel", "hello").Err() err := client.Publish("mychannel", "hello").Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = client.Publish("mychannel", "world").Err() err = client.Publish("mychannel", "world").Err()
@ -258,8 +250,7 @@ var _ = Describe("PubSub", func() {
It("should ReceiveMessage after timeout", func() { It("should ReceiveMessage after timeout", func() {
timeout := 100 * time.Millisecond timeout := 100 * time.Millisecond
pubsub, err := client.Subscribe("mychannel") pubsub := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close() defer pubsub.Close()
done := make(chan bool, 1) done := make(chan bool, 1)
@ -321,24 +312,21 @@ var _ = Describe("PubSub", func() {
} }
It("Subscribe should reconnect on ReceiveMessage error", func() { It("Subscribe should reconnect on ReceiveMessage error", func() {
pubsub, err := client.Subscribe("mychannel") pubsub := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close() defer pubsub.Close()
expectReceiveMessageOnError(pubsub) expectReceiveMessageOnError(pubsub)
}) })
It("PSubscribe should reconnect on ReceiveMessage error", func() { It("PSubscribe should reconnect on ReceiveMessage error", func() {
pubsub, err := client.PSubscribe("mychannel") pubsub := client.PSubscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close() defer pubsub.Close()
expectReceiveMessageOnError(pubsub) expectReceiveMessageOnError(pubsub)
}) })
It("should return on Close", func() { It("should return on Close", func() {
pubsub, err := client.Subscribe("mychannel") pubsub := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close() defer pubsub.Close()
var wg sync.WaitGroup var wg sync.WaitGroup
@ -360,8 +348,7 @@ var _ = Describe("PubSub", func() {
wg.Wait() wg.Wait()
wg.Add(1) wg.Add(1)
err = pubsub.Close() Expect(pubsub.Close()).NotTo(HaveOccurred())
Expect(err).NotTo(HaveOccurred())
wg.Wait() wg.Wait()
}) })
@ -369,18 +356,17 @@ var _ = Describe("PubSub", func() {
It("should ReceiveMessage without a subscription", func() { It("should ReceiveMessage without a subscription", func() {
timeout := 100 * time.Millisecond timeout := 100 * time.Millisecond
pubsub, err := client.Subscribe() pubsub := client.Subscribe()
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close() defer pubsub.Close()
go func() { go func() {
defer GinkgoRecover() defer GinkgoRecover()
time.Sleep(2 * timeout) time.Sleep(2 * timeout)
err = pubsub.Subscribe("mychannel") err := pubsub.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err := client.Publish("mychannel", "hello").Err() err = client.Publish("mychannel", "hello").Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
}() }()

View File

@ -141,8 +141,7 @@ var _ = Describe("races", func() {
perform(C, func(id int) { perform(C, func(id int) {
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
pubsub, err := client.Subscribe(fmt.Sprintf("mychannel%d", id)) pubsub := client.Subscribe(fmt.Sprintf("mychannel%d", id))
Expect(err).NotTo(HaveOccurred())
go func() { go func() {
defer GinkgoRecover() defer GinkgoRecover()
@ -152,7 +151,7 @@ var _ = Describe("races", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
}() }()
_, err = pubsub.ReceiveMessage() _, err := pubsub.ReceiveMessage()
Expect(err.Error()).To(ContainSubstring("closed")) Expect(err.Error()).To(ContainSubstring("closed"))
val := "echo" + strconv.Itoa(i) val := "echo" + strconv.Itoa(i)

View File

@ -359,25 +359,19 @@ func (c *Client) pubSub() *PubSub {
} }
// Subscribe subscribes the client to the specified channels. // Subscribe subscribes the client to the specified channels.
func (c *Client) Subscribe(channels ...string) (*PubSub, error) { func (c *Client) Subscribe(channels ...string) *PubSub {
pubsub := c.pubSub() pubsub := c.pubSub()
if len(channels) > 0 { if len(channels) > 0 {
if err := pubsub.Subscribe(channels...); err != nil { _ = pubsub.Subscribe(channels...)
pubsub.Close()
return nil, err
}
} }
return pubsub, nil return pubsub
} }
// PSubscribe subscribes the client to the given patterns. // PSubscribe subscribes the client to the given patterns.
func (c *Client) PSubscribe(channels ...string) (*PubSub, error) { func (c *Client) PSubscribe(channels ...string) *PubSub {
pubsub := c.pubSub() pubsub := c.pubSub()
if len(channels) > 0 { if len(channels) > 0 {
if err := pubsub.PSubscribe(channels...); err != nil { _ = pubsub.PSubscribe(channels...)
pubsub.Close()
return nil, err
}
} }
return pubsub, nil return pubsub
} }

View File

@ -58,11 +58,10 @@ var _ = Describe("Client", func() {
}) })
It("should close pubsub without closing the client", func() { It("should close pubsub without closing the client", func() {
pubsub, err := client.Subscribe() pubsub := client.Subscribe()
Expect(err).NotTo(HaveOccurred())
Expect(pubsub.Close()).NotTo(HaveOccurred()) Expect(pubsub.Close()).NotTo(HaveOccurred())
_, err = pubsub.Receive() _, err := pubsub.Receive()
Expect(err).To(MatchError("redis: client is closed")) Expect(err).To(MatchError("redis: client is closed"))
Expect(client.Ping().Err()).NotTo(HaveOccurred()) Expect(client.Ping().Err()).NotTo(HaveOccurred())
}) })
@ -92,11 +91,10 @@ var _ = Describe("Client", func() {
}) })
It("should close pubsub when client is closed", func() { It("should close pubsub when client is closed", func() {
pubsub, err := client.Subscribe() pubsub := client.Subscribe()
Expect(err).NotTo(HaveOccurred())
Expect(client.Close()).NotTo(HaveOccurred()) Expect(client.Close()).NotTo(HaveOccurred())
_, err = pubsub.Receive() _, err := pubsub.Receive()
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(pubsub.Close()).NotTo(HaveOccurred()) Expect(pubsub.Close()).NotTo(HaveOccurred())
@ -242,7 +240,8 @@ var _ = Describe("Client timeout", func() {
}) })
It("Subscribe timeouts", func() { It("Subscribe timeouts", func() {
_, err := client.Subscribe("_") pubsub := client.Subscribe()
err := pubsub.Subscribe("_")
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue()) Expect(err.(net.Error).Timeout()).To(BeTrue())
}) })