diff --git a/cluster.go b/cluster.go index 47b79244..364e9c06 100644 --- a/cluster.go +++ b/cluster.go @@ -1522,40 +1522,46 @@ func (c *ClusterClient) txPipelineReadQueued( return nil } -func (c *ClusterClient) pubSub(channels []string) *PubSub { +func (c *ClusterClient) pubSub() *PubSub { var node *clusterNode pubsub := &PubSub{ opt: c.opt.clientOptions(), newConn: func(channels []string) (*pool.Conn, error) { - if node == nil { - var slot int - if len(channels) > 0 { - slot = hashtag.Slot(channels[0]) - } else { - slot = -1 - } - - masterNode, err := c.slotMasterNode(slot) - if err != nil { - return nil, err - } - node = masterNode + if node != nil { + panic("node != nil") } - return node.Client.newConn() + + slot := hashtag.Slot(channels[0]) + + var err error + node, err = c.slotMasterNode(slot) + if err != nil { + return nil, err + } + + cn, err := node.Client.newConn() + if err != nil { + return nil, err + } + + return cn, nil }, closeConn: func(cn *pool.Conn) error { - return node.Client.connPool.CloseConn(cn) + err := node.Client.connPool.CloseConn(cn) + node = nil + return err }, } pubsub.init() + return pubsub } // Subscribe subscribes the client to the specified channels. // Channels can be omitted to create empty subscription. func (c *ClusterClient) Subscribe(channels ...string) *PubSub { - pubsub := c.pubSub(channels) + pubsub := c.pubSub() if len(channels) > 0 { _ = pubsub.Subscribe(channels...) } @@ -1565,7 +1571,7 @@ func (c *ClusterClient) Subscribe(channels ...string) *PubSub { // PSubscribe subscribes the client to the given patterns. // Patterns can be omitted to create empty subscription. func (c *ClusterClient) PSubscribe(channels ...string) *PubSub { - pubsub := c.pubSub(channels) + pubsub := c.pubSub() if len(channels) > 0 { _ = pubsub.PSubscribe(channels...) }