Merge pull request #955 from go-redis/fix/dont-remember-node

Don't remember cluster node for the PubSub
This commit is contained in:
Vladimir Mihailenco 2019-02-08 15:37:55 +02:00 committed by GitHub
commit 473d039f0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 24 additions and 18 deletions

View File

@ -1527,40 +1527,46 @@ func (c *ClusterClient) txPipelineReadQueued(
return nil return nil
} }
func (c *ClusterClient) pubSub(channels []string) *PubSub { func (c *ClusterClient) pubSub() *PubSub {
var node *clusterNode var node *clusterNode
pubsub := &PubSub{ pubsub := &PubSub{
opt: c.opt.clientOptions(), opt: c.opt.clientOptions(),
newConn: func(channels []string) (*pool.Conn, error) { newConn: func(channels []string) (*pool.Conn, error) {
if node == nil { if node != nil {
var slot int panic("node != nil")
if len(channels) > 0 {
slot = hashtag.Slot(channels[0])
} else {
slot = -1
}
masterNode, err := c.slotMasterNode(slot)
if err != nil {
return nil, err
}
node = masterNode
} }
return node.Client.newConn()
slot := hashtag.Slot(channels[0])
var err error
node, err = c.slotMasterNode(slot)
if err != nil {
return nil, err
}
cn, err := node.Client.newConn()
if err != nil {
return nil, err
}
return cn, nil
}, },
closeConn: func(cn *pool.Conn) error { closeConn: func(cn *pool.Conn) error {
return node.Client.connPool.CloseConn(cn) err := node.Client.connPool.CloseConn(cn)
node = nil
return err
}, },
} }
pubsub.init() pubsub.init()
return pubsub return pubsub
} }
// Subscribe subscribes the client to the specified channels. // Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription. // Channels can be omitted to create empty subscription.
func (c *ClusterClient) Subscribe(channels ...string) *PubSub { func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
pubsub := c.pubSub(channels) pubsub := c.pubSub()
if len(channels) > 0 { if len(channels) > 0 {
_ = pubsub.Subscribe(channels...) _ = pubsub.Subscribe(channels...)
} }
@ -1570,7 +1576,7 @@ func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
// PSubscribe subscribes the client to the given patterns. // PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create empty subscription. // Patterns can be omitted to create empty subscription.
func (c *ClusterClient) PSubscribe(channels ...string) *PubSub { func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
pubsub := c.pubSub(channels) pubsub := c.pubSub()
if len(channels) > 0 { if len(channels) > 0 {
_ = pubsub.PSubscribe(channels...) _ = pubsub.PSubscribe(channels...)
} }