diff --git a/internal/pool/conn.go b/internal/pool/conn.go index f39f265..4cab656 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -24,7 +24,7 @@ func NewConn(netConn net.Conn) *Conn { cn := &Conn{ netConn: netConn, } - buf := proto.NewBufioReader(netConn) + buf := proto.NewElasticBufReader(netConn) cn.Rd = proto.NewReader(buf) cn.WB = proto.NewWriteBuffer(buf) cn.SetUsedAt(time.Now()) diff --git a/internal/proto/bufio_reader.go b/internal/proto/elastic_reader.go similarity index 71% rename from internal/proto/bufio_reader.go rename to internal/proto/elastic_reader.go index 7ca05d6..9e4eea2 100644 --- a/internal/proto/bufio_reader.go +++ b/internal/proto/elastic_reader.go @@ -8,49 +8,56 @@ import ( const defaultBufSize = 4096 -type BufioReader struct { +// ElasticBufReader is like bufio.Reader but instead of returning ErrBufferFull +// it automatically grows the buffer. +type ElasticBufReader struct { buf []byte rd io.Reader // reader provided by the client r, w int // buf read and write positions err error } -func NewBufioReader(rd io.Reader) *BufioReader { - r := new(BufioReader) - r.reset(make([]byte, defaultBufSize), rd) - return r +func NewElasticBufReader(rd io.Reader) *ElasticBufReader { + return &ElasticBufReader{ + buf: make([]byte, defaultBufSize), + rd: rd, + } } -func (b *BufioReader) Reset(rd io.Reader) { - b.reset(b.buf, rd) +func (b *ElasticBufReader) Reset(rd io.Reader) { + b.rd = rd + b.r, b.w = 0, 0 + b.err = nil } -func (b *BufioReader) Buffer() []byte { +func (b *ElasticBufReader) Buffer() []byte { return b.buf } -func (b *BufioReader) ResetBuffer(buf []byte) { - b.reset(buf, b.rd) +func (b *ElasticBufReader) ResetBuffer(buf []byte) { + b.buf = buf + b.r, b.w = 0, 0 + b.err = nil } -func (b *BufioReader) reset(buf []byte, rd io.Reader) { - *b = BufioReader{ +func (b *ElasticBufReader) reset(buf []byte, rd io.Reader) { + *b = ElasticBufReader{ buf: buf, rd: rd, } } // Buffered returns the number of bytes that can be read from the current buffer. -func (b *BufioReader) Buffered() int { return b.w - b.r } +func (b *ElasticBufReader) Buffered() int { return b.w - b.r } -func (b *BufioReader) Bytes() []byte { +func (b *ElasticBufReader) Bytes() []byte { return b.buf[b.r:b.w] } var errNegativeRead = errors.New("bufio: reader returned negative count from Read") // fill reads a new chunk into the buffer. -func (b *BufioReader) fill() { +func (b *ElasticBufReader) fill() { // Slide existing data to beginning. if b.r > 0 { copy(b.buf, b.buf[b.r:b.w]) @@ -81,13 +88,13 @@ func (b *BufioReader) fill() { b.err = io.ErrNoProgress } -func (b *BufioReader) readErr() error { +func (b *ElasticBufReader) readErr() error { err := b.err b.err = nil return err } -func (b *BufioReader) Read(p []byte) (n int, err error) { +func (b *ElasticBufReader) Read(p []byte) (n int, err error) { n = len(p) if n == 0 { return 0, b.readErr() @@ -125,7 +132,7 @@ func (b *BufioReader) Read(p []byte) (n int, err error) { return n, nil } -func (b *BufioReader) ReadSlice(delim byte) (line []byte, err error) { +func (b *ElasticBufReader) ReadSlice(delim byte) (line []byte, err error) { for { // Search buffer. if i := bytes.IndexByte(b.buf[b.r:b.w], delim); i >= 0 { @@ -153,7 +160,7 @@ func (b *BufioReader) ReadSlice(delim byte) (line []byte, err error) { return } -func (b *BufioReader) ReadLine() (line []byte, err error) { +func (b *ElasticBufReader) ReadLine() (line []byte, err error) { line, err = b.ReadSlice('\n') if len(line) == 0 { if err != nil { @@ -173,7 +180,7 @@ func (b *BufioReader) ReadLine() (line []byte, err error) { return } -func (b *BufioReader) ReadByte() (byte, error) { +func (b *ElasticBufReader) ReadByte() (byte, error) { for b.r == b.w { if b.err != nil { return 0, b.readErr() @@ -185,7 +192,7 @@ func (b *BufioReader) ReadByte() (byte, error) { return c, nil } -func (b *BufioReader) ReadN(n int) ([]byte, error) { +func (b *ElasticBufReader) ReadN(n int) ([]byte, error) { b.grow(n) for b.Buffered() < n { // Pending error? @@ -203,7 +210,7 @@ func (b *BufioReader) ReadN(n int) ([]byte, error) { return buf, nil } -func (b *BufioReader) grow(n int) { +func (b *ElasticBufReader) grow(n int) { // Slide existing data to beginning. if b.r > 0 { copy(b.buf, b.buf[b.r:b.w]) diff --git a/internal/proto/reader.go b/internal/proto/reader.go index 26167eb..9e42b00 100644 --- a/internal/proto/reader.go +++ b/internal/proto/reader.go @@ -31,10 +31,10 @@ func (e RedisError) Error() string { return string(e) } type MultiBulkParse func(*Reader, int64) (interface{}, error) type Reader struct { - src *BufioReader + src *ElasticBufReader } -func NewReader(src *BufioReader) *Reader { +func NewReader(src *ElasticBufReader) *Reader { return &Reader{ src: src, } diff --git a/internal/proto/reader_test.go b/internal/proto/reader_test.go index c36e42f..658850a 100644 --- a/internal/proto/reader_test.go +++ b/internal/proto/reader_test.go @@ -12,7 +12,7 @@ import ( ) func newReader(s string) *proto.Reader { - return proto.NewReader(proto.NewBufioReader(strings.NewReader(s))) + return proto.NewReader(proto.NewElasticBufReader(strings.NewReader(s))) } var _ = Describe("Reader", func() { @@ -67,7 +67,7 @@ func benchmarkParseReply(b *testing.B, reply string, m proto.MultiBulkParse, wan for i := 0; i < b.N; i++ { buf.WriteString(reply) } - p := proto.NewReader(proto.NewBufioReader(buf)) + p := proto.NewReader(proto.NewElasticBufReader(buf)) b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/internal/proto/write_buffer.go b/internal/proto/write_buffer.go index 51c5480..cb9520b 100644 --- a/internal/proto/write_buffer.go +++ b/internal/proto/write_buffer.go @@ -7,11 +7,11 @@ import ( ) type WriteBuffer struct { - rb *BufioReader + rb *ElasticBufReader buf []byte } -func NewWriteBuffer(rb *BufioReader) *WriteBuffer { +func NewWriteBuffer(rb *ElasticBufReader) *WriteBuffer { return &WriteBuffer{ rb: rb, } diff --git a/internal/proto/write_buffer_test.go b/internal/proto/write_buffer_test.go index dba8be6..b40c65e 100644 --- a/internal/proto/write_buffer_test.go +++ b/internal/proto/write_buffer_test.go @@ -15,7 +15,7 @@ var _ = Describe("WriteBuffer", func() { var buf *proto.WriteBuffer BeforeEach(func() { - buf = proto.NewWriteBuffer(proto.NewBufioReader(strings.NewReader(""))) + buf = proto.NewWriteBuffer(proto.NewElasticBufReader(strings.NewReader(""))) }) It("should reset", func() { @@ -54,7 +54,7 @@ var _ = Describe("WriteBuffer", func() { }) func BenchmarkWriteBuffer_Append(b *testing.B) { - buf := proto.NewWriteBuffer(proto.NewBufioReader(strings.NewReader(""))) + buf := proto.NewWriteBuffer(proto.NewElasticBufReader(strings.NewReader(""))) args := []interface{}{"hello", "world", "foo", "bar"} for i := 0; i < b.N; i++ {