Pass cn.Rd where possible

This commit is contained in:
Vladimir Mihailenco 2018-08-15 09:38:58 +03:00
parent cfe305296f
commit 7c26d1ceb6
7 changed files with 123 additions and 119 deletions

View File

@ -1345,14 +1345,14 @@ func (c *ClusterClient) pipelineProcessCmds(
// Set read timeout for all commands.
cn.SetReadTimeout(c.opt.ReadTimeout)
return c.pipelineReadCmds(cn, cmds, failedCmds)
return c.pipelineReadCmds(cn.Rd, cmds, failedCmds)
}
func (c *ClusterClient) pipelineReadCmds(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
rd proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
for _, cmd := range cmds {
err := cmd.readReply(cn)
err := cmd.readReply(rd)
if err == nil {
continue
}
@ -1486,25 +1486,26 @@ func (c *ClusterClient) txPipelineProcessCmds(
// Set read timeout for all commands.
cn.SetReadTimeout(c.opt.ReadTimeout)
if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil {
err := c.txPipelineReadQueued(cn.Rd, cmds, failedCmds)
if err != nil {
setCmdsErr(cmds, err)
return err
}
return pipelineReadCmds(cn, cmds)
return pipelineReadCmds(cn.Rd, cmds)
}
func (c *ClusterClient) txPipelineReadQueued(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
rd proto.Reader, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
// Parse queued replies.
var statusCmd StatusCmd
if err := statusCmd.readReply(cn); err != nil {
if err := statusCmd.readReply(rd); err != nil {
return err
}
for _, cmd := range cmds {
err := statusCmd.readReply(cn)
err := statusCmd.readReply(rd)
if err == nil {
continue
}
@ -1517,7 +1518,7 @@ func (c *ClusterClient) txPipelineReadQueued(
}
// Parse number of replies.
line, err := cn.Rd.ReadLine()
line, err := rd.ReadLine()
if err != nil {
if err == Nil {
err = TxFailedErr

View File

@ -19,7 +19,7 @@ type Cmder interface {
Args() []interface{}
stringArg(int) string
readReply(*pool.Conn) error
readReply(rd proto.Reader) error
setErr(error)
readTimeout() *time.Duration
@ -239,8 +239,8 @@ func (cmd *Cmd) Bool() (bool, error) {
}
}
func (cmd *Cmd) readReply(cn *pool.Conn) error {
cmd.val, cmd.err = cn.Rd.ReadReply(sliceParser)
func (cmd *Cmd) readReply(rd proto.Reader) error {
cmd.val, cmd.err = rd.ReadReply(sliceParser)
if cmd.err != nil {
return cmd.err
}
@ -252,7 +252,7 @@ func (cmd *Cmd) readReply(cn *pool.Conn) error {
}
// Implements proto.MultiBulkParse
func sliceParser(rd *proto.Reader, n int64) (interface{}, error) {
func sliceParser(rd proto.Reader, n int64) (interface{}, error) {
vals := make([]interface{}, 0, n)
for i := int64(0); i < n; i++ {
v, err := rd.ReadReply(sliceParser)
@ -306,9 +306,9 @@ func (cmd *SliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *SliceCmd) readReply(cn *pool.Conn) error {
func (cmd *SliceCmd) readReply(rd proto.Reader) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(sliceParser)
v, cmd.err = rd.ReadArrayReply(sliceParser)
if cmd.err != nil {
return cmd.err
}
@ -344,8 +344,8 @@ func (cmd *StatusCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *StatusCmd) readReply(cn *pool.Conn) error {
cmd.val, cmd.err = cn.Rd.ReadStringReply()
func (cmd *StatusCmd) readReply(rd proto.Reader) error {
cmd.val, cmd.err = rd.ReadStringReply()
return cmd.err
}
@ -377,8 +377,8 @@ func (cmd *IntCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *IntCmd) readReply(cn *pool.Conn) error {
cmd.val, cmd.err = cn.Rd.ReadIntReply()
func (cmd *IntCmd) readReply(rd proto.Reader) error {
cmd.val, cmd.err = rd.ReadIntReply()
return cmd.err
}
@ -412,9 +412,9 @@ func (cmd *DurationCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *DurationCmd) readReply(cn *pool.Conn) error {
func (cmd *DurationCmd) readReply(rd proto.Reader) error {
var n int64
n, cmd.err = cn.Rd.ReadIntReply()
n, cmd.err = rd.ReadIntReply()
if cmd.err != nil {
return cmd.err
}
@ -450,9 +450,9 @@ func (cmd *TimeCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *TimeCmd) readReply(cn *pool.Conn) error {
func (cmd *TimeCmd) readReply(rd proto.Reader) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(timeParser)
v, cmd.err = rd.ReadArrayReply(timeParser)
if cmd.err != nil {
return cmd.err
}
@ -461,7 +461,7 @@ func (cmd *TimeCmd) readReply(cn *pool.Conn) error {
}
// Implements proto.MultiBulkParse
func timeParser(rd *proto.Reader, n int64) (interface{}, error) {
func timeParser(rd proto.Reader, n int64) (interface{}, error) {
if n != 2 {
return nil, fmt.Errorf("got %d elements, expected 2", n)
}
@ -509,9 +509,9 @@ func (cmd *BoolCmd) String() string {
var ok = []byte("OK")
func (cmd *BoolCmd) readReply(cn *pool.Conn) error {
func (cmd *BoolCmd) readReply(rd proto.Reader) error {
var v interface{}
v, cmd.err = cn.Rd.ReadReply(nil)
v, cmd.err = rd.ReadReply(nil)
// `SET key value NX` returns nil when key already exists. But
// `SETNX key value` returns bool (0/1). So convert nil to bool.
// TODO: is this okay?
@ -596,8 +596,8 @@ func (cmd *StringCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *StringCmd) readReply(cn *pool.Conn) error {
cmd.val, cmd.err = cn.Rd.ReadBytesReply()
func (cmd *StringCmd) readReply(rd proto.Reader) error {
cmd.val, cmd.err = rd.ReadBytesReply()
return cmd.err
}
@ -629,8 +629,8 @@ func (cmd *FloatCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *FloatCmd) readReply(cn *pool.Conn) error {
cmd.val, cmd.err = cn.Rd.ReadFloatReply()
func (cmd *FloatCmd) readReply(rd proto.Reader) error {
cmd.val, cmd.err = rd.ReadFloatReply()
return cmd.err
}
@ -666,9 +666,9 @@ func (cmd *StringSliceCmd) ScanSlice(container interface{}) error {
return proto.ScanSlice(cmd.Val(), container)
}
func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error {
func (cmd *StringSliceCmd) readReply(rd proto.Reader) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(stringSliceParser)
v, cmd.err = rd.ReadArrayReply(stringSliceParser)
if cmd.err != nil {
return cmd.err
}
@ -677,7 +677,7 @@ func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error {
}
// Implements proto.MultiBulkParse
func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
func stringSliceParser(rd proto.Reader, n int64) (interface{}, error) {
ss := make([]string, 0, n)
for i := int64(0); i < n; i++ {
s, err := rd.ReadStringReply()
@ -720,9 +720,9 @@ func (cmd *BoolSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error {
func (cmd *BoolSliceCmd) readReply(rd proto.Reader) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(boolSliceParser)
v, cmd.err = rd.ReadArrayReply(boolSliceParser)
if cmd.err != nil {
return cmd.err
}
@ -731,7 +731,7 @@ func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error {
}
// Implements proto.MultiBulkParse
func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
func boolSliceParser(rd proto.Reader, n int64) (interface{}, error) {
bools := make([]bool, 0, n)
for i := int64(0); i < n; i++ {
n, err := rd.ReadIntReply()
@ -771,9 +771,9 @@ func (cmd *StringStringMapCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error {
func (cmd *StringStringMapCmd) readReply(rd proto.Reader) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(stringStringMapParser)
v, cmd.err = rd.ReadArrayReply(stringStringMapParser)
if cmd.err != nil {
return cmd.err
}
@ -782,7 +782,7 @@ func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error {
}
// Implements proto.MultiBulkParse
func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) {
func stringStringMapParser(rd proto.Reader, n int64) (interface{}, error) {
m := make(map[string]string, n/2)
for i := int64(0); i < n; i += 2 {
key, err := rd.ReadStringReply()
@ -828,9 +828,9 @@ func (cmd *StringIntMapCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error {
func (cmd *StringIntMapCmd) readReply(rd proto.Reader) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(stringIntMapParser)
v, cmd.err = rd.ReadArrayReply(stringIntMapParser)
if cmd.err != nil {
return cmd.err
}
@ -839,7 +839,7 @@ func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error {
}
// Implements proto.MultiBulkParse
func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) {
func stringIntMapParser(rd proto.Reader, n int64) (interface{}, error) {
m := make(map[string]int64, n/2)
for i := int64(0); i < n; i += 2 {
key, err := rd.ReadStringReply()
@ -885,9 +885,9 @@ func (cmd *StringStructMapCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error {
func (cmd *StringStructMapCmd) readReply(rd proto.Reader) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(stringStructMapParser)
v, cmd.err = rd.ReadArrayReply(stringStructMapParser)
if cmd.err != nil {
return cmd.err
}
@ -896,7 +896,7 @@ func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error {
}
// Implements proto.MultiBulkParse
func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) {
func stringStructMapParser(rd proto.Reader, n int64) (interface{}, error) {
m := make(map[string]struct{}, n)
for i := int64(0); i < n; i++ {
key, err := rd.ReadStringReply()
@ -942,9 +942,9 @@ func (cmd *XMessageSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error {
func (cmd *XMessageSliceCmd) readReply(rd proto.Reader) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(xMessageSliceParser)
v, cmd.err = rd.ReadArrayReply(xMessageSliceParser)
if cmd.err != nil {
return cmd.err
}
@ -953,10 +953,10 @@ func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error {
}
// Implements proto.MultiBulkParse
func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
func xMessageSliceParser(rd proto.Reader, n int64) (interface{}, error) {
msgs := make([]XMessage, 0, n)
for i := int64(0); i < n; i++ {
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
_, err := rd.ReadArrayReply(func(rd proto.Reader, n int64) (interface{}, error) {
id, err := rd.ReadStringReply()
if err != nil {
return nil, err
@ -981,7 +981,7 @@ func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
}
// Implements proto.MultiBulkParse
func stringInterfaceMapParser(rd *proto.Reader, n int64) (interface{}, error) {
func stringInterfaceMapParser(rd proto.Reader, n int64) (interface{}, error) {
m := make(map[string]interface{}, n/2)
for i := int64(0); i < n; i += 2 {
key, err := rd.ReadStringReply()
@ -1032,9 +1032,9 @@ func (cmd *XStreamSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error {
func (cmd *XStreamSliceCmd) readReply(rd proto.Reader) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(xStreamSliceParser)
v, cmd.err = rd.ReadArrayReply(xStreamSliceParser)
if cmd.err != nil {
return cmd.err
}
@ -1043,10 +1043,10 @@ func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error {
}
// Implements proto.MultiBulkParse
func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
func xStreamSliceParser(rd proto.Reader, n int64) (interface{}, error) {
ret := make([]XStream, 0, n)
for i := int64(0); i < n; i++ {
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
_, err := rd.ReadArrayReply(func(rd proto.Reader, n int64) (interface{}, error) {
if n != 2 {
return nil, fmt.Errorf("got %d, wanted 2", n)
}
@ -1108,9 +1108,9 @@ func (cmd *XPendingCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *XPendingCmd) readReply(cn *pool.Conn) error {
func (cmd *XPendingCmd) readReply(rd proto.Reader) error {
var info interface{}
info, cmd.err = cn.Rd.ReadArrayReply(xPendingParser)
info, cmd.err = rd.ReadArrayReply(xPendingParser)
if cmd.err != nil {
return cmd.err
}
@ -1118,7 +1118,7 @@ func (cmd *XPendingCmd) readReply(cn *pool.Conn) error {
return nil
}
func xPendingParser(rd *proto.Reader, n int64) (interface{}, error) {
func xPendingParser(rd proto.Reader, n int64) (interface{}, error) {
if n != 4 {
return nil, fmt.Errorf("got %d, wanted 4", n)
}
@ -1143,9 +1143,9 @@ func xPendingParser(rd *proto.Reader, n int64) (interface{}, error) {
Lower: lower,
Higher: higher,
}
_, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
_, err = rd.ReadArrayReply(func(rd proto.Reader, n int64) (interface{}, error) {
for i := int64(0); i < n; i++ {
_, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
_, err = rd.ReadArrayReply(func(rd proto.Reader, n int64) (interface{}, error) {
if n != 2 {
return nil, fmt.Errorf("got %d, wanted 2", n)
}
@ -1219,9 +1219,9 @@ func (cmd *XPendingExtCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *XPendingExtCmd) readReply(cn *pool.Conn) error {
func (cmd *XPendingExtCmd) readReply(rd proto.Reader) error {
var info interface{}
info, cmd.err = cn.Rd.ReadArrayReply(xPendingExtSliceParser)
info, cmd.err = rd.ReadArrayReply(xPendingExtSliceParser)
if cmd.err != nil {
return cmd.err
}
@ -1229,10 +1229,10 @@ func (cmd *XPendingExtCmd) readReply(cn *pool.Conn) error {
return nil
}
func xPendingExtSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
func xPendingExtSliceParser(rd proto.Reader, n int64) (interface{}, error) {
ret := make([]XPendingExt, 0, n)
for i := int64(0); i < n; i++ {
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
_, err := rd.ReadArrayReply(func(rd proto.Reader, n int64) (interface{}, error) {
if n != 4 {
return nil, fmt.Errorf("got %d, wanted 4", n)
}
@ -1302,9 +1302,9 @@ func (cmd *ZSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error {
func (cmd *ZSliceCmd) readReply(rd proto.Reader) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(zSliceParser)
v, cmd.err = rd.ReadArrayReply(zSliceParser)
if cmd.err != nil {
return cmd.err
}
@ -1313,7 +1313,7 @@ func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error {
}
// Implements proto.MultiBulkParse
func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
func zSliceParser(rd proto.Reader, n int64) (interface{}, error) {
zz := make([]Z, n/2)
for i := int64(0); i < n; i += 2 {
var err error
@ -1365,8 +1365,8 @@ func (cmd *ScanCmd) String() string {
return cmdString(cmd, cmd.page)
}
func (cmd *ScanCmd) readReply(cn *pool.Conn) error {
cmd.page, cmd.cursor, cmd.err = cn.Rd.ReadScanReply()
func (cmd *ScanCmd) readReply(rd proto.Reader) error {
cmd.page, cmd.cursor, cmd.err = rd.ReadScanReply()
return cmd.err
}
@ -1416,9 +1416,9 @@ func (cmd *ClusterSlotsCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error {
func (cmd *ClusterSlotsCmd) readReply(rd proto.Reader) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(clusterSlotsParser)
v, cmd.err = rd.ReadArrayReply(clusterSlotsParser)
if cmd.err != nil {
return cmd.err
}
@ -1427,7 +1427,7 @@ func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error {
}
// Implements proto.MultiBulkParse
func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) {
func clusterSlotsParser(rd proto.Reader, n int64) (interface{}, error) {
slots := make([]ClusterSlot, n)
for i := 0; i < len(slots); i++ {
n, err := rd.ReadArrayLen()
@ -1570,9 +1570,9 @@ func (cmd *GeoLocationCmd) String() string {
return cmdString(cmd, cmd.locations)
}
func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error {
func (cmd *GeoLocationCmd) readReply(rd proto.Reader) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q))
v, cmd.err = </