Get rid of custom bufio package.

This commit is contained in:
Vladimir Mihailenco 2015-09-03 17:55:31 +03:00
parent dace69da84
commit 58cb170ac0
11 changed files with 107 additions and 150 deletions

View File

@ -9,7 +9,6 @@ go:
- tip - tip
install: install:
- go get gopkg.in/bufio.v1
- go get gopkg.in/bsm/ratelimit.v1 - go get gopkg.in/bsm/ratelimit.v1
- go get github.com/onsi/ginkgo - go get github.com/onsi/ginkgo
- go get github.com/onsi/gomega - go get github.com/onsi/gomega

View File

@ -99,7 +99,7 @@ func (pipe *ClusterPipeline) execClusterCmds(
var firstCmdErr error var firstCmdErr error
for i, cmd := range cmds { for i, cmd := range cmds {
err := cmd.parseReply(cn.rd) err := cmd.parseReply(cn)
if err == nil { if err == nil {
continue continue
} }

View File

@ -6,8 +6,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"time" "time"
"gopkg.in/bufio.v1"
) )
var ( var (
@ -30,7 +28,7 @@ var (
type Cmder interface { type Cmder interface {
args() []interface{} args() []interface{}
parseReply(*bufio.Reader) error parseReply(*conn) error
setErr(error) setErr(error)
reset() reset()
@ -154,8 +152,8 @@ func (cmd *Cmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *Cmd) parseReply(rd *bufio.Reader) error { func (cmd *Cmd) parseReply(cn *conn) error {
cmd.val, cmd.err = parseReply(rd, parseSlice) cmd.val, cmd.err = parseReply(cn, parseSlice)
// Convert to string to preserve old behaviour. // Convert to string to preserve old behaviour.
// TODO: remove in v4 // TODO: remove in v4
if v, ok := cmd.val.([]byte); ok { if v, ok := cmd.val.([]byte); ok {
@ -193,8 +191,8 @@ func (cmd *SliceCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *SliceCmd) parseReply(rd *bufio.Reader) error { func (cmd *SliceCmd) parseReply(cn *conn) error {
v, err := parseReply(rd, parseSlice) v, err := parseReply(cn, parseSlice)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -236,8 +234,8 @@ func (cmd *StatusCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *StatusCmd) parseReply(rd *bufio.Reader) error { func (cmd *StatusCmd) parseReply(cn *conn) error {
v, err := parseReply(rd, nil) v, err := parseReply(cn, nil)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -275,8 +273,8 @@ func (cmd *IntCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *IntCmd) parseReply(rd *bufio.Reader) error { func (cmd *IntCmd) parseReply(cn *conn) error {
v, err := parseReply(rd, nil) v, err := parseReply(cn, nil)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -318,8 +316,8 @@ func (cmd *DurationCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *DurationCmd) parseReply(rd *bufio.Reader) error { func (cmd *DurationCmd) parseReply(cn *conn) error {
v, err := parseReply(rd, nil) v, err := parseReply(cn, nil)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -359,8 +357,8 @@ func (cmd *BoolCmd) String() string {
var ok = []byte("OK") var ok = []byte("OK")
func (cmd *BoolCmd) parseReply(rd *bufio.Reader) error { func (cmd *BoolCmd) parseReply(cn *conn) error {
v, err := parseReply(rd, nil) v, err := parseReply(cn, nil)
// `SET key value NX` returns nil when key already exists, which // `SET key value NX` returns nil when key already exists, which
// is inconsistent with `SETNX key value`. // is inconsistent with `SETNX key value`.
// TODO: is this okay? // TODO: is this okay?
@ -445,15 +443,13 @@ func (cmd *StringCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *StringCmd) parseReply(rd *bufio.Reader) error { func (cmd *StringCmd) parseReply(cn *conn) error {
v, err := parseReply(rd, nil) v, err := parseReply(cn, nil)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
} }
b := v.([]byte) cmd.val = cn.copyBuf(v.([]byte))
cmd.val = make([]byte, len(b))
copy(cmd.val, b)
return nil return nil
} }
@ -486,8 +482,8 @@ func (cmd *FloatCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *FloatCmd) parseReply(rd *bufio.Reader) error { func (cmd *FloatCmd) parseReply(cn *conn) error {
v, err := parseReply(rd, nil) v, err := parseReply(cn, nil)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -526,8 +522,8 @@ func (cmd *StringSliceCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *StringSliceCmd) parseReply(rd *bufio.Reader) error { func (cmd *StringSliceCmd) parseReply(cn *conn) error {
v, err := parseReply(rd, parseStringSlice) v, err := parseReply(cn, parseStringSlice)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -565,8 +561,8 @@ func (cmd *BoolSliceCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *BoolSliceCmd) parseReply(rd *bufio.Reader) error { func (cmd *BoolSliceCmd) parseReply(cn *conn) error {
v, err := parseReply(rd, parseBoolSlice) v, err := parseReply(cn, parseBoolSlice)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -604,8 +600,8 @@ func (cmd *StringStringMapCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *StringStringMapCmd) parseReply(rd *bufio.Reader) error { func (cmd *StringStringMapCmd) parseReply(cn *conn) error {
v, err := parseReply(rd, parseStringStringMap) v, err := parseReply(cn, parseStringStringMap)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -643,8 +639,8 @@ func (cmd *StringIntMapCmd) reset() {
cmd.err = nil cmd.err = nil
} }
func (cmd *StringIntMapCmd) parseReply(rd *bufio.Reader) error { func (cmd *StringIntMapCmd) parseReply(cn *conn) error {
v, err := parseReply(rd, parseStringIntMap) v, err := parseReply(cn, parseStringIntMap)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -682,8 +678,8 @@ func (cmd *ZSliceCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *ZSliceCmd) parseReply(rd *bufio.Reader) error { func (cmd *ZSliceCmd) parseReply(cn *conn) error {
v, err := parseReply(rd, parseZSlice) v, err := parseReply(cn, parseZSlice)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -723,8 +719,8 @@ func (cmd *ScanCmd) String() string {
return cmdString(cmd, cmd.keys) return cmdString(cmd, cmd.keys)
} }
func (cmd *ScanCmd) parseReply(rd *bufio.Reader) error { func (cmd *ScanCmd) parseReply(cn *conn) error {
vi, err := parseReply(rd, parseSlice) vi, err := parseReply(cn, parseSlice)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return cmd.err return cmd.err
@ -778,8 +774,8 @@ func (cmd *ClusterSlotCmd) reset() {
cmd.err = nil cmd.err = nil
} }
func (cmd *ClusterSlotCmd) parseReply(rd *bufio.Reader) error { func (cmd *ClusterSlotCmd) parseReply(cn *conn) error {
v, err := parseReply(rd, parseClusterSlotInfoSlice) v, err := parseReply(cn, parseClusterSlotInfoSlice)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err

20
conn.go
View File

@ -1,12 +1,13 @@
package redis package redis
import ( import (
"bufio"
"net" "net"
"time" "time"
"gopkg.in/bufio.v1"
) )
const defaultBufSize = 4096
var ( var (
zeroTime = time.Time{} zeroTime = time.Time{}
) )
@ -30,7 +31,7 @@ func newConnDialer(opt *Options) func() (*conn, error) {
} }
cn := &conn{ cn := &conn{
netcn: netcn, netcn: netcn,
buf: make([]byte, 0, 64), buf: make([]byte, defaultBufSize),
} }
cn.rd = bufio.NewReader(cn) cn.rd = bufio.NewReader(cn)
return cn, cn.init(opt) return cn, cn.init(opt)
@ -102,3 +103,16 @@ func (cn *conn) RemoteAddr() net.Addr {
func (cn *conn) Close() error { func (cn *conn) Close() error {
return cn.netcn.Close() return cn.netcn.Close()
} }
func isSameSlice(s1, s2 []byte) bool {
return len(s1) > 0 && len(s2) > 0 && &s1[0] == &s2[0]
}
func (cn *conn) copyBuf(b []byte) []byte {
if isSameSlice(b, cn.buf) {
new := make([]byte, len(b))
copy(new, b)
return new
}
return b
}

View File

@ -115,14 +115,14 @@ func (c *Multi) execCmds(cn *conn, cmds []Cmder) error {
// Parse queued replies. // Parse queued replies.
for i := 0; i < cmdsLen; i++ { for i := 0; i < cmdsLen; i++ {
if err := statusCmd.parseReply(cn.rd); err != nil { if err := statusCmd.parseReply(cn); err != nil {
setCmdsErr(cmds[1:len(cmds)-1], err) setCmdsErr(cmds[1:len(cmds)-1], err)
return err return err
} }
} }
// Parse number of replies. // Parse number of replies.
line, err := readLine(cn.rd) line, err := readLine(cn)
if err != nil { if err != nil {
setCmdsErr(cmds[1:len(cmds)-1], err) setCmdsErr(cmds[1:len(cmds)-1], err)
return err return err
@ -143,7 +143,7 @@ func (c *Multi) execCmds(cn *conn, cmds []Cmder) error {
// Loop starts from 1 to omit MULTI cmd. // Loop starts from 1 to omit MULTI cmd.
for i := 1; i < cmdsLen; i++ { for i := 1; i < cmdsLen; i++ {
cmd := cmds[i] cmd := cmds[i]
if err := cmd.parseReply(cn.rd); err != nil { if err := cmd.parseReply(cn); err != nil {
if firstCmdErr == nil { if firstCmdErr == nil {
firstCmdErr = err firstCmdErr = err
} }

133
parser.go
View File

@ -3,13 +3,12 @@ package redis
import ( import (
"errors" "errors"
"fmt" "fmt"
"io"
"net" "net"
"strconv" "strconv"
"gopkg.in/bufio.v1"
) )
type multiBulkParser func(rd *bufio.Reader, n int64) (interface{}, error) type multiBulkParser func(cn *conn, n int64) (interface{}, error)
var ( var (
errReaderTooSmall = errors.New("redis: reader is too small") errReaderTooSmall = errors.New("redis: reader is too small")
@ -216,8 +215,8 @@ func scan(b []byte, val interface{}) error {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func readLine(rd *bufio.Reader) ([]byte, error) { func readLine(cn *conn) ([]byte, error) {
line, isPrefix, err := rd.ReadLine() line, isPrefix, err := cn.rd.ReadLine()
if err != nil { if err != nil {
return line, err return line, err
} }
@ -227,74 +226,21 @@ func readLine(rd *bufio.Reader) ([]byte, error) {
return line, nil return line, nil
} }
func readN(rd *bufio.Reader, n int) ([]byte, error) { func readN(cn *conn, n int) ([]byte, error) {
b, err := rd.ReadN(n) var b []byte
if err == bufio.ErrBufferFull { if cap(cn.buf) < n {
tmp := make([]byte, n) b = make([]byte, n)
r := copy(tmp, b) } else {
b = tmp b = cn.buf[:n]
for {
nn, err := rd.Read(b[r:])
r += nn
if r >= n {
// Ignore error if we read enough.
break
}
if err != nil {
return nil, err
}
}
} else if err != nil {
return nil, err
} }
return b, nil _, err := io.ReadFull(cn.rd, b)
return b, err
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func parseReq(rd *bufio.Reader) ([]string, error) { func parseReply(cn *conn, p multiBulkParser) (interface{}, error) {
line, err := readLine(rd) line, err := readLine(cn)
if err != nil {
return nil, err
}
if line[0] != '*' {
return []string{string(line)}, nil
}
numReplies, err := strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
return nil, err
}
args := make([]string, 0, numReplies)
for i := int64(0); i < numReplies; i++ {
line, err = readLine(rd)
if err != nil {
return nil, err
}
if line[0] != '$' {
return nil, fmt.Errorf("redis: expected '$', but got %q", line)
}
argLen, err := strconv.ParseInt(string(line[1:]), 10, 32)
if err != nil {
return nil, err
}
arg, err := readN(rd, int(argLen)+2)
if err != nil {
return nil, err
}
args = append(args, string(arg[:argLen]))
}
return args, nil
}
//------------------------------------------------------------------------------
func parseReply(rd *bufio.Reader, p multiBulkParser) (interface{}, error) {
line, err := readLine(rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -315,12 +261,12 @@ func parseReply(rd *bufio.Reader, p multiBulkParser) (interface{}, error) {
return nil, Nil return nil, Nil
} }
replyLen, err := strconv.Atoi(string(line[1:])) replyLen, err := strconv.Atoi(bytesToString(line[1:]))
if err != nil { if err != nil {
return nil, err return nil, err
} }
b, err := readN(rd, replyLen+2) b, err := readN(cn, replyLen+2)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -335,15 +281,15 @@ func parseReply(rd *bufio.Reader, p multiBulkParser) (interface{}, error) {
return nil, err return nil, err
} }
return p(rd, repliesNum) return p(cn, repliesNum)
} }
return nil, fmt.Errorf("redis: can't parse %q", line) return nil, fmt.Errorf("redis: can't parse %q", line)
} }
func parseSlice(rd *bufio.Reader, n int64) (interface{}, error) { func parseSlice(cn *conn, n int64) (interface{}, error) {
vals := make([]interface{}, 0, n) vals := make([]interface{}, 0, n)
for i := int64(0); i < n; i++ { for i := int64(0); i < n; i++ {
v, err := parseReply(rd, parseSlice) v, err := parseReply(cn, parseSlice)
if err == Nil { if err == Nil {
vals = append(vals, nil) vals = append(vals, nil)
} else if err != nil { } else if err != nil {
@ -360,10 +306,10 @@ func parseSlice(rd *bufio.Reader, n int64) (interface{}, error) {
return vals, nil return vals, nil
} }
func parseStringSlice(rd *bufio.Reader, n int64) (interface{}, error) { func parseStringSlice(cn *conn, n int64) (interface{}, error) {
vals := make([]string, 0, n) vals := make([]string, 0, n)
for i := int64(0); i < n; i++ { for i := int64(0); i < n; i++ {
viface, err := parseReply(rd, nil) viface, err := parseReply(cn, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -376,10 +322,10 @@ func parseStringSlice(rd *bufio.Reader, n int64) (interface{}, error) {
return vals, nil return vals, nil
} }
func parseBoolSlice(rd *bufio.Reader, n int64) (interface{}, error) { func parseBoolSlice(cn *conn, n int64) (interface{}, error) {
vals := make([]bool, 0, n) vals := make([]bool, 0, n)
for i := int64(0); i < n; i++ { for i := int64(0); i < n; i++ {
viface, err := parseReply(rd, nil) viface, err := parseReply(cn, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -392,36 +338,37 @@ func parseBoolSlice(rd *bufio.Reader, n int64) (interface{}, error) {
return vals, nil return vals, nil
} }
func parseStringStringMap(rd *bufio.Reader, n int64) (interface{}, error) { func parseStringStringMap(cn *conn, n int64) (interface{}, error) {
m := make(map[string]string, n/2) m := make(map[string]string, n/2)
for i := int64(0); i < n; i += 2 { for i := int64(0); i < n; i += 2 {
keyiface, err := parseReply(rd, nil) keyIface, err := parseReply(cn, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
key, ok := keyiface.([]byte) keyBytes, ok := keyIface.([]byte)
if !ok { if !ok {
return nil, fmt.Errorf("got %T, expected string", keyiface) return nil, fmt.Errorf("got %T, expected []byte", keyIface)
} }
key := string(keyBytes)
valueiface, err := parseReply(rd, nil) valueIface, err := parseReply(cn, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
value, ok := valueiface.([]byte) valueBytes, ok := valueIface.([]byte)
if !ok { if !ok {
return nil, fmt.Errorf("got %T, expected string", valueiface) return nil, fmt.Errorf("got %T, expected []byte", valueIface)
} }
m[string(key)] = string(value) m[key] = string(valueBytes)
} }
return m, nil return m, nil
} }
func parseStringIntMap(rd *bufio.Reader, n int64) (interface{}, error) { func parseStringIntMap(cn *conn, n int64) (interface{}, error) {
m := make(map[string]int64, n/2) m := make(map[string]int64, n/2)
for i := int64(0); i < n; i += 2 { for i := int64(0); i < n; i += 2 {
keyiface, err := parseReply(rd, nil) keyiface, err := parseReply(cn, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -430,7 +377,7 @@ func parseStringIntMap(rd *bufio.Reader, n int64) (interface{}, error) {
return nil, fmt.Errorf("got %T, expected string", keyiface) return nil, fmt.Errorf("got %T, expected string", keyiface)
} }
valueiface, err := parseReply(rd, nil) valueiface, err := parseReply(cn, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -449,12 +396,12 @@ func parseStringIntMap(rd *bufio.Reader, n int64) (interface{}, error) {
return m, nil return m, nil
} }
func parseZSlice(rd *bufio.Reader, n int64) (interface{}, error) { func parseZSlice(cn *conn, n int64) (interface{}, error) {
zz := make([]Z, n/2) zz := make([]Z, n/2)
for i := int64(0); i < n; i += 2 { for i := int64(0); i < n; i += 2 {
z := &zz[i/2] z := &zz[i/2]
memberiface, err := parseReply(rd, nil) memberiface, err := parseReply(cn, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -464,7 +411,7 @@ func parseZSlice(rd *bufio.Reader, n int64) (interface{}, error) {
} }
z.Member = string(member) z.Member = string(member)
scoreiface, err := parseReply(rd, nil) scoreiface, err := parseReply(cn, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -481,10 +428,10 @@ func parseZSlice(rd *bufio.Reader, n int64) (interface{}, error) {
return zz, nil return zz, nil
} }
func parseClusterSlotInfoSlice(rd *bufio.Reader, n int64) (interface{}, error) { func parseClusterSlotInfoSlice(cn *conn, n int64) (interface{}, error) {
infos := make([]ClusterSlotInfo, 0, n) infos := make([]ClusterSlotInfo, 0, n)
for i := int64(0); i < n; i++ { for i := int64(0); i < n; i++ {
viface, err := parseReply(rd, parseSlice) viface, err := parseReply(cn, parseSlice)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,9 +1,9 @@
package redis package redis
import ( import (
"bufio"
"bytes"
"testing" "testing"
"gopkg.in/bufio.v1"
) )
func BenchmarkParseReplyStatus(b *testing.B) { func BenchmarkParseReplyStatus(b *testing.B) {
@ -27,20 +27,21 @@ func BenchmarkParseReplySlice(b *testing.B) {
} }
func benchmarkParseReply(b *testing.B, reply string, p multiBulkParser, wanterr bool) { func benchmarkParseReply(b *testing.B, reply string, p multiBulkParser, wanterr bool) {
b.StopTimer() buf := &bytes.Buffer{}
buf := &bufio.Buffer{}
rd := bufio.NewReader(buf)
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
buf.WriteString(reply) buf.WriteString(reply)
} }
cn := &conn{
rd: bufio.NewReader(buf),
buf: make([]byte, 0, defaultBufSize),
}
b.StartTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, err := parseReply(rd, p) _, err := parseReply(cn, p)
if !wanterr && err != nil { if !wanterr && err != nil {
panic(err) b.Fatal(err)
} }
} }
} }

View File

@ -97,7 +97,7 @@ func execCmds(cn *conn, cmds []Cmder) ([]Cmder, error) {
var firstCmdErr error var firstCmdErr error
var failedCmds []Cmder var failedCmds []Cmder
for _, cmd := range cmds { for _, cmd := range cmds {
err := cmd.parseReply(cn.rd) err := cmd.parseReply(cn)
if err == nil { if err == nil {
continue continue
} }

View File

@ -243,7 +243,7 @@ func (p *connPool) Get() (*conn, error) {
func (p *connPool) Put(cn *conn) error { func (p *connPool) Put(cn *conn) error {
if cn.rd.Buffered() != 0 { if cn.rd.Buffered() != 0 {
b, _ := cn.rd.ReadN(cn.rd.Buffered()) b, _ := cn.rd.Peek(cn.rd.Buffered())
log.Printf("redis: connection has unread data: %q", b) log.Printf("redis: connection has unread data: %q", b)
return p.Remove(cn) return p.Remove(cn)
} }

View File

@ -146,7 +146,7 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
cn.ReadTimeout = timeout cn.ReadTimeout = timeout
cmd := NewSliceCmd() cmd := NewSliceCmd()
if err := cmd.parseReply(cn.rd); err != nil { if err := cmd.parseReply(cn); err != nil {
return nil, err return nil, err
} }
return newMessage(cmd.Val()) return newMessage(cmd.Val())

View File

@ -69,7 +69,7 @@ func (c *baseClient) process(cmd Cmder) {
return return
} }
err = cmd.parseReply(cn.rd) err = cmd.parseReply(cn)
c.putConn(cn, err) c.putConn(cn, err)
if shouldRetry(err) { if shouldRetry(err) {
continue continue