forked from mirror/redis
Add ParseReq method and tweak benchmarks.
This commit is contained in:
parent
f56748aab9
commit
b6ae953e1c
78
parser.go
78
parser.go
|
@ -55,6 +55,66 @@ func readLine(rd reader) ([]byte, error) {
|
||||||
return line, nil
|
return line, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func readN(rd reader, n int) ([]byte, error) {
|
||||||
|
buf, err := rd.ReadN(n)
|
||||||
|
if err == bufio.ErrBufferFull {
|
||||||
|
newBuf := make([]byte, n)
|
||||||
|
r := copy(newBuf, buf)
|
||||||
|
buf = newBuf
|
||||||
|
|
||||||
|
for r < n {
|
||||||
|
n, err := rd.Read(buf[r:])
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r += n
|
||||||
|
}
|
||||||
|
} else if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return buf, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
func ParseReq(rd reader) ([]string, error) {
|
||||||
|
line, err := readLine(rd)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if line[0] != '*' {
|
||||||
|
return []string{string(line)}, nil
|
||||||
|
}
|
||||||
|
numReplies, err := strconv.ParseInt(string(line[1:]), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
args := make([]string, 0, numReplies)
|
||||||
|
for i := int64(0); i < numReplies; i++ {
|
||||||
|
line, err = readLine(rd)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if line[0] != '$' {
|
||||||
|
return nil, fmt.Errorf("Expected '$', but got %q", line)
|
||||||
|
}
|
||||||
|
|
||||||
|
argLen, err := strconv.ParseInt(string(line[1:]), 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
arg, err := readN(rd, int(argLen)+2)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
args = append(args, string(arg[:argLen]))
|
||||||
|
}
|
||||||
|
return args, nil
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -99,24 +159,10 @@ func _parseReply(rd reader, multiBulkType int) (interface{}, error) {
|
||||||
}
|
}
|
||||||
replyLen := int(replyLenInt32) + 2
|
replyLen := int(replyLenInt32) + 2
|
||||||
|
|
||||||
line, err = rd.ReadN(replyLen)
|
line, err = readN(rd, replyLen)
|
||||||
if err == bufio.ErrBufferFull {
|
if err != nil {
|
||||||
buf := make([]byte, replyLen)
|
|
||||||
r := copy(buf, line)
|
|
||||||
|
|
||||||
for r < replyLen {
|
|
||||||
n, err := rd.Read(buf[r:])
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
r += n
|
|
||||||
}
|
|
||||||
|
|
||||||
line = buf
|
|
||||||
} else if err != nil {
|
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return string(line[:len(line)-2]), nil
|
return string(line[:len(line)-2]), nil
|
||||||
case '*':
|
case '*':
|
||||||
if len(line) == 3 && line[1] == '-' && line[2] == '1' {
|
if len(line) == 3 && line[1] == '-' && line[2] == '1' {
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
package redis_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
|
||||||
|
"github.com/vmihailenco/bufio"
|
||||||
|
. "launchpad.net/gocheck"
|
||||||
|
|
||||||
|
"github.com/vmihailenco/redis"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ParserTest struct{}
|
||||||
|
|
||||||
|
var _ = Suite(&ParserTest{})
|
||||||
|
|
||||||
|
func (t *ParserTest) TestParseReq(c *C) {
|
||||||
|
buf := bytes.NewBufferString("*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nhello\r\n")
|
||||||
|
rd := bufio.NewReaderSize(buf, 1024)
|
||||||
|
|
||||||
|
args, err := redis.ParseReq(rd)
|
||||||
|
c.Check(err, IsNil)
|
||||||
|
c.Check(args, DeepEquals, []string{"SET", "key", "hello"})
|
||||||
|
}
|
|
@ -45,7 +45,7 @@ func (c *PubSubClient) consumeMessages(conn *Conn) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg.Err = err
|
msg.Err = err
|
||||||
c.ch <- msg
|
c.ch <- msg
|
||||||
break
|
return
|
||||||
}
|
}
|
||||||
reply, ok := replyIface.([]interface{})
|
reply, ok := replyIface.([]interface{})
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
19
redis.go
19
redis.go
|
@ -79,6 +79,9 @@ func (c *BaseClient) conn() (*Conn, error) {
|
||||||
}
|
}
|
||||||
err = c.InitConn(client)
|
err = c.InitConn(client)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if err := c.ConnPool.Remove(conn); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -102,7 +105,9 @@ func (c *BaseClient) Run(req Req) {
|
||||||
|
|
||||||
err = c.WriteReq(conn, req)
|
err = c.WriteReq(conn, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.ConnPool.Remove(conn)
|
if err := c.ConnPool.Remove(conn); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
req.SetErr(err)
|
req.SetErr(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -110,15 +115,21 @@ func (c *BaseClient) Run(req Req) {
|
||||||
val, err := req.ParseReply(conn.Rd)
|
val, err := req.ParseReply(conn.Rd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == Nil {
|
if err == Nil {
|
||||||
c.ConnPool.Add(conn)
|
if err := c.ConnPool.Add(conn); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
c.ConnPool.Remove(conn)
|
if err := c.ConnPool.Remove(conn); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
req.SetErr(err)
|
req.SetErr(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.ConnPool.Add(conn)
|
if err := c.ConnPool.Add(conn); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
req.SetVal(val)
|
req.SetVal(val)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
18
req_test.go
18
req_test.go
|
@ -60,10 +60,18 @@ func (t *RequestTest) BenchmarkStatusReq(c *C) {
|
||||||
t.benchmarkReq(c, "+OK\r\n", redis.NewStatusReq(), Equals, "OK")
|
t.benchmarkReq(c, "+OK\r\n", redis.NewStatusReq(), Equals, "OK")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *RequestTest) BenchmarkIntReq(c *C) {
|
||||||
|
t.benchmarkReq(c, ":1\r\n", redis.NewIntReq(), Equals, int64(1))
|
||||||
|
}
|
||||||
|
|
||||||
func (t *RequestTest) BenchmarkStringReq(c *C) {
|
func (t *RequestTest) BenchmarkStringReq(c *C) {
|
||||||
t.benchmarkReq(c, "$5\r\nhello\r\n", redis.NewStringReq(), Equals, "hello")
|
t.benchmarkReq(c, "$5\r\nhello\r\n", redis.NewStringReq(), Equals, "hello")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *RequestTest) BenchmarkFloatReq(c *C) {
|
||||||
|
t.benchmarkReq(c, "$5\r\n1.111\r\n", redis.NewFloatReq(), Equals, 1.111)
|
||||||
|
}
|
||||||
|
|
||||||
func (t *RequestTest) BenchmarkStringSliceReq(c *C) {
|
func (t *RequestTest) BenchmarkStringSliceReq(c *C) {
|
||||||
t.benchmarkReq(
|
t.benchmarkReq(
|
||||||
c,
|
c,
|
||||||
|
@ -73,3 +81,13 @@ func (t *RequestTest) BenchmarkStringSliceReq(c *C) {
|
||||||
[]string{"hello", "hello"},
|
[]string{"hello", "hello"},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *RequestTest) BenchmarkIfaceSliceReq(c *C) {
|
||||||
|
t.benchmarkReq(
|
||||||
|
c,
|
||||||
|
"*2\r\n$5\r\nhello\r\n$5\r\nhello\r\n",
|
||||||
|
redis.NewIfaceSliceReq(),
|
||||||
|
DeepEquals,
|
||||||
|
[]interface{}{"hello", "hello"},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue