diff --git a/.travis.yml b/.travis.yml index 8a951ff..1dea73b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,7 +9,6 @@ go: - tip install: - - go get gopkg.in/bufio.v1 - go get gopkg.in/bsm/ratelimit.v1 - go get github.com/onsi/ginkgo - go get github.com/onsi/gomega diff --git a/cluster_pipeline.go b/cluster_pipeline.go index 2e11940..01f06e7 100644 --- a/cluster_pipeline.go +++ b/cluster_pipeline.go @@ -99,7 +99,7 @@ func (pipe *ClusterPipeline) execClusterCmds( var firstCmdErr error for i, cmd := range cmds { - err := cmd.parseReply(cn.rd) + err := cmd.parseReply(cn) if err == nil { continue } diff --git a/command.go b/command.go index 298c68d..6c80906 100644 --- a/command.go +++ b/command.go @@ -6,8 +6,6 @@ import ( "strconv" "strings" "time" - - "gopkg.in/bufio.v1" ) var ( @@ -30,7 +28,7 @@ var ( type Cmder interface { args() []interface{} - parseReply(*bufio.Reader) error + parseReply(*conn) error setErr(error) reset() @@ -154,8 +152,8 @@ func (cmd *Cmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *Cmd) parseReply(rd *bufio.Reader) error { - cmd.val, cmd.err = parseReply(rd, parseSlice) +func (cmd *Cmd) parseReply(cn *conn) error { + cmd.val, cmd.err = parseReply(cn, parseSlice) // Convert to string to preserve old behaviour. // TODO: remove in v4 if v, ok := cmd.val.([]byte); ok { @@ -193,8 +191,8 @@ func (cmd *SliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *SliceCmd) parseReply(rd *bufio.Reader) error { - v, err := parseReply(rd, parseSlice) +func (cmd *SliceCmd) parseReply(cn *conn) error { + v, err := parseReply(cn, parseSlice) if err != nil { cmd.err = err return err @@ -236,8 +234,8 @@ func (cmd *StatusCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StatusCmd) parseReply(rd *bufio.Reader) error { - v, err := parseReply(rd, nil) +func (cmd *StatusCmd) parseReply(cn *conn) error { + v, err := parseReply(cn, nil) if err != nil { cmd.err = err return err @@ -275,8 +273,8 @@ func (cmd *IntCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *IntCmd) parseReply(rd *bufio.Reader) error { - v, err := parseReply(rd, nil) +func (cmd *IntCmd) parseReply(cn *conn) error { + v, err := parseReply(cn, nil) if err != nil { cmd.err = err return err @@ -318,8 +316,8 @@ func (cmd *DurationCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *DurationCmd) parseReply(rd *bufio.Reader) error { - v, err := parseReply(rd, nil) +func (cmd *DurationCmd) parseReply(cn *conn) error { + v, err := parseReply(cn, nil) if err != nil { cmd.err = err return err @@ -359,8 +357,8 @@ func (cmd *BoolCmd) String() string { var ok = []byte("OK") -func (cmd *BoolCmd) parseReply(rd *bufio.Reader) error { - v, err := parseReply(rd, nil) +func (cmd *BoolCmd) parseReply(cn *conn) error { + v, err := parseReply(cn, nil) // `SET key value NX` returns nil when key already exists, which // is inconsistent with `SETNX key value`. // TODO: is this okay? @@ -445,15 +443,13 @@ func (cmd *StringCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringCmd) parseReply(rd *bufio.Reader) error { - v, err := parseReply(rd, nil) +func (cmd *StringCmd) parseReply(cn *conn) error { + v, err := parseReply(cn, nil) if err != nil { cmd.err = err return err } - b := v.([]byte) - cmd.val = make([]byte, len(b)) - copy(cmd.val, b) + cmd.val = cn.copyBuf(v.([]byte)) return nil } @@ -486,8 +482,8 @@ func (cmd *FloatCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *FloatCmd) parseReply(rd *bufio.Reader) error { - v, err := parseReply(rd, nil) +func (cmd *FloatCmd) parseReply(cn *conn) error { + v, err := parseReply(cn, nil) if err != nil { cmd.err = err return err @@ -526,8 +522,8 @@ func (cmd *StringSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringSliceCmd) parseReply(rd *bufio.Reader) error { - v, err := parseReply(rd, parseStringSlice) +func (cmd *StringSliceCmd) parseReply(cn *conn) error { + v, err := parseReply(cn, parseStringSlice) if err != nil { cmd.err = err return err @@ -565,8 +561,8 @@ func (cmd *BoolSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *BoolSliceCmd) parseReply(rd *bufio.Reader) error { - v, err := parseReply(rd, parseBoolSlice) +func (cmd *BoolSliceCmd) parseReply(cn *conn) error { + v, err := parseReply(cn, parseBoolSlice) if err != nil { cmd.err = err return err @@ -604,8 +600,8 @@ func (cmd *StringStringMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringStringMapCmd) parseReply(rd *bufio.Reader) error { - v, err := parseReply(rd, parseStringStringMap) +func (cmd *StringStringMapCmd) parseReply(cn *conn) error { + v, err := parseReply(cn, parseStringStringMap) if err != nil { cmd.err = err return err @@ -643,8 +639,8 @@ func (cmd *StringIntMapCmd) reset() { cmd.err = nil } -func (cmd *StringIntMapCmd) parseReply(rd *bufio.Reader) error { - v, err := parseReply(rd, parseStringIntMap) +func (cmd *StringIntMapCmd) parseReply(cn *conn) error { + v, err := parseReply(cn, parseStringIntMap) if err != nil { cmd.err = err return err @@ -682,8 +678,8 @@ func (cmd *ZSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *ZSliceCmd) parseReply(rd *bufio.Reader) error { - v, err := parseReply(rd, parseZSlice) +func (cmd *ZSliceCmd) parseReply(cn *conn) error { + v, err := parseReply(cn, parseZSlice) if err != nil { cmd.err = err return err @@ -723,8 +719,8 @@ func (cmd *ScanCmd) String() string { return cmdString(cmd, cmd.keys) } -func (cmd *ScanCmd) parseReply(rd *bufio.Reader) error { - vi, err := parseReply(rd, parseSlice) +func (cmd *ScanCmd) parseReply(cn *conn) error { + vi, err := parseReply(cn, parseSlice) if err != nil { cmd.err = err return cmd.err @@ -778,8 +774,8 @@ func (cmd *ClusterSlotCmd) reset() { cmd.err = nil } -func (cmd *ClusterSlotCmd) parseReply(rd *bufio.Reader) error { - v, err := parseReply(rd, parseClusterSlotInfoSlice) +func (cmd *ClusterSlotCmd) parseReply(cn *conn) error { + v, err := parseReply(cn, parseClusterSlotInfoSlice) if err != nil { cmd.err = err return err diff --git a/conn.go b/conn.go index 9dc2ede..36ba99a 100644 --- a/conn.go +++ b/conn.go @@ -1,12 +1,13 @@ package redis import ( + "bufio" "net" "time" - - "gopkg.in/bufio.v1" ) +const defaultBufSize = 4096 + var ( zeroTime = time.Time{} ) @@ -30,7 +31,7 @@ func newConnDialer(opt *Options) func() (*conn, error) { } cn := &conn{ netcn: netcn, - buf: make([]byte, 0, 64), + buf: make([]byte, defaultBufSize), } cn.rd = bufio.NewReader(cn) return cn, cn.init(opt) @@ -102,3 +103,16 @@ func (cn *conn) RemoteAddr() net.Addr { func (cn *conn) Close() error { return cn.netcn.Close() } + +func isSameSlice(s1, s2 []byte) bool { + return len(s1) > 0 && len(s2) > 0 && &s1[0] == &s2[0] +} + +func (cn *conn) copyBuf(b []byte) []byte { + if isSameSlice(b, cn.buf) { + new := make([]byte, len(b)) + copy(new, b) + return new + } + return b +} diff --git a/multi.go b/multi.go index 63ecdd5..1cc419c 100644 --- a/multi.go +++ b/multi.go @@ -115,14 +115,14 @@ func (c *Multi) execCmds(cn *conn, cmds []Cmder) error { // Parse queued replies. for i := 0; i < cmdsLen; i++ { - if err := statusCmd.parseReply(cn.rd); err != nil { + if err := statusCmd.parseReply(cn); err != nil { setCmdsErr(cmds[1:len(cmds)-1], err) return err } } // Parse number of replies. - line, err := readLine(cn.rd) + line, err := readLine(cn) if err != nil { setCmdsErr(cmds[1:len(cmds)-1], err) return err @@ -143,7 +143,7 @@ func (c *Multi) execCmds(cn *conn, cmds []Cmder) error { // Loop starts from 1 to omit MULTI cmd. for i := 1; i < cmdsLen; i++ { cmd := cmds[i] - if err := cmd.parseReply(cn.rd); err != nil { + if err := cmd.parseReply(cn); err != nil { if firstCmdErr == nil { firstCmdErr = err } diff --git a/parser.go b/parser.go index 32646ff..5b6e073 100644 --- a/parser.go +++ b/parser.go @@ -3,13 +3,12 @@ package redis import ( "errors" "fmt" + "io" "net" "strconv" - - "gopkg.in/bufio.v1" ) -type multiBulkParser func(rd *bufio.Reader, n int64) (interface{}, error) +type multiBulkParser func(cn *conn, n int64) (interface{}, error) var ( errReaderTooSmall = errors.New("redis: reader is too small") @@ -216,8 +215,8 @@ func scan(b []byte, val interface{}) error { //------------------------------------------------------------------------------ -func readLine(rd *bufio.Reader) ([]byte, error) { - line, isPrefix, err := rd.ReadLine() +func readLine(cn *conn) ([]byte, error) { + line, isPrefix, err := cn.rd.ReadLine() if err != nil { return line, err } @@ -227,74 +226,21 @@ func readLine(rd *bufio.Reader) ([]byte, error) { return line, nil } -func readN(rd *bufio.Reader, n int) ([]byte, error) { - b, err := rd.ReadN(n) - if err == bufio.ErrBufferFull { - tmp := make([]byte, n) - r := copy(tmp, b) - b = tmp - - for { - nn, err := rd.Read(b[r:]) - r += nn - if r >= n { - // Ignore error if we read enough. - break - } - if err != nil { - return nil, err - } - } - } else if err != nil { - return nil, err +func readN(cn *conn, n int) ([]byte, error) { + var b []byte + if cap(cn.buf) < n { + b = make([]byte, n) + } else { + b = cn.buf[:n] } - return b, nil + _, err := io.ReadFull(cn.rd, b) + return b, err } //------------------------------------------------------------------------------ -func parseReq(rd *bufio.Reader) ([]string, error) { - line, err := readLine(rd) - if err != nil { - return nil, err - } - - if line[0] != '*' { - return []string{string(line)}, nil - } - numReplies, err := strconv.ParseInt(string(line[1:]), 10, 64) - if err != nil { - return nil, err - } - - args := make([]string, 0, numReplies) - for i := int64(0); i < numReplies; i++ { - line, err = readLine(rd) - if err != nil { - return nil, err - } - if line[0] != '$' { - return nil, fmt.Errorf("redis: expected '$', but got %q", line) - } - - argLen, err := strconv.ParseInt(string(line[1:]), 10, 32) - if err != nil { - return nil, err - } - - arg, err := readN(rd, int(argLen)+2) - if err != nil { - return nil, err - } - args = append(args, string(arg[:argLen])) - } - return args, nil -} - -//------------------------------------------------------------------------------ - -func parseReply(rd *bufio.Reader, p multiBulkParser) (interface{}, error) { - line, err := readLine(rd) +func parseReply(cn *conn, p multiBulkParser) (interface{}, error) { + line, err := readLine(cn) if err != nil { return nil, err } @@ -315,12 +261,12 @@ func parseReply(rd *bufio.Reader, p multiBulkParser) (interface{}, error) { return nil, Nil } - replyLen, err := strconv.Atoi(string(line[1:])) + replyLen, err := strconv.Atoi(bytesToString(line[1:])) if err != nil { return nil, err } - b, err := readN(rd, replyLen+2) + b, err := readN(cn, replyLen+2) if err != nil { return nil, err } @@ -335,15 +281,15 @@ func parseReply(rd *bufio.Reader, p multiBulkParser) (interface{}, error) { return nil, err } - return p(rd, repliesNum) + return p(cn, repliesNum) } return nil, fmt.Errorf("redis: can't parse %q", line) } -func parseSlice(rd *bufio.Reader, n int64) (interface{}, error) { +func parseSlice(cn *conn, n int64) (interface{}, error) { vals := make([]interface{}, 0, n) for i := int64(0); i < n; i++ { - v, err := parseReply(rd, parseSlice) + v, err := parseReply(cn, parseSlice) if err == Nil { vals = append(vals, nil) } else if err != nil { @@ -360,10 +306,10 @@ func parseSlice(rd *bufio.Reader, n int64) (interface{}, error) { return vals, nil } -func parseStringSlice(rd *bufio.Reader, n int64) (interface{}, error) { +func parseStringSlice(cn *conn, n int64) (interface{}, error) { vals := make([]string, 0, n) for i := int64(0); i < n; i++ { - viface, err := parseReply(rd, nil) + viface, err := parseReply(cn, nil) if err != nil { return nil, err } @@ -376,10 +322,10 @@ func parseStringSlice(rd *bufio.Reader, n int64) (interface{}, error) { return vals, nil } -func parseBoolSlice(rd *bufio.Reader, n int64) (interface{}, error) { +func parseBoolSlice(cn *conn, n int64) (interface{}, error) { vals := make([]bool, 0, n) for i := int64(0); i < n; i++ { - viface, err := parseReply(rd, nil) + viface, err := parseReply(cn, nil) if err != nil { return nil, err } @@ -392,36 +338,37 @@ func parseBoolSlice(rd *bufio.Reader, n int64) (interface{}, error) { return vals, nil } -func parseStringStringMap(rd *bufio.Reader, n int64) (interface{}, error) { +func parseStringStringMap(cn *conn, n int64) (interface{}, error) { m := make(map[string]string, n/2) for i := int64(0); i < n; i += 2 { - keyiface, err := parseReply(rd, nil) + keyIface, err := parseReply(cn, nil) if err != nil { return nil, err } - key, ok := keyiface.([]byte) + keyBytes, ok := keyIface.([]byte) if !ok { - return nil, fmt.Errorf("got %T, expected string", keyiface) + return nil, fmt.Errorf("got %T, expected []byte", keyIface) } + key := string(keyBytes) - valueiface, err := parseReply(rd, nil) + valueIface, err := parseReply(cn, nil) if err != nil { return nil, err } - value, ok := valueiface.([]byte) + valueBytes, ok := valueIface.([]byte) if !ok { - return nil, fmt.Errorf("got %T, expected string", valueiface) + return nil, fmt.Errorf("got %T, expected []byte", valueIface) } - m[string(key)] = string(value) + m[key] = string(valueBytes) } return m, nil } -func parseStringIntMap(rd *bufio.Reader, n int64) (interface{}, error) { +func parseStringIntMap(cn *conn, n int64) (interface{}, error) { m := make(map[string]int64, n/2) for i := int64(0); i < n; i += 2 { - keyiface, err := parseReply(rd, nil) + keyiface, err := parseReply(cn, nil) if err != nil { return nil, err } @@ -430,7 +377,7 @@ func parseStringIntMap(rd *bufio.Reader, n int64) (interface{}, error) { return nil, fmt.Errorf("got %T, expected string", keyiface) } - valueiface, err := parseReply(rd, nil) + valueiface, err := parseReply(cn, nil) if err != nil { return nil, err } @@ -449,12 +396,12 @@ func parseStringIntMap(rd *bufio.Reader, n int64) (interface{}, error) { return m, nil } -func parseZSlice(rd *bufio.Reader, n int64) (interface{}, error) { +func parseZSlice(cn *conn, n int64) (interface{}, error) { zz := make([]Z, n/2) for i := int64(0); i < n; i += 2 { z := &zz[i/2] - memberiface, err := parseReply(rd, nil) + memberiface, err := parseReply(cn, nil) if err != nil { return nil, err } @@ -464,7 +411,7 @@ func parseZSlice(rd *bufio.Reader, n int64) (interface{}, error) { } z.Member = string(member) - scoreiface, err := parseReply(rd, nil) + scoreiface, err := parseReply(cn, nil) if err != nil { return nil, err } @@ -481,10 +428,10 @@ func parseZSlice(rd *bufio.Reader, n int64) (interface{}, error) { return zz, nil } -func parseClusterSlotInfoSlice(rd *bufio.Reader, n int64) (interface{}, error) { +func parseClusterSlotInfoSlice(cn *conn, n int64) (interface{}, error) { infos := make([]ClusterSlotInfo, 0, n) for i := int64(0); i < n; i++ { - viface, err := parseReply(rd, parseSlice) + viface, err := parseReply(cn, parseSlice) if err != nil { return nil, err } diff --git a/parser_test.go b/parser_test.go index b71305a..10403f6 100644 --- a/parser_test.go +++ b/parser_test.go @@ -1,9 +1,9 @@ package redis import ( + "bufio" + "bytes" "testing" - - "gopkg.in/bufio.v1" ) func BenchmarkParseReplyStatus(b *testing.B) { @@ -27,20 +27,21 @@ func BenchmarkParseReplySlice(b *testing.B) { } func benchmarkParseReply(b *testing.B, reply string, p multiBulkParser, wanterr bool) { - b.StopTimer() - - buf := &bufio.Buffer{} - rd := bufio.NewReader(buf) + buf := &bytes.Buffer{} for i := 0; i < b.N; i++ { buf.WriteString(reply) } + cn := &conn{ + rd: bufio.NewReader(buf), + buf: make([]byte, 0, defaultBufSize), + } - b.StartTimer() + b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := parseReply(rd, p) + _, err := parseReply(cn, p) if !wanterr && err != nil { - panic(err) + b.Fatal(err) } } } diff --git a/pipeline.go b/pipeline.go index 8981cb5..6fb1db1 100644 --- a/pipeline.go +++ b/pipeline.go @@ -97,7 +97,7 @@ func execCmds(cn *conn, cmds []Cmder) ([]Cmder, error) { var firstCmdErr error var failedCmds []Cmder for _, cmd := range cmds { - err := cmd.parseReply(cn.rd) + err := cmd.parseReply(cn) if err == nil { continue } diff --git a/pool.go b/pool.go index 71ac456..f52eb6f 100644 --- a/pool.go +++ b/pool.go @@ -243,7 +243,7 @@ func (p *connPool) Get() (*conn, error) { func (p *connPool) Put(cn *conn) error { if cn.rd.Buffered() != 0 { - b, _ := cn.rd.ReadN(cn.rd.Buffered()) + b, _ := cn.rd.Peek(cn.rd.Buffered()) log.Printf("redis: connection has unread data: %q", b) return p.Remove(cn) } diff --git a/pubsub.go b/pubsub.go index 1f4f5b6..be36caa 100644 --- a/pubsub.go +++ b/pubsub.go @@ -146,7 +146,7 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { cn.ReadTimeout = timeout cmd := NewSliceCmd() - if err := cmd.parseReply(cn.rd); err != nil { + if err := cmd.parseReply(cn); err != nil { return nil, err } return newMessage(cmd.Val()) diff --git a/redis.go b/redis.go index f77c663..1504e6c 100644 --- a/redis.go +++ b/redis.go @@ -69,7 +69,7 @@ func (c *baseClient) process(cmd Cmder) { return } - err = cmd.parseReply(cn.rd) + err = cmd.parseReply(cn) c.putConn(cn, err) if shouldRetry(err) { continue