forked from mirror/redis
v2: Remove timed out connection from the pool.
This commit is contained in:
parent
dc90e359e9
commit
73b92efa94
24
v2/parser.go
24
v2/parser.go
|
@ -29,16 +29,6 @@ var (
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type parserError struct {
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *parserError) Error() string {
|
|
||||||
return e.err.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
func appendReq(buf []byte, args []string) []byte {
|
func appendReq(buf []byte, args []string) []byte {
|
||||||
buf = append(buf, '*')
|
buf = append(buf, '*')
|
||||||
buf = strconv.AppendUint(buf, uint64(len(args)), 10)
|
buf = strconv.AppendUint(buf, uint64(len(args)), 10)
|
||||||
|
@ -163,7 +153,7 @@ func parseStringFloatMapReply(rd reader) (interface{}, error) {
|
||||||
func _parseReply(rd reader, typ replyType) (interface{}, error) {
|
func _parseReply(rd reader, typ replyType) (interface{}, error) {
|
||||||
line, err := readLine(rd)
|
line, err := readLine(rd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, &parserError{err}
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
switch line[0] {
|
switch line[0] {
|
||||||
|
@ -174,7 +164,7 @@ func _parseReply(rd reader, typ replyType) (interface{}, error) {
|
||||||
case ':':
|
case ':':
|
||||||
v, err := strconv.ParseInt(string(line[1:]), 10, 64)
|
v, err := strconv.ParseInt(string(line[1:]), 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, &parserError{err}
|
return 0, err
|
||||||
}
|
}
|
||||||
return v, nil
|
return v, nil
|
||||||
case '$':
|
case '$':
|
||||||
|
@ -184,13 +174,13 @@ func _parseReply(rd reader, typ replyType) (interface{}, error) {
|
||||||
|
|
||||||
replyLenInt32, err := strconv.ParseInt(string(line[1:]), 10, 32)
|
replyLenInt32, err := strconv.ParseInt(string(line[1:]), 10, 32)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", &parserError{err}
|
return "", err
|
||||||
}
|
}
|
||||||
replyLen := int(replyLenInt32) + 2
|
replyLen := int(replyLenInt32) + 2
|
||||||
|
|
||||||
line, err = readN(rd, replyLen)
|
line, err = readN(rd, replyLen)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", &parserError{err}
|
return "", err
|
||||||
}
|
}
|
||||||
return string(line[:len(line)-2]), nil
|
return string(line[:len(line)-2]), nil
|
||||||
case '*':
|
case '*':
|
||||||
|
@ -200,7 +190,7 @@ func _parseReply(rd reader, typ replyType) (interface{}, error) {
|
||||||
|
|
||||||
repliesNum, err := strconv.ParseInt(string(line[1:]), 10, 64)
|
repliesNum, err := strconv.ParseInt(string(line[1:]), 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, &parserError{err}
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
switch typ {
|
switch typ {
|
||||||
|
@ -278,7 +268,7 @@ func _parseReply(rd reader, typ replyType) (interface{}, error) {
|
||||||
}
|
}
|
||||||
value, err := strconv.ParseFloat(valueS, 64)
|
value, err := strconv.ParseFloat(valueS, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, &parserError{err}
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
m[key] = value
|
m[key] = value
|
||||||
|
@ -299,6 +289,6 @@ func _parseReply(rd reader, typ replyType) (interface{}, error) {
|
||||||
return vals, nil
|
return vals, nil
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return nil, &parserError{fmt.Errorf("redis: can't parse %q", line)}
|
return nil, fmt.Errorf("redis: can't parse %q", line)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,8 @@ type PipelineClient struct {
|
||||||
*Client
|
*Client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: rename to Pipeline
|
||||||
|
// TODO: return just *PipelineClient
|
||||||
func (c *Client) PipelineClient() (*PipelineClient, error) {
|
func (c *Client) PipelineClient() (*PipelineClient, error) {
|
||||||
return &PipelineClient{
|
return &PipelineClient{
|
||||||
Client: &Client{
|
Client: &Client{
|
||||||
|
@ -39,6 +41,8 @@ func (c *PipelineClient) DiscardQueued() {
|
||||||
c.reqsMtx.Unlock()
|
c.reqsMtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: rename to Run or ...
|
||||||
|
// TODO: should return error if one of the commands failed
|
||||||
func (c *PipelineClient) RunQueued() ([]Req, error) {
|
func (c *PipelineClient) RunQueued() ([]Req, error) {
|
||||||
c.reqsMtx.Lock()
|
c.reqsMtx.Lock()
|
||||||
reqs := c.reqs
|
reqs := c.reqs
|
||||||
|
@ -49,18 +53,17 @@ func (c *PipelineClient) RunQueued() ([]Req, error) {
|
||||||
return []Req{}, nil
|
return []Req{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
conn, err := c.conn()
|
cn, err := c.conn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.runReqs(reqs, conn)
|
if err := c.runReqs(reqs, cn); err != nil {
|
||||||
if err != nil {
|
c.removeConn(cn)
|
||||||
c.removeConn(conn)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.putConn(conn)
|
c.putConn(cn)
|
||||||
return reqs, nil
|
return reqs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -119,10 +119,10 @@ func (c *baseClient) run(req Req) {
|
||||||
|
|
||||||
val, err := req.ParseReply(cn.Rd)
|
val, err := req.ParseReply(cn.Rd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if _, ok := err.(*parserError); ok {
|
if err == Nil {
|
||||||
c.removeConn(cn)
|
|
||||||
} else {
|
|
||||||
c.putConn(cn)
|
c.putConn(cn)
|
||||||
|
} else {
|
||||||
|
c.removeConn(cn)
|
||||||
}
|
}
|
||||||
req.SetErr(err)
|
req.SetErr(err)
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue