From 7c26d1ceb6eb2e8d3e6a65f64f51515b8904db18 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Wed, 15 Aug 2018 09:38:58 +0300 Subject: [PATCH] Pass cn.Rd where possible --- cluster.go | 19 ++--- command.go | 148 +++++++++++++++++----------------- internal/pool/conn.go | 2 +- internal/proto/reader.go | 42 +++++----- internal/proto/reader_test.go | 4 +- pubsub.go | 2 +- redis.go | 25 +++--- 7 files changed, 123 insertions(+), 119 deletions(-) diff --git a/cluster.go b/cluster.go index 9fb1c4e..12ff1d9 100644 --- a/cluster.go +++ b/cluster.go @@ -1345,14 +1345,14 @@ func (c *ClusterClient) pipelineProcessCmds( // Set read timeout for all commands. cn.SetReadTimeout(c.opt.ReadTimeout) - return c.pipelineReadCmds(cn, cmds, failedCmds) + return c.pipelineReadCmds(cn.Rd, cmds, failedCmds) } func (c *ClusterClient) pipelineReadCmds( - cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + rd proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { for _, cmd := range cmds { - err := cmd.readReply(cn) + err := cmd.readReply(rd) if err == nil { continue } @@ -1486,25 +1486,26 @@ func (c *ClusterClient) txPipelineProcessCmds( // Set read timeout for all commands. cn.SetReadTimeout(c.opt.ReadTimeout) - if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil { + err := c.txPipelineReadQueued(cn.Rd, cmds, failedCmds) + if err != nil { setCmdsErr(cmds, err) return err } - return pipelineReadCmds(cn, cmds) + return pipelineReadCmds(cn.Rd, cmds) } func (c *ClusterClient) txPipelineReadQueued( - cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + rd proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { // Parse queued replies. var statusCmd StatusCmd - if err := statusCmd.readReply(cn); err != nil { + if err := statusCmd.readReply(rd); err != nil { return err } for _, cmd := range cmds { - err := statusCmd.readReply(cn) + err := statusCmd.readReply(rd) if err == nil { continue } @@ -1517,7 +1518,7 @@ func (c *ClusterClient) txPipelineReadQueued( } // Parse number of replies. - line, err := cn.Rd.ReadLine() + line, err := rd.ReadLine() if err != nil { if err == Nil { err = TxFailedErr diff --git a/command.go b/command.go index 14494cd..522d6bf 100644 --- a/command.go +++ b/command.go @@ -19,7 +19,7 @@ type Cmder interface { Args() []interface{} stringArg(int) string - readReply(*pool.Conn) error + readReply(rd proto.Reader) error setErr(error) readTimeout() *time.Duration @@ -239,8 +239,8 @@ func (cmd *Cmd) Bool() (bool, error) { } } -func (cmd *Cmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadReply(sliceParser) +func (cmd *Cmd) readReply(rd proto.Reader) error { + cmd.val, cmd.err = rd.ReadReply(sliceParser) if cmd.err != nil { return cmd.err } @@ -252,7 +252,7 @@ func (cmd *Cmd) readReply(cn *pool.Conn) error { } // Implements proto.MultiBulkParse -func sliceParser(rd *proto.Reader, n int64) (interface{}, error) { +func sliceParser(rd proto.Reader, n int64) (interface{}, error) { vals := make([]interface{}, 0, n) for i := int64(0); i < n; i++ { v, err := rd.ReadReply(sliceParser) @@ -306,9 +306,9 @@ func (cmd *SliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *SliceCmd) readReply(cn *pool.Conn) error { +func (cmd *SliceCmd) readReply(rd proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(sliceParser) + v, cmd.err = rd.ReadArrayReply(sliceParser) if cmd.err != nil { return cmd.err } @@ -344,8 +344,8 @@ func (cmd *StatusCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StatusCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadStringReply() +func (cmd *StatusCmd) readReply(rd proto.Reader) error { + cmd.val, cmd.err = rd.ReadStringReply() return cmd.err } @@ -377,8 +377,8 @@ func (cmd *IntCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *IntCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadIntReply() +func (cmd *IntCmd) readReply(rd proto.Reader) error { + cmd.val, cmd.err = rd.ReadIntReply() return cmd.err } @@ -412,9 +412,9 @@ func (cmd *DurationCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *DurationCmd) readReply(cn *pool.Conn) error { +func (cmd *DurationCmd) readReply(rd proto.Reader) error { var n int64 - n, cmd.err = cn.Rd.ReadIntReply() + n, cmd.err = rd.ReadIntReply() if cmd.err != nil { return cmd.err } @@ -450,9 +450,9 @@ func (cmd *TimeCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *TimeCmd) readReply(cn *pool.Conn) error { +func (cmd *TimeCmd) readReply(rd proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(timeParser) + v, cmd.err = rd.ReadArrayReply(timeParser) if cmd.err != nil { return cmd.err } @@ -461,7 +461,7 @@ func (cmd *TimeCmd) readReply(cn *pool.Conn) error { } // Implements proto.MultiBulkParse -func timeParser(rd *proto.Reader, n int64) (interface{}, error) { +func timeParser(rd proto.Reader, n int64) (interface{}, error) { if n != 2 { return nil, fmt.Errorf("got %d elements, expected 2", n) } @@ -509,9 +509,9 @@ func (cmd *BoolCmd) String() string { var ok = []byte("OK") -func (cmd *BoolCmd) readReply(cn *pool.Conn) error { +func (cmd *BoolCmd) readReply(rd proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadReply(nil) + v, cmd.err = rd.ReadReply(nil) // `SET key value NX` returns nil when key already exists. But // `SETNX key value` returns bool (0/1). So convert nil to bool. // TODO: is this okay? @@ -596,8 +596,8 @@ func (cmd *StringCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadBytesReply() +func (cmd *StringCmd) readReply(rd proto.Reader) error { + cmd.val, cmd.err = rd.ReadBytesReply() return cmd.err } @@ -629,8 +629,8 @@ func (cmd *FloatCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *FloatCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadFloatReply() +func (cmd *FloatCmd) readReply(rd proto.Reader) error { + cmd.val, cmd.err = rd.ReadFloatReply() return cmd.err } @@ -666,9 +666,9 @@ func (cmd *StringSliceCmd) ScanSlice(container interface{}) error { return proto.ScanSlice(cmd.Val(), container) } -func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *StringSliceCmd) readReply(rd proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringSliceParser) + v, cmd.err = rd.ReadArrayReply(stringSliceParser) if cmd.err != nil { return cmd.err } @@ -677,7 +677,7 @@ func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error { } // Implements proto.MultiBulkParse -func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) { +func stringSliceParser(rd proto.Reader, n int64) (interface{}, error) { ss := make([]string, 0, n) for i := int64(0); i < n; i++ { s, err := rd.ReadStringReply() @@ -720,9 +720,9 @@ func (cmd *BoolSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *BoolSliceCmd) readReply(rd proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(boolSliceParser) + v, cmd.err = rd.ReadArrayReply(boolSliceParser) if cmd.err != nil { return cmd.err } @@ -731,7 +731,7 @@ func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error { } // Implements proto.MultiBulkParse -func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) { +func boolSliceParser(rd proto.Reader, n int64) (interface{}, error) { bools := make([]bool, 0, n) for i := int64(0); i < n; i++ { n, err := rd.ReadIntReply() @@ -771,9 +771,9 @@ func (cmd *StringStringMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error { +func (cmd *StringStringMapCmd) readReply(rd proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringStringMapParser) + v, cmd.err = rd.ReadArrayReply(stringStringMapParser) if cmd.err != nil { return cmd.err } @@ -782,7 +782,7 @@ func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error { } // Implements proto.MultiBulkParse -func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) { +func stringStringMapParser(rd proto.Reader, n int64) (interface{}, error) { m := make(map[string]string, n/2) for i := int64(0); i < n; i += 2 { key, err := rd.ReadStringReply() @@ -828,9 +828,9 @@ func (cmd *StringIntMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error { +func (cmd *StringIntMapCmd) readReply(rd proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringIntMapParser) + v, cmd.err = rd.ReadArrayReply(stringIntMapParser) if cmd.err != nil { return cmd.err } @@ -839,7 +839,7 @@ func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error { } // Implements proto.MultiBulkParse -func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) { +func stringIntMapParser(rd proto.Reader, n int64) (interface{}, error) { m := make(map[string]int64, n/2) for i := int64(0); i < n; i += 2 { key, err := rd.ReadStringReply() @@ -885,9 +885,9 @@ func (cmd *StringStructMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error { +func (cmd *StringStructMapCmd) readReply(rd proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringStructMapParser) + v, cmd.err = rd.ReadArrayReply(stringStructMapParser) if cmd.err != nil { return cmd.err } @@ -896,7 +896,7 @@ func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error { } // Implements proto.MultiBulkParse -func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) { +func stringStructMapParser(rd proto.Reader, n int64) (interface{}, error) { m := make(map[string]struct{}, n) for i := int64(0); i < n; i++ { key, err := rd.ReadStringReply() @@ -942,9 +942,9 @@ func (cmd *XMessageSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *XMessageSliceCmd) readReply(rd proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(xMessageSliceParser) + v, cmd.err = rd.ReadArrayReply(xMessageSliceParser) if cmd.err != nil { return cmd.err } @@ -953,10 +953,10 @@ func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error { } // Implements proto.MultiBulkParse -func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { +func xMessageSliceParser(rd proto.Reader, n int64) (interface{}, error) { msgs := make([]XMessage, 0, n) for i := int64(0); i < n; i++ { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + _, err := rd.ReadArrayReply(func(rd proto.Reader, n int64) (interface{}, error) { id, err := rd.ReadStringReply() if err != nil { return nil, err @@ -981,7 +981,7 @@ func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { } // Implements proto.MultiBulkParse -func stringInterfaceMapParser(rd *proto.Reader, n int64) (interface{}, error) { +func stringInterfaceMapParser(rd proto.Reader, n int64) (interface{}, error) { m := make(map[string]interface{}, n/2) for i := int64(0); i < n; i += 2 { key, err := rd.ReadStringReply() @@ -1032,9 +1032,9 @@ func (cmd *XStreamSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *XStreamSliceCmd) readReply(rd proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(xStreamSliceParser) + v, cmd.err = rd.ReadArrayReply(xStreamSliceParser) if cmd.err != nil { return cmd.err } @@ -1043,10 +1043,10 @@ func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error { } // Implements proto.MultiBulkParse -func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) { +func xStreamSliceParser(rd proto.Reader, n int64) (interface{}, error) { ret := make([]XStream, 0, n) for i := int64(0); i < n; i++ { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + _, err := rd.ReadArrayReply(func(rd proto.Reader, n int64) (interface{}, error) { if n != 2 { return nil, fmt.Errorf("got %d, wanted 2", n) } @@ -1108,9 +1108,9 @@ func (cmd *XPendingCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XPendingCmd) readReply(cn *pool.Conn) error { +func (cmd *XPendingCmd) readReply(rd proto.Reader) error { var info interface{} - info, cmd.err = cn.Rd.ReadArrayReply(xPendingParser) + info, cmd.err = rd.ReadArrayReply(xPendingParser) if cmd.err != nil { return cmd.err } @@ -1118,7 +1118,7 @@ func (cmd *XPendingCmd) readReply(cn *pool.Conn) error { return nil } -func xPendingParser(rd *proto.Reader, n int64) (interface{}, error) { +func xPendingParser(rd proto.Reader, n int64) (interface{}, error) { if n != 4 { return nil, fmt.Errorf("got %d, wanted 4", n) } @@ -1143,9 +1143,9 @@ func xPendingParser(rd *proto.Reader, n int64) (interface{}, error) { Lower: lower, Higher: higher, } - _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + _, err = rd.ReadArrayReply(func(rd proto.Reader, n int64) (interface{}, error) { for i := int64(0); i < n; i++ { - _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + _, err = rd.ReadArrayReply(func(rd proto.Reader, n int64) (interface{}, error) { if n != 2 { return nil, fmt.Errorf("got %d, wanted 2", n) } @@ -1219,9 +1219,9 @@ func (cmd *XPendingExtCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XPendingExtCmd) readReply(cn *pool.Conn) error { +func (cmd *XPendingExtCmd) readReply(rd proto.Reader) error { var info interface{} - info, cmd.err = cn.Rd.ReadArrayReply(xPendingExtSliceParser) + info, cmd.err = rd.ReadArrayReply(xPendingExtSliceParser) if cmd.err != nil { return cmd.err } @@ -1229,10 +1229,10 @@ func (cmd *XPendingExtCmd) readReply(cn *pool.Conn) error { return nil } -func xPendingExtSliceParser(rd *proto.Reader, n int64) (interface{}, error) { +func xPendingExtSliceParser(rd proto.Reader, n int64) (interface{}, error) { ret := make([]XPendingExt, 0, n) for i := int64(0); i < n; i++ { - _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + _, err := rd.ReadArrayReply(func(rd proto.Reader, n int64) (interface{}, error) { if n != 4 { return nil, fmt.Errorf("got %d, wanted 4", n) } @@ -1302,9 +1302,9 @@ func (cmd *ZSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *ZSliceCmd) readReply(rd proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(zSliceParser) + v, cmd.err = rd.ReadArrayReply(zSliceParser) if cmd.err != nil { return cmd.err } @@ -1313,7 +1313,7 @@ func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error { } // Implements proto.MultiBulkParse -func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) { +func zSliceParser(rd proto.Reader, n int64) (interface{}, error) { zz := make([]Z, n/2) for i := int64(0); i < n; i += 2 { var err error @@ -1365,8 +1365,8 @@ func (cmd *ScanCmd) String() string { return cmdString(cmd, cmd.page) } -func (cmd *ScanCmd) readReply(cn *pool.Conn) error { - cmd.page, cmd.cursor, cmd.err = cn.Rd.ReadScanReply() +func (cmd *ScanCmd) readReply(rd proto.Reader) error { + cmd.page, cmd.cursor, cmd.err = rd.ReadScanReply() return cmd.err } @@ -1416,9 +1416,9 @@ func (cmd *ClusterSlotsCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error { +func (cmd *ClusterSlotsCmd) readReply(rd proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(clusterSlotsParser) + v, cmd.err = rd.ReadArrayReply(clusterSlotsParser) if cmd.err != nil { return cmd.err } @@ -1427,7 +1427,7 @@ func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error { } // Implements proto.MultiBulkParse -func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) { +func clusterSlotsParser(rd proto.Reader, n int64) (interface{}, error) { slots := make([]ClusterSlot, n) for i := 0; i < len(slots); i++ { n, err := rd.ReadArrayLen() @@ -1570,9 +1570,9 @@ func (cmd *GeoLocationCmd) String() string { return cmdString(cmd, cmd.locations) } -func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error { +func (cmd *GeoLocationCmd) readReply(rd proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q)) + v, cmd.err = rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q)) if cmd.err != nil { return cmd.err } @@ -1581,7 +1581,7 @@ func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error { } func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse { - return func(rd *proto.Reader, n int64) (interface{}, error) { + return func(rd proto.Reader, n int64) (interface{}, error) { var loc GeoLocation var err error @@ -1625,7 +1625,7 @@ func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse { } func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse { - return func(rd *proto.Reader, n int64) (interface{}, error) { + return func(rd proto.Reader, n int64) (interface{}, error) { locs := make([]GeoLocation, 0, n) for i := int64(0); i < n; i++ { v, err := rd.ReadReply(newGeoLocationParser(q)) @@ -1679,9 +1679,9 @@ func (cmd *GeoPosCmd) String() string { return cmdString(cmd, cmd.positions) } -func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error { +func (cmd *GeoPosCmd) readReply(rd proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(geoPosSliceParser) + v, cmd.err = rd.ReadArrayReply(geoPosSliceParser) if cmd.err != nil { return cmd.err } @@ -1689,7 +1689,7 @@ func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error { return nil } -func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) { +func geoPosSliceParser(rd proto.Reader, n int64) (interface{}, error) { positions := make([]*GeoPos, 0, n) for i := int64(0); i < n; i++ { v, err := rd.ReadReply(geoPosParser) @@ -1710,7 +1710,7 @@ func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) { return positions, nil } -func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) { +func geoPosParser(rd proto.Reader, n int64) (interface{}, error) { var pos GeoPos var err error @@ -1765,9 +1765,9 @@ func (cmd *CommandsInfoCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error { +func (cmd *CommandsInfoCmd) readReply(rd proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(commandInfoSliceParser) + v, cmd.err = rd.ReadArrayReply(commandInfoSliceParser) if cmd.err != nil { return cmd.err } @@ -1776,7 +1776,7 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error { } // Implements proto.MultiBulkParse -func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) { +func commandInfoSliceParser(rd proto.Reader, n int64) (interface{}, error) { m := make(map[string]*CommandInfo, n) for i := int64(0); i < n; i++ { v, err := rd.ReadReply(commandInfoParser) @@ -1790,7 +1790,7 @@ func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) { return m, nil } -func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) { +func commandInfoParser(rd proto.Reader, n int64) (interface{}, error) { var cmd CommandInfo var err error diff --git a/internal/pool/conn.go b/internal/pool/conn.go index a70e818..cfdf60d 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -13,7 +13,7 @@ var noDeadline = time.Time{} type Conn struct { netConn net.Conn - Rd *proto.Reader + Rd proto.Reader wb *proto.WriteBuffer concurrentReadWrite bool diff --git a/internal/proto/reader.go b/internal/proto/reader.go index e4a84f1..3e9c38a 100644 --- a/internal/proto/reader.go +++ b/internal/proto/reader.go @@ -26,39 +26,39 @@ func (e RedisError) Error() string { return string(e) } //------------------------------------------------------------------------------ -type MultiBulkParse func(*Reader, int64) (interface{}, error) +type MultiBulkParse func(Reader, int64) (interface{}, error) type Reader struct { src *ElasticBufReader } -func NewReader(src *ElasticBufReader) *Reader { - return &Reader{ +func NewReader(src *ElasticBufReader) Reader { + return Reader{ src: src, } } -func (r *Reader) Reset(rd io.Reader) { +func (r Reader) Reset(rd io.Reader) { r.src.Reset(rd) } -func (r *Reader) Buffer() []byte { +func (r Reader) Buffer() []byte { return r.src.Buffer() } -func (r *Reader) ResetBuffer(buf []byte) { +func (r Reader) ResetBuffer(buf []byte) { r.src.ResetBuffer(buf) } -func (r *Reader) Bytes() []byte { +func (r Reader) Bytes() []byte { return r.src.Bytes() } -func (r *Reader) ReadN(n int) ([]byte, error) { +func (r Reader) ReadN(n int) ([]byte, error) { return r.src.ReadN(n) } -func (r *Reader) ReadLine() ([]byte, error) { +func (r Reader) ReadLine() ([]byte, error) { line, err := r.src.ReadLine() if err != nil { return nil, err @@ -72,7 +72,7 @@ func (r *Reader) ReadLine() ([]byte, error) { return line, nil } -func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) { +func (r Reader) ReadReply(m MultiBulkParse) (interface{}, error) { line, err := r.ReadLine() if err != nil { return nil, err @@ -97,7 +97,7 @@ func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) { return nil, fmt.Errorf("redis: can't parse %.100q", line) } -func (r *Reader) ReadIntReply() (int64, error) { +func (r Reader) ReadIntReply() (int64, error) { line, err := r.ReadLine() if err != nil { return 0, err @@ -112,7 +112,7 @@ func (r *Reader) ReadIntReply() (int64, error) { } } -func (r *Reader) ReadTmpBytesReply() ([]byte, error) { +func (r Reader) ReadTmpBytesReply() ([]byte, error) { line, err := r.ReadLine() if err != nil { return nil, err @@ -129,7 +129,7 @@ func (r *Reader) ReadTmpBytesReply() ([]byte, error) { } } -func (r *Reader) ReadBytesReply() ([]byte, error) { +func (r Reader) ReadBytesReply() ([]byte, error) { b, err := r.ReadTmpBytesReply() if err != nil { return nil, err @@ -139,7 +139,7 @@ func (r *Reader) ReadBytesReply() ([]byte, error) { return cp, nil } -func (r *Reader) ReadStringReply() (string, error) { +func (r Reader) ReadStringReply() (string, error) { b, err := r.ReadTmpBytesReply() if err != nil { return "", err @@ -147,7 +147,7 @@ func (r *Reader) ReadStringReply() (string, error) { return string(b), nil } -func (r *Reader) ReadFloatReply() (float64, error) { +func (r Reader) ReadFloatReply() (float64, error) { b, err := r.ReadTmpBytesReply() if err != nil { return 0, err @@ -155,7 +155,7 @@ func (r *Reader) ReadFloatReply() (float64, error) { return util.ParseFloat(b, 64) } -func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) { +func (r Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) { line, err := r.ReadLine() if err != nil { return nil, err @@ -174,7 +174,7 @@ func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) { } } -func (r *Reader) ReadArrayLen() (int64, error) { +func (r Reader) ReadArrayLen() (int64, error) { line, err := r.ReadLine() if err != nil { return 0, err @@ -189,7 +189,7 @@ func (r *Reader) ReadArrayLen() (int64, error) { } } -func (r *Reader) ReadScanReply() ([]string, uint64, error) { +func (r Reader) ReadScanReply() ([]string, uint64, error) { n, err := r.ReadArrayLen() if err != nil { return nil, 0, err @@ -220,7 +220,7 @@ func (r *Reader) ReadScanReply() ([]string, uint64, error) { return keys, cursor, err } -func (r *Reader) readTmpBytesReply(line []byte) ([]byte, error) { +func (r Reader) readTmpBytesReply(line []byte) ([]byte, error) { if isNilReply(line) { return nil, Nil } @@ -237,7 +237,7 @@ func (r *Reader) readTmpBytesReply(line []byte) ([]byte, error) { return b[:replyLen], nil } -func (r *Reader) ReadInt() (int64, error) { +func (r Reader) ReadInt() (int64, error) { b, err := r.ReadTmpBytesReply() if err != nil { return 0, err @@ -245,7 +245,7 @@ func (r *Reader) ReadInt() (int64, error) { return util.ParseInt(b, 10, 64) } -func (r *Reader) ReadUint() (uint64, error) { +func (r Reader) ReadUint() (uint64, error) { b, err := r.ReadTmpBytesReply() if err != nil { return 0, err diff --git a/internal/proto/reader_test.go b/internal/proto/reader_test.go index 658850a..2025248 100644 --- a/internal/proto/reader_test.go +++ b/internal/proto/reader_test.go @@ -11,7 +11,7 @@ import ( . "github.com/onsi/gomega" ) -func newReader(s string) *proto.Reader { +func newReader(s string) proto.Reader { return proto.NewReader(proto.NewElasticBufReader(strings.NewReader(s))) } @@ -78,7 +78,7 @@ func benchmarkParseReply(b *testing.B, reply string, m proto.MultiBulkParse, wan } } -func multiBulkParse(p *proto.Reader, n int64) (interface{}, error) { +func multiBulkParse(p proto.Reader, n int64) (interface{}, error) { vv := make([]interface{}, 0, n) for i := int64(0); i < n; i++ { v, err := p.ReadReply(multiBulkParse) diff --git a/pubsub.go b/pubsub.go index e3c6338..0d146a7 100644 --- a/pubsub.go +++ b/pubsub.go @@ -340,7 +340,7 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { } cn.SetReadTimeout(timeout) - err = c.cmd.readReply(cn) + err = c.cmd.readReply(cn.Rd) c.releaseConn(cn, err, timeout > 0) if err != nil { return nil, err diff --git a/redis.go b/redis.go index 2913938..32daab1 100644 --- a/redis.go +++ b/redis.go @@ -167,7 +167,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { } cn.SetReadTimeout(c.cmdTimeout(cmd)) - err = cmd.readReply(cn) + err = cmd.readReply(cn.Rd) c.releaseConn(cn, err) if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) { continue @@ -264,12 +264,12 @@ func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, err // Set read timeout for all commands. cn.SetReadTimeout(c.opt.ReadTimeout) - return true, pipelineReadCmds(cn, cmds) + return true, pipelineReadCmds(cn.Rd, cmds) } -func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error { +func pipelineReadCmds(rd proto.Reader, cmds []Cmder) error { for _, cmd := range cmds { - err := cmd.readReply(cn) + err := cmd.readReply(rd) if err != nil && !internal.IsRedisError(err) { return err } @@ -279,7 +279,8 @@ func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error { func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := txPipelineWriteMulti(cn, cmds); err != nil { + err := txPipelineWriteMulti(cn, cmds) + if err != nil { setCmdsErr(cmds, err) return true, err } @@ -287,12 +288,13 @@ func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, e // Set read timeout for all commands. cn.SetReadTimeout(c.opt.ReadTimeout) - if err := c.txPipelineReadQueued(cn, cmds); err != nil { + err = c.txPipelineReadQueued(cn.Rd, cmds) + if err != nil { setCmdsErr(cmds, err) return false, err } - return false, pipelineReadCmds(cn, cmds) + return false, pipelineReadCmds(cn.Rd, cmds) } func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error { @@ -303,22 +305,23 @@ func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error { return writeCmd(cn, multiExec...) } -func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error { +func (c *baseClient) txPipelineReadQueued(rd proto.Reader, cmds []Cmder) error { // Parse queued replies. var statusCmd StatusCmd - if err := statusCmd.readReply(cn); err != nil { + err := statusCmd.readReply(rd) + if err != nil { return err } for _ = range cmds { - err := statusCmd.readReply(cn) + err = statusCmd.readReply(rd) if err != nil && !internal.IsRedisError(err) { return err } } // Parse number of replies. - line, err := cn.Rd.ReadLine() + line, err := rd.ReadLine() if err != nil { if err == Nil { err = TxFailedErr