Rename BufioReader to ElasticBufReader

This commit is contained in:
Vladimir Mihailenco 2018-08-06 11:54:47 +03:00
parent bf3a84175e
commit 464daeb271
6 changed files with 38 additions and 31 deletions

View File

@ -24,7 +24,7 @@ func NewConn(netConn net.Conn) *Conn {
cn := &Conn{ cn := &Conn{
netConn: netConn, netConn: netConn,
} }
buf := proto.NewBufioReader(netConn) buf := proto.NewElasticBufReader(netConn)
cn.Rd = proto.NewReader(buf) cn.Rd = proto.NewReader(buf)
cn.WB = proto.NewWriteBuffer(buf) cn.WB = proto.NewWriteBuffer(buf)
cn.SetUsedAt(time.Now()) cn.SetUsedAt(time.Now())

View File

@ -8,49 +8,56 @@ import (
const defaultBufSize = 4096 const defaultBufSize = 4096
type BufioReader struct { // ElasticBufReader is like bufio.Reader but instead of returning ErrBufferFull
// it automatically grows the buffer.
type ElasticBufReader struct {
buf []byte buf []byte
rd io.Reader // reader provided by the client rd io.Reader // reader provided by the client
r, w int // buf read and write positions r, w int // buf read and write positions
err error err error
} }
func NewBufioReader(rd io.Reader) *BufioReader { func NewElasticBufReader(rd io.Reader) *ElasticBufReader {
r := new(BufioReader) return &ElasticBufReader{
r.reset(make([]byte, defaultBufSize), rd) buf: make([]byte, defaultBufSize),
return r rd: rd,
}
} }
func (b *BufioReader) Reset(rd io.Reader) { func (b *ElasticBufReader) Reset(rd io.Reader) {
b.reset(b.buf, rd) b.rd = rd
b.r, b.w = 0, 0
b.err = nil
} }
func (b *BufioReader) Buffer() []byte { func (b *ElasticBufReader) Buffer() []byte {
return b.buf return b.buf
} }
func (b *BufioReader) ResetBuffer(buf []byte) { func (b *ElasticBufReader) ResetBuffer(buf []byte) {
b.reset(buf, b.rd) b.buf = buf
b.r, b.w = 0, 0
b.err = nil
} }
func (b *BufioReader) reset(buf []byte, rd io.Reader) { func (b *ElasticBufReader) reset(buf []byte, rd io.Reader) {
*b = BufioReader{ *b = ElasticBufReader{
buf: buf, buf: buf,
rd: rd, rd: rd,
} }
} }
// Buffered returns the number of bytes that can be read from the current buffer. // Buffered returns the number of bytes that can be read from the current buffer.
func (b *BufioReader) Buffered() int { return b.w - b.r } func (b *ElasticBufReader) Buffered() int { return b.w - b.r }
func (b *BufioReader) Bytes() []byte { func (b *ElasticBufReader) Bytes() []byte {
return b.buf[b.r:b.w] return b.buf[b.r:b.w]
} }
var errNegativeRead = errors.New("bufio: reader returned negative count from Read") var errNegativeRead = errors.New("bufio: reader returned negative count from Read")
// fill reads a new chunk into the buffer. // fill reads a new chunk into the buffer.
func (b *BufioReader) fill() { func (b *ElasticBufReader) fill() {
// Slide existing data to beginning. // Slide existing data to beginning.
if b.r > 0 { if b.r > 0 {
copy(b.buf, b.buf[b.r:b.w]) copy(b.buf, b.buf[b.r:b.w])
@ -81,13 +88,13 @@ func (b *BufioReader) fill() {
b.err = io.ErrNoProgress b.err = io.ErrNoProgress
} }
func (b *BufioReader) readErr() error { func (b *ElasticBufReader) readErr() error {
err := b.err err := b.err
b.err = nil b.err = nil
return err return err
} }
func (b *BufioReader) Read(p []byte) (n int, err error) { func (b *ElasticBufReader) Read(p []byte) (n int, err error) {
n = len(p) n = len(p)
if n == 0 { if n == 0 {
return 0, b.readErr() return 0, b.readErr()
@ -125,7 +132,7 @@ func (b *BufioReader) Read(p []byte) (n int, err error) {
return n, nil return n, nil
} }
func (b *BufioReader) ReadSlice(delim byte) (line []byte, err error) { func (b *ElasticBufReader) ReadSlice(delim byte) (line []byte, err error) {
for { for {
// Search buffer. // Search buffer.
if i := bytes.IndexByte(b.buf[b.r:b.w], delim); i >= 0 { if i := bytes.IndexByte(b.buf[b.r:b.w], delim); i >= 0 {
@ -153,7 +160,7 @@ func (b *BufioReader) ReadSlice(delim byte) (line []byte, err error) {
return return
} }
func (b *BufioReader) ReadLine() (line []byte, err error) { func (b *ElasticBufReader) ReadLine() (line []byte, err error) {
line, err = b.ReadSlice('\n') line, err = b.ReadSlice('\n')
if len(line) == 0 { if len(line) == 0 {
if err != nil { if err != nil {
@ -173,7 +180,7 @@ func (b *BufioReader) ReadLine() (line []byte, err error) {
return return
} }
func (b *BufioReader) ReadByte() (byte, error) { func (b *ElasticBufReader) ReadByte() (byte, error) {
for b.r == b.w { for b.r == b.w {
if b.err != nil { if b.err != nil {
return 0, b.readErr() return 0, b.readErr()
@ -185,7 +192,7 @@ func (b *BufioReader) ReadByte() (byte, error) {
return c, nil return c, nil
} }
func (b *BufioReader) ReadN(n int) ([]byte, error) { func (b *ElasticBufReader) ReadN(n int) ([]byte, error) {
b.grow(n) b.grow(n)
for b.Buffered() < n { for b.Buffered() < n {
// Pending error? // Pending error?
@ -203,7 +210,7 @@ func (b *BufioReader) ReadN(n int) ([]byte, error) {
return buf, nil return buf, nil
} }
func (b *BufioReader) grow(n int) { func (b *ElasticBufReader) grow(n int) {
// Slide existing data to beginning. // Slide existing data to beginning.
if b.r > 0 { if b.r > 0 {
copy(b.buf, b.buf[b.r:b.w]) copy(b.buf, b.buf[b.r:b.w])

View File

@ -31,10 +31,10 @@ func (e RedisError) Error() string { return string(e) }
type MultiBulkParse func(*Reader, int64) (interface{}, error) type MultiBulkParse func(*Reader, int64) (interface{}, error)
type Reader struct { type Reader struct {
src *BufioReader src *ElasticBufReader
} }
func NewReader(src *BufioReader) *Reader { func NewReader(src *ElasticBufReader) *Reader {
return &Reader{ return &Reader{
src: src, src: src,
} }

View File

@ -12,7 +12,7 @@ import (
) )
func newReader(s string) *proto.Reader { func newReader(s string) *proto.Reader {
return proto.NewReader(proto.NewBufioReader(strings.NewReader(s))) return proto.NewReader(proto.NewElasticBufReader(strings.NewReader(s)))
} }
var _ = Describe("Reader", func() { var _ = Describe("Reader", func() {
@ -67,7 +67,7 @@ func benchmarkParseReply(b *testing.B, reply string, m proto.MultiBulkParse, wan
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
buf.WriteString(reply) buf.WriteString(reply)
} }
p := proto.NewReader(proto.NewBufioReader(buf)) p := proto.NewReader(proto.NewElasticBufReader(buf))
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {

View File

@ -7,11 +7,11 @@ import (
) )
type WriteBuffer struct { type WriteBuffer struct {
rb *BufioReader rb *ElasticBufReader
buf []byte buf []byte
} }
func NewWriteBuffer(rb *BufioReader) *WriteBuffer { func NewWriteBuffer(rb *ElasticBufReader) *WriteBuffer {
return &WriteBuffer{ return &WriteBuffer{
rb: rb, rb: rb,
} }

View File

@ -15,7 +15,7 @@ var _ = Describe("WriteBuffer", func() {
var buf *proto.WriteBuffer var buf *proto.WriteBuffer
BeforeEach(func() { BeforeEach(func() {
buf = proto.NewWriteBuffer(proto.NewBufioReader(strings.NewReader(""))) buf = proto.NewWriteBuffer(proto.NewElasticBufReader(strings.NewReader("")))
}) })
It("should reset", func() { It("should reset", func() {
@ -54,7 +54,7 @@ var _ = Describe("WriteBuffer", func() {
}) })
func BenchmarkWriteBuffer_Append(b *testing.B) { func BenchmarkWriteBuffer_Append(b *testing.B) {
buf := proto.NewWriteBuffer(proto.NewBufioReader(strings.NewReader(""))) buf := proto.NewWriteBuffer(proto.NewElasticBufReader(strings.NewReader("")))
args := []interface{}{"hello", "world", "foo", "bar"} args := []interface{}{"hello", "world", "foo", "bar"}
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {