diff --git a/internal/proto/reader.go b/internal/proto/reader.go index 3ab4a73e..24fc0bd8 100644 --- a/internal/proto/reader.go +++ b/internal/proto/reader.go @@ -44,27 +44,40 @@ func (r *Reader) Buffered() int { return r.rd.Buffered() } +func (r *Reader) Peek(n int) ([]byte, error) { + return r.rd.Peek(n) +} + func (r *Reader) Reset(rd io.Reader) { r.rd.Reset(rd) } func (r *Reader) ReadLine() ([]byte, error) { - line, isPrefix, err := r.rd.ReadLine() + line, err := r.readLine() if err != nil { return nil, err } - if isPrefix { - return nil, bufio.ErrBufferFull - } - if len(line) == 0 { - return nil, fmt.Errorf("redis: reply is empty") - } if isNilReply(line) { return nil, Nil } return line, nil } +// readLine that returns an error if: +// - there is a pending read error; +// - or line does not end with \r\n. +func (r *Reader) readLine() ([]byte, error) { + b, err := r.rd.ReadSlice('\n') + if err != nil { + return nil, err + } + if len(b) <= 2 || b[len(b)-1] != '\n' || b[len(b)-2] != '\r' { + return nil, fmt.Errorf("redis: invalid reply: %q", b) + } + b = b[:len(b)-2] + return b, nil +} + func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) { line, err := r.ReadLine() if err != nil { @@ -273,10 +286,12 @@ func (r *Reader) _readTmpBytesReply(line []byte) ([]byte, error) { } func (r *Reader) buf(n int) []byte { - if d := n - cap(r._buf); d > 0 { - r._buf = append(r._buf, make([]byte, d)...) + if n <= cap(r._buf) { + return r._buf[:n] } - return r._buf[:n] + d := n - cap(r._buf) + r._buf = append(r._buf, make([]byte, d)...) + return r._buf } func isNilReply(b []byte) bool {