forked from mirror/redis
Fix PubSubClient connection pool management.
This commit is contained in:
parent
1d3a223419
commit
ee618a6290
34
pubsub.go
34
pubsub.go
|
@ -7,20 +7,21 @@ import (
|
||||||
|
|
||||||
type PubSubClient struct {
|
type PubSubClient struct {
|
||||||
*Client
|
*Client
|
||||||
conn *Conn
|
|
||||||
ch chan *Message
|
ch chan *Message
|
||||||
once sync.Once
|
once sync.Once
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPubSubClient(client *Client) (*PubSubClient, error) {
|
func newPubSubClient(client *Client) (*PubSubClient, error) {
|
||||||
conn, _, err := client.ConnPool.Get()
|
pubSubConn, _, err := client.ConnPool.Get()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
client.ConnPool.Remove(pubSubConn)
|
||||||
|
|
||||||
c := &PubSubClient{
|
c := &PubSubClient{
|
||||||
Client: client,
|
Client: &Client{
|
||||||
conn: conn,
|
ConnPool: NewOneConnPool(pubSubConn),
|
||||||
|
},
|
||||||
ch: make(chan *Message),
|
ch: make(chan *Message),
|
||||||
}
|
}
|
||||||
return c, nil
|
return c, nil
|
||||||
|
@ -34,13 +35,17 @@ type Message struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *PubSubClient) consumeMessages() {
|
func (c *PubSubClient) consumeMessages() {
|
||||||
|
conn, err := c.conn()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
req := NewMultiBulkReq()
|
req := NewMultiBulkReq()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// Replies can arrive in batches.
|
// Replies can arrive in batches.
|
||||||
// Read whole reply and parse messages one by one.
|
// Read whole reply and parse messages one by one.
|
||||||
|
|
||||||
err := c.ReadReply(c.conn)
|
err := c.ReadReply(conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := &Message{}
|
msg := &Message{}
|
||||||
msg.Err = err
|
msg.Err = err
|
||||||
|
@ -51,7 +56,7 @@ func (c *PubSubClient) consumeMessages() {
|
||||||
for {
|
for {
|
||||||
msg := &Message{}
|
msg := &Message{}
|
||||||
|
|
||||||
replyI, err := req.ParseReply(c.conn.Rd)
|
replyI, err := req.ParseReply(conn.Rd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg.Err = err
|
msg.Err = err
|
||||||
c.ch <- msg
|
c.ch <- msg
|
||||||
|
@ -74,7 +79,7 @@ func (c *PubSubClient) consumeMessages() {
|
||||||
}
|
}
|
||||||
c.ch <- msg
|
c.ch <- msg
|
||||||
|
|
||||||
if !c.conn.Rd.HasUnread() {
|
if !conn.Rd.HasUnread() {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -85,7 +90,12 @@ func (c *PubSubClient) Subscribe(channels ...string) (chan *Message, error) {
|
||||||
args := append([]string{"SUBSCRIBE"}, channels...)
|
args := append([]string{"SUBSCRIBE"}, channels...)
|
||||||
req := NewMultiBulkReq(args...)
|
req := NewMultiBulkReq(args...)
|
||||||
|
|
||||||
if err := c.WriteReq(req.Req(), c.conn); err != nil {
|
conn, err := c.conn()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.WriteReq(req.Req(), conn); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,5 +109,11 @@ func (c *PubSubClient) Subscribe(channels ...string) (chan *Message, error) {
|
||||||
func (c *PubSubClient) Unsubscribe(channels ...string) error {
|
func (c *PubSubClient) Unsubscribe(channels ...string) error {
|
||||||
args := append([]string{"UNSUBSCRIBE"}, channels...)
|
args := append([]string{"UNSUBSCRIBE"}, channels...)
|
||||||
req := NewMultiBulkReq(args...)
|
req := NewMultiBulkReq(args...)
|
||||||
return c.WriteReq(req.Req(), c.conn)
|
|
||||||
|
conn, err := c.conn()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.WriteReq(req.Req(), conn)
|
||||||
}
|
}
|
||||||
|
|
11
request.go
11
request.go
|
@ -3,7 +3,6 @@ package redis
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/vmihailenco/bufreader"
|
"github.com/vmihailenco/bufreader"
|
||||||
|
@ -34,17 +33,19 @@ func ParseReq(rd *bufreader.Reader) ([]string, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if line[0] != '*' {
|
if line[0] != '*' {
|
||||||
return []string{string(line)}, nil
|
return []string{string(line)}, nil
|
||||||
}
|
}
|
||||||
|
numReplies, err := strconv.ParseInt(string(line[1:]), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
args := make([]string, 0)
|
args := make([]string, 0)
|
||||||
for {
|
for i := int64(0); i < numReplies; i++ {
|
||||||
line, err = rd.ReadLine('\n')
|
line, err = rd.ReadLine('\n')
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == io.EOF {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if line[0] != '$' {
|
if line[0] != '$' {
|
||||||
|
|
Loading…
Reference in New Issue