From b6ae953e1c28a81727d649ea02e21006cfea1f5a Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Fri, 24 Aug 2012 15:16:12 +0300 Subject: [PATCH] Add ParseReq method and tweak benchmarks. --- parser.go | 78 +++++++++++++++++++++++++++++++++++++++----------- parser_test.go | 23 +++++++++++++++ pubsub.go | 2 +- redis.go | 19 +++++++++--- req_test.go | 18 ++++++++++++ 5 files changed, 119 insertions(+), 21 deletions(-) create mode 100644 parser_test.go diff --git a/parser.go b/parser.go index 7523fb63..ddf7e719 100644 --- a/parser.go +++ b/parser.go @@ -55,6 +55,66 @@ func readLine(rd reader) ([]byte, error) { return line, nil } +func readN(rd reader, n int) ([]byte, error) { + buf, err := rd.ReadN(n) + if err == bufio.ErrBufferFull { + newBuf := make([]byte, n) + r := copy(newBuf, buf) + buf = newBuf + + for r < n { + n, err := rd.Read(buf[r:]) + if err != nil { + return nil, err + } + r += n + } + } else if err != nil { + return nil, err + } + return buf, nil +} + +//------------------------------------------------------------------------------ + +func ParseReq(rd reader) ([]string, error) { + line, err := readLine(rd) + if err != nil { + return nil, err + } + + if line[0] != '*' { + return []string{string(line)}, nil + } + numReplies, err := strconv.ParseInt(string(line[1:]), 10, 64) + if err != nil { + return nil, err + } + + args := make([]string, 0, numReplies) + for i := int64(0); i < numReplies; i++ { + line, err = readLine(rd) + if err != nil { + return nil, err + } + if line[0] != '$' { + return nil, fmt.Errorf("Expected '$', but got %q", line) + } + + argLen, err := strconv.ParseInt(string(line[1:]), 10, 32) + if err != nil { + return nil, err + } + + arg, err := readN(rd, int(argLen)+2) + if err != nil { + return nil, err + } + args = append(args, string(arg[:argLen])) + } + return args, nil +} + //------------------------------------------------------------------------------ const ( @@ -99,24 +159,10 @@ func _parseReply(rd reader, multiBulkType int) (interface{}, error) { } replyLen := int(replyLenInt32) + 2 - line, err = rd.ReadN(replyLen) - if err == bufio.ErrBufferFull { - buf := make([]byte, replyLen) - r := copy(buf, line) - - for r < replyLen { - n, err := rd.Read(buf[r:]) - if err != nil { - return "", err - } - r += n - } - - line = buf - } else if err != nil { + line, err = readN(rd, replyLen) + if err != nil { return "", err } - return string(line[:len(line)-2]), nil case '*': if len(line) == 3 && line[1] == '-' && line[2] == '1' { diff --git a/parser_test.go b/parser_test.go new file mode 100644 index 00000000..17eefcd6 --- /dev/null +++ b/parser_test.go @@ -0,0 +1,23 @@ +package redis_test + +import ( + "bytes" + + "github.com/vmihailenco/bufio" + . "launchpad.net/gocheck" + + "github.com/vmihailenco/redis" +) + +type ParserTest struct{} + +var _ = Suite(&ParserTest{}) + +func (t *ParserTest) TestParseReq(c *C) { + buf := bytes.NewBufferString("*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nhello\r\n") + rd := bufio.NewReaderSize(buf, 1024) + + args, err := redis.ParseReq(rd) + c.Check(err, IsNil) + c.Check(args, DeepEquals, []string{"SET", "key", "hello"}) +} diff --git a/pubsub.go b/pubsub.go index 0b9c6b5d..650a745c 100644 --- a/pubsub.go +++ b/pubsub.go @@ -45,7 +45,7 @@ func (c *PubSubClient) consumeMessages(conn *Conn) { if err != nil { msg.Err = err c.ch <- msg - break + return } reply, ok := replyIface.([]interface{}) if !ok { diff --git a/redis.go b/redis.go index 9dde6598..22ce2f84 100644 --- a/redis.go +++ b/redis.go @@ -79,6 +79,9 @@ func (c *BaseClient) conn() (*Conn, error) { } err = c.InitConn(client) if err != nil { + if err := c.ConnPool.Remove(conn); err != nil { + panic(err) + } return nil, err } } @@ -102,7 +105,9 @@ func (c *BaseClient) Run(req Req) { err = c.WriteReq(conn, req) if err != nil { - c.ConnPool.Remove(conn) + if err := c.ConnPool.Remove(conn); err != nil { + panic(err) + } req.SetErr(err) return } @@ -110,15 +115,21 @@ func (c *BaseClient) Run(req Req) { val, err := req.ParseReply(conn.Rd) if err != nil { if err == Nil { - c.ConnPool.Add(conn) + if err := c.ConnPool.Add(conn); err != nil { + panic(err) + } } else { - c.ConnPool.Remove(conn) + if err := c.ConnPool.Remove(conn); err != nil { + panic(err) + } } req.SetErr(err) return } - c.ConnPool.Add(conn) + if err := c.ConnPool.Add(conn); err != nil { + panic(err) + } req.SetVal(val) } diff --git a/req_test.go b/req_test.go index 18ba99f2..d7588a31 100644 --- a/req_test.go +++ b/req_test.go @@ -60,10 +60,18 @@ func (t *RequestTest) BenchmarkStatusReq(c *C) { t.benchmarkReq(c, "+OK\r\n", redis.NewStatusReq(), Equals, "OK") } +func (t *RequestTest) BenchmarkIntReq(c *C) { + t.benchmarkReq(c, ":1\r\n", redis.NewIntReq(), Equals, int64(1)) +} + func (t *RequestTest) BenchmarkStringReq(c *C) { t.benchmarkReq(c, "$5\r\nhello\r\n", redis.NewStringReq(), Equals, "hello") } +func (t *RequestTest) BenchmarkFloatReq(c *C) { + t.benchmarkReq(c, "$5\r\n1.111\r\n", redis.NewFloatReq(), Equals, 1.111) +} + func (t *RequestTest) BenchmarkStringSliceReq(c *C) { t.benchmarkReq( c, @@ -73,3 +81,13 @@ func (t *RequestTest) BenchmarkStringSliceReq(c *C) { []string{"hello", "hello"}, ) } + +func (t *RequestTest) BenchmarkIfaceSliceReq(c *C) { + t.benchmarkReq( + c, + "*2\r\n$5\r\nhello\r\n$5\r\nhello\r\n", + redis.NewIfaceSliceReq(), + DeepEquals, + []interface{}{"hello", "hello"}, + ) +}