Get rid of custom buffered reader.

This commit is contained in:
Vladimir Mihailenco 2012-08-09 13:12:41 +03:00
parent ea3677294b
commit b2e3463af1
8 changed files with 103 additions and 100 deletions

View File

@ -16,9 +16,8 @@ Supports:
Installation Installation
------------ ------------
Run: Install:
go get github.com/vmihailenco/bufreader
go get github.com/vmihailenco/redis go get github.com/vmihailenco/redis
Run tests: Run tests:

View File

@ -1,23 +1,22 @@
package redis package redis
import ( import (
"bufio"
"io" "io"
"log" "log"
"os" "os"
"sync" "sync"
"github.com/vmihailenco/bufreader"
) )
type Conn struct { type Conn struct {
RW io.ReadWriter RW io.ReadWriter
Rd *bufreader.Reader Rd *bufio.Reader
} }
func NewConn(rw io.ReadWriter) *Conn { func NewConn(rw io.ReadWriter) *Conn {
return &Conn{ return &Conn{
RW: rw, RW: rw,
Rd: bufreader.NewSizedReader(8024), Rd: bufio.NewReaderSize(rw, 1024),
} }
} }

View File

@ -42,17 +42,6 @@ func (c *PubSubClient) consumeMessages() {
req := NewMultiBulkReq() req := NewMultiBulkReq()
for { for {
// Replies can arrive in batches.
// Read whole reply and parse messages one by one.
err := c.ReadReply(conn)
if err != nil {
msg := &Message{}
msg.Err = err
c.ch <- msg
return
}
for { for {
msg := &Message{} msg := &Message{}
@ -79,7 +68,7 @@ func (c *PubSubClient) consumeMessages() {
} }
c.ch <- msg c.ch <- msg
if !conn.Rd.HasUnread() { if conn.Rd.Buffered() <= 0 {
break break
} }
} }

View File

@ -2,12 +2,15 @@ package redis
import ( import (
"crypto/tls" "crypto/tls"
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
"sync" "sync"
)
"github.com/vmihailenco/bufreader" var (
ErrReaderTooSmall = errors.New("redis: Reader is too small")
) )
type OpenConnFunc func() (io.ReadWriter, error) type OpenConnFunc func() (io.ReadWriter, error)
@ -50,9 +53,7 @@ func AuthSelectFunc(password string, db int64) InitConnFunc {
} }
} }
func createReader() (*bufreader.Reader, error) { //------------------------------------------------------------------------------
return bufreader.NewSizedReader(8192), nil
}
type Client struct { type Client struct {
mtx sync.Mutex mtx sync.Mutex
@ -103,21 +104,6 @@ func (c *Client) WriteReq(buf []byte, conn *Conn) error {
return err return err
} }
func (c *Client) ReadReply(conn *Conn) error {
_, err := conn.Rd.ReadFrom(conn.RW)
if err != nil {
return err
}
return nil
}
func (c *Client) WriteRead(buf []byte, conn *Conn) error {
if err := c.WriteReq(buf, conn); err != nil {
return err
}
return c.ReadReply(conn)
}
func (c *Client) Process(req Req) { func (c *Client) Process(req Req) {
if c.reqs == nil { if c.reqs == nil {
c.Run(req) c.Run(req)
@ -139,7 +125,7 @@ func (c *Client) Run(req Req) {
return return
} }
err = c.WriteRead(req.Req(), conn) err = c.WriteReq(req.Req(), conn)
if err != nil { if err != nil {
c.ConnPool.Remove(conn) c.ConnPool.Remove(conn)
req.SetErr(err) req.SetErr(err)
@ -193,19 +179,12 @@ func (c *Client) RunReqs(reqs []Req, conn *Conn) error {
} }
} }
err := c.WriteRead(multiReq, conn) err := c.WriteReq(multiReq, conn)
if err != nil { if err != nil {
return err return err
} }
for i := 0; i < len(reqs); i++ { for i := 0; i < len(reqs); i++ {
if !conn.Rd.HasUnread() {
_, err := conn.Rd.ReadFrom(conn.RW)
if err != err {
return err
}
}
req := reqs[i] req := reqs[i]
val, err := req.ParseReply(conn.Rd) val, err := req.ParseReply(conn.Rd)
if err != nil { if err != nil {
@ -259,7 +238,7 @@ func (c *Client) ExecReqs(reqs []Req, conn *Conn) error {
} }
multiReq = append(multiReq, PackReq([]string{"EXEC"})...) multiReq = append(multiReq, PackReq([]string{"EXEC"})...)
err := c.WriteRead(multiReq, conn) err := c.WriteReq(multiReq, conn)
if err != nil { if err != nil {
return err return err
} }
@ -274,13 +253,6 @@ func (c *Client) ExecReqs(reqs []Req, conn *Conn) error {
// Parse queued replies. // Parse queued replies.
for _ = range reqs { for _ = range reqs {
if !conn.Rd.HasUnread() {
_, err := conn.Rd.ReadFrom(conn.RW)
if err != err {
return err
}
}
_, err = statusReq.ParseReply(conn.Rd) _, err = statusReq.ParseReply(conn.Rd)
if err != nil { if err != nil {
return err return err
@ -288,12 +260,13 @@ func (c *Client) ExecReqs(reqs []Req, conn *Conn) error {
} }
// Parse number of replies. // Parse number of replies.
line, err := conn.Rd.ReadLine('\n') line, err := readLine(conn.Rd)
if err != nil { if err != nil {
return err return err
} }
if line[0] != '*' { if line[0] != '*' {
return fmt.Errorf("Expected '*', but got line %q of %q.", line, conn.Rd.Bytes()) buf, _ := conn.Rd.Peek(conn.Rd.Buffered())
return fmt.Errorf("Expected '*', but got line %q of %q.", line, buf)
} }
// Parse replies. // Parse replies.

View File

@ -1964,20 +1964,23 @@ func (t *RedisTest) BenchmarkRedisMGet(c *C) {
func (t *RedisTest) BenchmarkRedisWriteRead(c *C) { func (t *RedisTest) BenchmarkRedisWriteRead(c *C) {
c.StopTimer() c.StopTimer()
req := []byte("PING\r\n")
conn, _, err := t.client.ConnPool.Get() conn, _, err := t.client.ConnPool.Get()
c.Check(err, IsNil) c.Check(err, IsNil)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
err := t.client.WriteRead(req, conn) err := t.client.WriteReq([]byte("PING\r\n"), conn)
c.Check(err, IsNil) c.Check(err, IsNil)
c.Check(conn.Rd.Bytes(), DeepEquals, []byte("+PONG\r\n"))
line, _, err := conn.Rd.ReadLine()
c.Check(err, IsNil)
c.Check(line, DeepEquals, []byte("+PONG"))
} }
c.StartTimer() c.StartTimer()
for i := 0; i < c.N; i++ { for i := 0; i < c.N; i++ {
t.client.WriteRead(req, conn) t.client.WriteReq([]byte("PING\r\n"), conn)
conn.Rd.ReadLine()
} }
c.StopTimer() c.StopTimer()

View File

@ -1,11 +1,10 @@
package redis package redis
import ( import (
"bufio"
"errors" "errors"
"fmt" "fmt"
"strconv" "strconv"
"github.com/vmihailenco/bufreader"
) )
var Nil = errors.New("(nil)") var Nil = errors.New("(nil)")
@ -28,8 +27,8 @@ func isNoReplies(line []byte) bool {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func ParseReq(rd *bufreader.Reader) ([]string, error) { func ParseReq(rd *bufio.Reader) ([]string, error) {
line, err := rd.ReadLine('\n') line, err := readLine(rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -44,15 +43,19 @@ func ParseReq(rd *bufreader.Reader) ([]string, error) {
args := make([]string, 0) args := make([]string, 0)
for i := int64(0); i < numReplies; i++ { for i := int64(0); i < numReplies; i++ {
line, err = rd.ReadLine('\n') line, err = readLine(rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if line[0] != '$' { if line[0] != '$' {
return nil, fmt.Errorf("Expected '$', but got %q of %q.", line, rd.Bytes()) buf, _ := rd.Peek(rd.Buffered())
return nil, fmt.Errorf("Expected '$', but got %q of %q.", line, buf)
} }
line, err = rd.ReadLine('\n') line, err = readLine(rd)
if err != nil {
return nil, err
}
args = append(args, string(line)) args = append(args, string(line))
} }
return args, nil return args, nil
@ -79,7 +82,7 @@ func PackReq(args []string) []byte {
type Req interface { type Req interface {
Req() []byte Req() []byte
ParseReply(*bufreader.Reader) (interface{}, error) ParseReply(*bufio.Reader) (interface{}, error)
SetErr(error) SetErr(error)
Err() error Err() error
SetVal(interface{}) SetVal(interface{})
@ -133,7 +136,7 @@ func (r *BaseReq) InterfaceVal() interface{} {
return r.val return r.val
} }
func (r *BaseReq) ParseReply(rd *bufreader.Reader) (interface{}, error) { func (r *BaseReq) ParseReply(rd *bufio.Reader) (interface{}, error) {
panic("abstract") panic("abstract")
} }
@ -149,8 +152,8 @@ func NewStatusReq(args ...string) *StatusReq {
} }
} }
func (r *StatusReq) ParseReply(rd *bufreader.Reader) (interface{}, error) { func (r *StatusReq) ParseReply(rd *bufio.Reader) (interface{}, error) {
line, err := rd.ReadLine('\n') line, err := readLine(rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -158,7 +161,8 @@ func (r *StatusReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
if line[0] == '-' { if line[0] == '-' {
return nil, errors.New(string(line[1:])) return nil, errors.New(string(line[1:]))
} else if line[0] != '+' { } else if line[0] != '+' {
return nil, fmt.Errorf("Expected '+', but got %q of %q.", line, rd.Bytes()) buf, _ := rd.Peek(rd.Buffered())
return nil, fmt.Errorf("Expected '+', but got %q of %q.", line, buf)
} }
return string(line[1:]), nil return string(line[1:]), nil
@ -183,8 +187,8 @@ func NewIntReq(args ...string) *IntReq {
} }
} }
func (r *IntReq) ParseReply(rd *bufreader.Reader) (interface{}, error) { func (r *IntReq) ParseReply(rd *bufio.Reader) (interface{}, error) {
line, err := rd.ReadLine('\n') line, err := readLine(rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -192,7 +196,8 @@ func (r *IntReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
if line[0] == '-' { if line[0] == '-' {
return nil, errors.New(string(line[1:])) return nil, errors.New(string(line[1:]))
} else if line[0] != ':' { } else if line[0] != ':' {
return nil, fmt.Errorf("Expected ':', but got %q of %q.", line, rd.Bytes()) buf, _ := rd.Peek(rd.Buffered())
return nil, fmt.Errorf("Expected ':', but got %q of %q.", line, buf)
} }
return strconv.ParseInt(string(line[1:]), 10, 64) return strconv.ParseInt(string(line[1:]), 10, 64)
@ -217,8 +222,8 @@ func NewIntNilReq(args ...string) *IntNilReq {
} }
} }
func (r *IntNilReq) ParseReply(rd *bufreader.Reader) (interface{}, error) { func (r *IntNilReq) ParseReply(rd *bufio.Reader) (interface{}, error) {
line, err := rd.ReadLine('\n') line, err := readLine(rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -231,7 +236,8 @@ func (r *IntNilReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
return nil, Nil return nil, Nil
} }
return nil, fmt.Errorf("Expected ':', but got %q of %q.", line, rd.Bytes()) buf, _ := rd.Peek(rd.Buffered())
return nil, fmt.Errorf("Expected ':', but got %q of %q.", line, buf)
} }
func (r *IntNilReq) Val() int64 { func (r *IntNilReq) Val() int64 {
@ -253,8 +259,8 @@ func NewBoolReq(args ...string) *BoolReq {
} }
} }
func (r *BoolReq) ParseReply(rd *bufreader.Reader) (interface{}, error) { func (r *BoolReq) ParseReply(rd *bufio.Reader) (interface{}, error) {
line, err := rd.ReadLine('\n') line, err := readLine(rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -262,7 +268,8 @@ func (r *BoolReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
if line[0] == '-' { if line[0] == '-' {
return nil, errors.New(string(line[1:])) return nil, errors.New(string(line[1:]))
} else if line[0] != ':' { } else if line[0] != ':' {
return nil, fmt.Errorf("Expected ':', but got %q of %q.", line, rd.Bytes()) buf, _ := rd.Peek(rd.Buffered())
return nil, fmt.Errorf("Expected ':', but got %q of %q.", line, buf)
} }
return line[1] == '1', nil return line[1] == '1', nil
@ -287,8 +294,8 @@ func NewBulkReq(args ...string) *BulkReq {
} }
} }
func (r *BulkReq) ParseReply(rd *bufreader.Reader) (interface{}, error) { func (r *BulkReq) ParseReply(rd *bufio.Reader) (interface{}, error) {
line, err := rd.ReadLine('\n') line, err := readLine(rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -296,14 +303,15 @@ func (r *BulkReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
if line[0] == '-' { if line[0] == '-' {
return nil, errors.New(string(line[1:])) return nil, errors.New(string(line[1:]))
} else if line[0] != '$' { } else if line[0] != '$' {
return nil, fmt.Errorf("Expected '$', but got %q of %q.", line, rd.Bytes()) buf, _ := rd.Peek(rd.Buffered())
return nil, fmt.Errorf("Expected '$', but got %q of %q.", line, buf)
} }
if isNil(line) { if isNil(line) {
return nil, Nil return nil, Nil
} }
line, err = rd.ReadLine('\n') line, err = readLine(rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -330,8 +338,8 @@ func NewFloatReq(args ...string) *FloatReq {
} }
} }
func (r *FloatReq) ParseReply(rd *bufreader.Reader) (interface{}, error) { func (r *FloatReq) ParseReply(rd *bufio.Reader) (interface{}, error) {
line, err := rd.ReadLine('\n') line, err := readLine(rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -339,14 +347,15 @@ func (r *FloatReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
if line[0] == '-' { if line[0] == '-' {
return nil, errors.New(string(line[1:])) return nil, errors.New(string(line[1:]))
} else if line[0] != '$' { } else if line[0] != '$' {
return nil, fmt.Errorf("Expected '$', but got %q of %q.", line, rd.Bytes()) buf, _ := rd.Peek(rd.Buffered())
return nil, fmt.Errorf("Expected '$', but got %q of %q.", line, buf)
} }
if isNil(line) { if isNil(line) {
return nil, Nil return nil, Nil
} }
line, err = rd.ReadLine('\n') line, err = readLine(rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -373,8 +382,8 @@ func NewMultiBulkReq(args ...string) *MultiBulkReq {
} }
} }
func (r *MultiBulkReq) ParseReply(rd *bufreader.Reader) (interface{}, error) { func (r *MultiBulkReq) ParseReply(rd *bufio.Reader) (interface{}, error) {
line, err := rd.ReadLine('\n') line, err := readLine(rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -382,7 +391,8 @@ func (r *MultiBulkReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
if line[0] == '-' { if line[0] == '-' {
return nil, errors.New(string(line[1:])) return nil, errors.New(string(line[1:]))
} else if line[0] != '*' { } else if line[0] != '*' {
return nil, fmt.Errorf("Expected '*', but got line %q of %q.", line, rd.Bytes()) buf, _ := rd.Peek(rd.Buffered())
return nil, fmt.Errorf("Expected '*', but got line %q of %q.", line, buf)
} else if isNil(line) { } else if isNil(line) {
return nil, Nil return nil, Nil
} }
@ -397,7 +407,7 @@ func (r *MultiBulkReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
} }
for i := int64(0); i < numReplies; i++ { for i := int64(0); i < numReplies; i++ {
line, err = rd.ReadLine('\n') line, err = readLine(rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -415,14 +425,15 @@ func (r *MultiBulkReq) ParseReply(rd *bufreader.Reader) (interface{}, error) {
} else if isNil(line) { } else if isNil(line) {
val = append(val, nil) val = append(val, nil)
} else { } else {
line, err = rd.ReadLine('\n') line, err = readLine(rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
val = append(val, string(line)) val = append(val, string(line))
} }
} else { } else {
return nil, fmt.Errorf("Expected '$', but got line %q of %q.", line, rd.Bytes()) buf, _ := rd.Peek(rd.Buffered())
return nil, fmt.Errorf("Expected '$', but got line %q of %q.", line, buf)
} }
} }

View File

@ -1,7 +1,8 @@
package redis_test package redis_test
import ( import (
"github.com/vmihailenco/bufreader" "bufio"
. "launchpad.net/gocheck" . "launchpad.net/gocheck"
"github.com/vmihailenco/redis" "github.com/vmihailenco/redis"
@ -9,6 +10,20 @@ import (
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type LineReader struct {
line []byte
}
func NewLineReader(line []byte) *LineReader {
return &LineReader{line: line}
}
func (r *LineReader) Read(buf []byte) (int, error) {
return copy(buf, r.line), nil
}
//------------------------------------------------------------------------------
type RequestTest struct{} type RequestTest struct{}
var _ = Suite(&RequestTest{}) var _ = Suite(&RequestTest{})
@ -24,12 +39,11 @@ func (t *RequestTest) TearDownTest(c *C) {}
func (t *RequestTest) BenchmarkStatusReq(c *C) { func (t *RequestTest) BenchmarkStatusReq(c *C) {
c.StopTimer() c.StopTimer()
rd := bufreader.NewSizedReader(1024) lineReader := NewLineReader([]byte("+OK\r\n"))
rd.Set([]byte("+OK\r\n")) rd := bufio.NewReaderSize(lineReader, 1024)
req := redis.NewStatusReq() req := redis.NewStatusReq()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
rd.ResetPos()
vI, err := req.ParseReply(rd) vI, err := req.ParseReply(rd)
c.Check(err, IsNil) c.Check(err, IsNil)
c.Check(vI, Equals, "OK") c.Check(vI, Equals, "OK")
@ -42,7 +56,6 @@ func (t *RequestTest) BenchmarkStatusReq(c *C) {
c.StartTimer() c.StartTimer()
for i := 0; i < c.N; i++ { for i := 0; i < c.N; i++ {
rd.ResetPos()
v, _ := req.ParseReply(rd) v, _ := req.ParseReply(rd)
req.SetVal(v) req.SetVal(v)
req.Err() req.Err()

16
utils.go Normal file
View File

@ -0,0 +1,16 @@
package redis
import (
"bufio"
)
func readLine(rd *bufio.Reader) ([]byte, error) {
line, isPrefix, err := rd.ReadLine()
if err != nil {
return line, err
}
if isPrefix {
return line, ErrReaderTooSmall
}
return line, nil
}