forked from mirror/redis
Merge pull request #343 from go-redis/fix/publish-cmd
Move Publish channel to cmdable. Remove method that was deprecated in…
This commit is contained in:
commit
86841d3eda
|
@ -1618,6 +1618,13 @@ func (c *cmdable) DebugObject(key string) *StringCmd {
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Publish posts the message to the channel.
|
||||||
|
func (c *cmdable) Publish(channel, message string) *IntCmd {
|
||||||
|
cmd := NewIntCmd("PUBLISH", channel, message)
|
||||||
|
c.process(cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
func (c *cmdable) PubSubChannels(pattern string) *StringSliceCmd {
|
func (c *cmdable) PubSubChannels(pattern string) *StringSliceCmd {
|
||||||
args := []interface{}{"pubsub", "channels"}
|
args := []interface{}{"pubsub", "channels"}
|
||||||
if pattern != "*" {
|
if pattern != "*" {
|
||||||
|
|
|
@ -81,8 +81,8 @@ var _ = Describe("pool", func() {
|
||||||
connPool.(*pool.ConnPool).DialLimiter = nil
|
connPool.(*pool.ConnPool).DialLimiter = nil
|
||||||
|
|
||||||
perform(1000, func(id int) {
|
perform(1000, func(id int) {
|
||||||
pubsub := client.PubSub()
|
pubsub, err := client.Subscribe()
|
||||||
Expect(pubsub.Subscribe()).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(pubsub.Close()).NotTo(HaveOccurred())
|
Expect(pubsub.Close()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
29
pubsub.go
29
pubsub.go
|
@ -10,13 +10,6 @@ import (
|
||||||
"gopkg.in/redis.v4/internal/pool"
|
"gopkg.in/redis.v4/internal/pool"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Posts a message to the given channel.
|
|
||||||
func (c *Client) Publish(channel, message string) *IntCmd {
|
|
||||||
req := NewIntCmd("PUBLISH", channel, message)
|
|
||||||
c.Process(req)
|
|
||||||
return req
|
|
||||||
}
|
|
||||||
|
|
||||||
// PubSub implements Pub/Sub commands as described in
|
// PubSub implements Pub/Sub commands as described in
|
||||||
// http://redis.io/topics/pubsub. It's NOT safe for concurrent use by
|
// http://redis.io/topics/pubsub. It's NOT safe for concurrent use by
|
||||||
// multiple goroutines.
|
// multiple goroutines.
|
||||||
|
@ -29,28 +22,6 @@ type PubSub struct {
|
||||||
nsub int // number of active subscriptions
|
nsub int // number of active subscriptions
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deprecated. Use Subscribe/PSubscribe instead.
|
|
||||||
func (c *Client) PubSub() *PubSub {
|
|
||||||
return &PubSub{
|
|
||||||
base: baseClient{
|
|
||||||
opt: c.opt,
|
|
||||||
connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), false),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Subscribes the client to the specified channels.
|
|
||||||
func (c *Client) Subscribe(channels ...string) (*PubSub, error) {
|
|
||||||
pubsub := c.PubSub()
|
|
||||||
return pubsub, pubsub.Subscribe(channels...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Subscribes the client to the given patterns.
|
|
||||||
func (c *Client) PSubscribe(channels ...string) (*PubSub, error) {
|
|
||||||
pubsub := c.PubSub()
|
|
||||||
return pubsub, pubsub.PSubscribe(channels...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
|
func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
|
||||||
cn, err := c.base.conn()
|
cn, err := c.base.conn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
21
redis.go
21
redis.go
|
@ -215,3 +215,24 @@ func (c *Client) pipelineExec(cmds []Cmder) error {
|
||||||
}
|
}
|
||||||
return retErr
|
return retErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) pubSub() *PubSub {
|
||||||
|
return &PubSub{
|
||||||
|
base: baseClient{
|
||||||
|
opt: c.opt,
|
||||||
|
connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), false),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe subscribes the client to the specified channels.
|
||||||
|
func (c *Client) Subscribe(channels ...string) (*PubSub, error) {
|
||||||
|
pubsub := c.pubSub()
|
||||||
|
return pubsub, pubsub.Subscribe(channels...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PSubscribe subscribes the client to the given patterns.
|
||||||
|
func (c *Client) PSubscribe(channels ...string) (*PubSub, error) {
|
||||||
|
pubsub := c.pubSub()
|
||||||
|
return pubsub, pubsub.PSubscribe(channels...)
|
||||||
|
}
|
||||||
|
|
|
@ -57,10 +57,10 @@ var _ = Describe("Client", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should close pubsub without closing the client", func() {
|
It("should close pubsub without closing the client", func() {
|
||||||
pubsub := client.PubSub()
|
pubsub, err := client.Subscribe()
|
||||||
Expect(pubsub.Close()).NotTo(HaveOccurred())
|
Expect(pubsub.Close()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
_, err := pubsub.Receive()
|
_, err = pubsub.Receive()
|
||||||
Expect(err).To(MatchError("redis: client is closed"))
|
Expect(err).To(MatchError("redis: client is closed"))
|
||||||
Expect(client.Ping().Err()).NotTo(HaveOccurred())
|
Expect(client.Ping().Err()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
@ -90,8 +90,12 @@ var _ = Describe("Client", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should close pubsub when client is closed", func() {
|
It("should close pubsub when client is closed", func() {
|
||||||
pubsub := client.PubSub()
|
pubsub, err := client.Subscribe()
|
||||||
Expect(client.Close()).NotTo(HaveOccurred())
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
_, err = pubsub.Receive()
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
|
||||||
Expect(pubsub.Close()).NotTo(HaveOccurred())
|
Expect(pubsub.Close()).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue