mirror of https://github.com/go-redis/redis.git
Merge pull request #1493 from go-redis/feature/ctx-cancel
Feature/ctx cancel
This commit is contained in:
commit
c89ef0aacf
12
cluster.go
12
cluster.go
|
@ -751,15 +751,6 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
|
func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
|
||||||
err := c._process(ctx, cmd)
|
|
||||||
if err != nil {
|
|
||||||
cmd.SetErr(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ClusterClient) _process(ctx context.Context, cmd Cmder) error {
|
|
||||||
cmdInfo := c.cmdInfo(cmd.Name())
|
cmdInfo := c.cmdInfo(cmd.Name())
|
||||||
slot := c.cmdSlot(cmd)
|
slot := c.cmdSlot(cmd)
|
||||||
|
|
||||||
|
@ -1197,9 +1188,12 @@ func (c *ClusterClient) pipelineReadCmds(
|
||||||
) error {
|
) error {
|
||||||
for _, cmd := range cmds {
|
for _, cmd := range cmds {
|
||||||
err := cmd.readReply(rd)
|
err := cmd.readReply(rd)
|
||||||
|
cmd.SetErr(err)
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.checkMovedErr(ctx, cmd, err, failedCmds) {
|
if c.checkMovedErr(ctx, cmd, err, failedCmds) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
146
command.go
146
command.go
|
@ -299,9 +299,9 @@ func (cmd *Cmd) Bool() (bool, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *Cmd) readReply(rd *proto.Reader) error {
|
func (cmd *Cmd) readReply(rd *proto.Reader) (err error) {
|
||||||
cmd.val, cmd.err = rd.ReadReply(sliceParser)
|
cmd.val, err = rd.ReadReply(sliceParser)
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// sliceParser implements proto.MultiBulkParse.
|
// sliceParser implements proto.MultiBulkParse.
|
||||||
|
@ -357,10 +357,9 @@ func (cmd *SliceCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *SliceCmd) readReply(rd *proto.Reader) error {
|
func (cmd *SliceCmd) readReply(rd *proto.Reader) error {
|
||||||
var v interface{}
|
v, err := rd.ReadArrayReply(sliceParser)
|
||||||
v, cmd.err = rd.ReadArrayReply(sliceParser)
|
if err != nil {
|
||||||
if cmd.err != nil {
|
return err
|
||||||
return cmd.err
|
|
||||||
}
|
}
|
||||||
cmd.val = v.([]interface{})
|
cmd.val = v.([]interface{})
|
||||||
return nil
|
return nil
|
||||||
|
@ -397,9 +396,9 @@ func (cmd *StatusCmd) String() string {
|
||||||
return cmdString(cmd, cmd.val)
|
return cmdString(cmd, cmd.val)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *StatusCmd) readReply(rd *proto.Reader) error {
|
func (cmd *StatusCmd) readReply(rd *proto.Reader) (err error) {
|
||||||
cmd.val, cmd.err = rd.ReadString()
|
cmd.val, err = rd.ReadString()
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -437,9 +436,9 @@ func (cmd *IntCmd) String() string {
|
||||||
return cmdString(cmd, cmd.val)
|
return cmdString(cmd, cmd.val)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *IntCmd) readReply(rd *proto.Reader) error {
|
func (cmd *IntCmd) readReply(rd *proto.Reader) (err error) {
|
||||||
cmd.val, cmd.err = rd.ReadIntReply()
|
cmd.val, err = rd.ReadIntReply()
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -474,7 +473,7 @@ func (cmd *IntSliceCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *IntSliceCmd) readReply(rd *proto.Reader) error {
|
func (cmd *IntSliceCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
cmd.val = make([]int64, n)
|
cmd.val = make([]int64, n)
|
||||||
for i := 0; i < len(cmd.val); i++ {
|
for i := 0; i < len(cmd.val); i++ {
|
||||||
num, err := rd.ReadIntReply()
|
num, err := rd.ReadIntReply()
|
||||||
|
@ -485,7 +484,7 @@ func (cmd *IntSliceCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -522,10 +521,9 @@ func (cmd *DurationCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *DurationCmd) readReply(rd *proto.Reader) error {
|
func (cmd *DurationCmd) readReply(rd *proto.Reader) error {
|
||||||
var n int64
|
n, err := rd.ReadIntReply()
|
||||||
n, cmd.err = rd.ReadIntReply()
|
if err != nil {
|
||||||
if cmd.err != nil {
|
return err
|
||||||
return cmd.err
|
|
||||||
}
|
}
|
||||||
switch n {
|
switch n {
|
||||||
// -2 if the key does not exist
|
// -2 if the key does not exist
|
||||||
|
@ -570,7 +568,7 @@ func (cmd *TimeCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *TimeCmd) readReply(rd *proto.Reader) error {
|
func (cmd *TimeCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
if n != 2 {
|
if n != 2 {
|
||||||
return nil, fmt.Errorf("got %d elements, expected 2", n)
|
return nil, fmt.Errorf("got %d elements, expected 2", n)
|
||||||
}
|
}
|
||||||
|
@ -588,7 +586,7 @@ func (cmd *TimeCmd) readReply(rd *proto.Reader) error {
|
||||||
cmd.val = time.Unix(sec, microsec*1000)
|
cmd.val = time.Unix(sec, microsec*1000)
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -623,17 +621,15 @@ func (cmd *BoolCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *BoolCmd) readReply(rd *proto.Reader) error {
|
func (cmd *BoolCmd) readReply(rd *proto.Reader) error {
|
||||||
var v interface{}
|
v, err := rd.ReadReply(nil)
|
||||||
v, cmd.err = rd.ReadReply(nil)
|
|
||||||
// `SET key value NX` returns nil when key already exists. But
|
// `SET key value NX` returns nil when key already exists. But
|
||||||
// `SETNX key value` returns bool (0/1). So convert nil to bool.
|
// `SETNX key value` returns bool (0/1). So convert nil to bool.
|
||||||
if cmd.err == Nil {
|
if err == Nil {
|
||||||
cmd.val = false
|
cmd.val = false
|
||||||
cmd.err = nil
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if cmd.err != nil {
|
if err != nil {
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
switch v := v.(type) {
|
switch v := v.(type) {
|
||||||
case int64:
|
case int64:
|
||||||
|
@ -643,8 +639,7 @@ func (cmd *BoolCmd) readReply(rd *proto.Reader) error {
|
||||||
cmd.val = v == "OK"
|
cmd.val = v == "OK"
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
cmd.err = fmt.Errorf("got %T, wanted int64 or string", v)
|
return fmt.Errorf("got %T, wanted int64 or string", v)
|
||||||
return cmd.err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -736,9 +731,9 @@ func (cmd *StringCmd) String() string {
|
||||||
return cmdString(cmd, cmd.val)
|
return cmdString(cmd, cmd.val)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *StringCmd) readReply(rd *proto.Reader) error {
|
func (cmd *StringCmd) readReply(rd *proto.Reader) (err error) {
|
||||||
cmd.val, cmd.err = rd.ReadString()
|
cmd.val, err = rd.ReadString()
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -772,9 +767,9 @@ func (cmd *FloatCmd) String() string {
|
||||||
return cmdString(cmd, cmd.val)
|
return cmdString(cmd, cmd.val)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *FloatCmd) readReply(rd *proto.Reader) error {
|
func (cmd *FloatCmd) readReply(rd *proto.Reader) (err error) {
|
||||||
cmd.val, cmd.err = rd.ReadFloatReply()
|
cmd.val, err = rd.ReadFloatReply()
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -813,7 +808,7 @@ func (cmd *StringSliceCmd) ScanSlice(container interface{}) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error {
|
func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
cmd.val = make([]string, n)
|
cmd.val = make([]string, n)
|
||||||
for i := 0; i < len(cmd.val); i++ {
|
for i := 0; i < len(cmd.val); i++ {
|
||||||
switch s, err := rd.ReadString(); {
|
switch s, err := rd.ReadString(); {
|
||||||
|
@ -827,7 +822,7 @@ func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -862,7 +857,7 @@ func (cmd *BoolSliceCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *BoolSliceCmd) readReply(rd *proto.Reader) error {
|
func (cmd *BoolSliceCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
cmd.val = make([]bool, n)
|
cmd.val = make([]bool, n)
|
||||||
for i := 0; i < len(cmd.val); i++ {
|
for i := 0; i < len(cmd.val); i++ {
|
||||||
n, err := rd.ReadIntReply()
|
n, err := rd.ReadIntReply()
|
||||||
|
@ -873,7 +868,7 @@ func (cmd *BoolSliceCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -908,7 +903,7 @@ func (cmd *StringStringMapCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *StringStringMapCmd) readReply(rd *proto.Reader) error {
|
func (cmd *StringStringMapCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
cmd.val = make(map[string]string, n/2)
|
cmd.val = make(map[string]string, n/2)
|
||||||
for i := int64(0); i < n; i += 2 {
|
for i := int64(0); i < n; i += 2 {
|
||||||
key, err := rd.ReadString()
|
key, err := rd.ReadString()
|
||||||
|
@ -925,7 +920,7 @@ func (cmd *StringStringMapCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -960,7 +955,7 @@ func (cmd *StringIntMapCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *StringIntMapCmd) readReply(rd *proto.Reader) error {
|
func (cmd *StringIntMapCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
cmd.val = make(map[string]int64, n/2)
|
cmd.val = make(map[string]int64, n/2)
|
||||||
for i := int64(0); i < n; i += 2 {
|
for i := int64(0); i < n; i += 2 {
|
||||||
key, err := rd.ReadString()
|
key, err := rd.ReadString()
|
||||||
|
@ -977,7 +972,7 @@ func (cmd *StringIntMapCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -1012,7 +1007,7 @@ func (cmd *StringStructMapCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error {
|
func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
cmd.val = make(map[string]struct{}, n)
|
cmd.val = make(map[string]struct{}, n)
|
||||||
for i := int64(0); i < n; i++ {
|
for i := int64(0); i < n; i++ {
|
||||||
key, err := rd.ReadString()
|
key, err := rd.ReadString()
|
||||||
|
@ -1023,7 +1018,7 @@ func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -1063,10 +1058,9 @@ func (cmd *XMessageSliceCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error {
|
func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error {
|
||||||
var v interface{}
|
v, err := rd.ReadArrayReply(xMessageSliceParser)
|
||||||
v, cmd.err = rd.ReadArrayReply(xMessageSliceParser)
|
if err != nil {
|
||||||
if cmd.err != nil {
|
return err
|
||||||
return cmd.err
|
|
||||||
}
|
}
|
||||||
cmd.val = v.([]XMessage)
|
cmd.val = v.([]XMessage)
|
||||||
return nil
|
return nil
|
||||||
|
@ -1163,7 +1157,7 @@ func (cmd *XStreamSliceCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *XStreamSliceCmd) readReply(rd *proto.Reader) error {
|
func (cmd *XStreamSliceCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
cmd.val = make([]XStream, n)
|
cmd.val = make([]XStream, n)
|
||||||
for i := 0; i < len(cmd.val); i++ {
|
for i := 0; i < len(cmd.val); i++ {
|
||||||
i := i
|
i := i
|
||||||
|
@ -1194,7 +1188,7 @@ func (cmd *XStreamSliceCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -1235,7 +1229,7 @@ func (cmd *XPendingCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *XPendingCmd) readReply(rd *proto.Reader) error {
|
func (cmd *XPendingCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
if n != 4 {
|
if n != 4 {
|
||||||
return nil, fmt.Errorf("got %d, wanted 4", n)
|
return nil, fmt.Errorf("got %d, wanted 4", n)
|
||||||
}
|
}
|
||||||
|
@ -1296,7 +1290,7 @@ func (cmd *XPendingCmd) readReply(rd *proto.Reader) error {
|
||||||
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -1337,7 +1331,7 @@ func (cmd *XPendingExtCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error {
|
func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
cmd.val = make([]XPendingExt, 0, n)
|
cmd.val = make([]XPendingExt, 0, n)
|
||||||
for i := int64(0); i < n; i++ {
|
for i := int64(0); i < n; i++ {
|
||||||
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
@ -1379,7 +1373,7 @@ func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -1420,8 +1414,7 @@ func (cmd *XInfoGroupsCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error {
|
func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
func(rd *proto.Reader, n int64) (interface{}, error) {
|
|
||||||
for i := int64(0); i < n; i++ {
|
for i := int64(0); i < n; i++ {
|
||||||
v, err := rd.ReadReply(xGroupInfoParser)
|
v, err := rd.ReadReply(xGroupInfoParser)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1431,7 +1424,7 @@ func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return nil
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func xGroupInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
|
func xGroupInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
@ -1507,7 +1500,7 @@ func (cmd *ZSliceCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *ZSliceCmd) readReply(rd *proto.Reader) error {
|
func (cmd *ZSliceCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
cmd.val = make([]Z, n/2)
|
cmd.val = make([]Z, n/2)
|
||||||
for i := 0; i < len(cmd.val); i++ {
|
for i := 0; i < len(cmd.val); i++ {
|
||||||
member, err := rd.ReadString()
|
member, err := rd.ReadString()
|
||||||
|
@ -1527,7 +1520,7 @@ func (cmd *ZSliceCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -1562,7 +1555,7 @@ func (cmd *ZWithKeyCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error {
|
func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
if n != 3 {
|
if n != 3 {
|
||||||
return nil, fmt.Errorf("got %d elements, expected 3", n)
|
return nil, fmt.Errorf("got %d elements, expected 3", n)
|
||||||
}
|
}
|
||||||
|
@ -1587,7 +1580,7 @@ func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error {
|
||||||
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -1625,9 +1618,9 @@ func (cmd *ScanCmd) String() string {
|
||||||
return cmdString(cmd, cmd.page)
|
return cmdString(cmd, cmd.page)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *ScanCmd) readReply(rd *proto.Reader) error {
|
func (cmd *ScanCmd) readReply(rd *proto.Reader) (err error) {
|
||||||
cmd.page, cmd.cursor, cmd.err = rd.ReadScanReply()
|
cmd.page, cmd.cursor, err = rd.ReadScanReply()
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterator creates a new ScanIterator.
|
// Iterator creates a new ScanIterator.
|
||||||
|
@ -1680,7 +1673,7 @@ func (cmd *ClusterSlotsCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error {
|
func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
cmd.val = make([]ClusterSlot, n)
|
cmd.val = make([]ClusterSlot, n)
|
||||||
for i := 0; i < len(cmd.val); i++ {
|
for i := 0; i < len(cmd.val); i++ {
|
||||||
n, err := rd.ReadArrayLen()
|
n, err := rd.ReadArrayLen()
|
||||||
|
@ -1742,7 +1735,7 @@ func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -1834,10 +1827,9 @@ func (cmd *GeoLocationCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *GeoLocationCmd) readReply(rd *proto.Reader) error {
|
func (cmd *GeoLocationCmd) readReply(rd *proto.Reader) error {
|
||||||
var v interface{}
|
v, err := rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q))
|
||||||
v, cmd.err = rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q))
|
if err != nil {
|
||||||
if cmd.err != nil {
|
return err
|
||||||
return cmd.err
|
|
||||||
}
|
}
|
||||||
cmd.locations = v.([]GeoLocation)
|
cmd.locations = v.([]GeoLocation)
|
||||||
return nil
|
return nil
|
||||||
|
@ -1947,7 +1939,7 @@ func (cmd *GeoPosCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *GeoPosCmd) readReply(rd *proto.Reader) error {
|
func (cmd *GeoPosCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
cmd.val = make([]*GeoPos, n)
|
cmd.val = make([]*GeoPos, n)
|
||||||
for i := 0; i < len(cmd.val); i++ {
|
for i := 0; i < len(cmd.val); i++ {
|
||||||
i := i
|
i := i
|
||||||
|
@ -1978,7 +1970,7 @@ func (cmd *GeoPosCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
@ -2024,7 +2016,7 @@ func (cmd *CommandsInfoCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
|
func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
cmd.val = make(map[string]*CommandInfo, n)
|
cmd.val = make(map[string]*CommandInfo, n)
|
||||||
for i := int64(0); i < n; i++ {
|
for i := int64(0); i < n; i++ {
|
||||||
v, err := rd.ReadReply(commandInfoParser)
|
v, err := rd.ReadReply(commandInfoParser)
|
||||||
|
@ -2036,7 +2028,7 @@ func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
|
func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
|
@ -2211,7 +2203,7 @@ func (cmd *SlowLogCmd) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *SlowLogCmd) readReply(rd *proto.Reader) error {
|
func (cmd *SlowLogCmd) readReply(rd *proto.Reader) error {
|
||||||
_, cmd.err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
|
||||||
cmd.val = make([]SlowLog, n)
|
cmd.val = make([]SlowLog, n)
|
||||||
for i := 0; i < len(cmd.val); i++ {
|
for i := 0; i < len(cmd.val); i++ {
|
||||||
n, err := rd.ReadArrayLen()
|
n, err := rd.ReadArrayLen()
|
||||||
|
@ -2281,5 +2273,5 @@ func (cmd *SlowLogCmd) readReply(rd *proto.Reader) error {
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
})
|
})
|
||||||
return cmd.err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,7 +86,7 @@ var _ = Describe("Cmd", func() {
|
||||||
Expect(tm2).To(BeTemporally("==", tm))
|
Expect(tm2).To(BeTemporally("==", tm))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("allow to set custom error", func() {
|
It("allows to set custom error", func() {
|
||||||
e := errors.New("custom error")
|
e := errors.New("custom error")
|
||||||
cmd := redis.Cmd{}
|
cmd := redis.Cmd{}
|
||||||
cmd.SetErr(e)
|
cmd.SetErr(e)
|
||||||
|
|
43
redis.go
43
redis.go
|
@ -49,7 +49,11 @@ func (hs hooks) process(
|
||||||
ctx context.Context, cmd Cmder, fn func(context.Context, Cmder) error,
|
ctx context.Context, cmd Cmder, fn func(context.Context, Cmder) error,
|
||||||
) error {
|
) error {
|
||||||
if len(hs.hooks) == 0 {
|
if len(hs.hooks) == 0 {
|
||||||
|
err := hs.withContext(ctx, func() error {
|
||||||
return fn(ctx, cmd)
|
return fn(ctx, cmd)
|
||||||
|
})
|
||||||
|
cmd.SetErr(err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var hookIndex int
|
var hookIndex int
|
||||||
|
@ -63,7 +67,10 @@ func (hs hooks) process(
|
||||||
}
|
}
|
||||||
|
|
||||||
if retErr == nil {
|
if retErr == nil {
|
||||||
retErr = fn(ctx, cmd)
|
retErr = hs.withContext(ctx, func() error {
|
||||||
|
return fn(ctx, cmd)
|
||||||
|
})
|
||||||
|
cmd.SetErr(retErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
for hookIndex--; hookIndex >= 0; hookIndex-- {
|
for hookIndex--; hookIndex >= 0; hookIndex-- {
|
||||||
|
@ -80,7 +87,10 @@ func (hs hooks) processPipeline(
|
||||||
ctx context.Context, cmds []Cmder, fn func(context.Context, []Cmder) error,
|
ctx context.Context, cmds []Cmder, fn func(context.Context, []Cmder) error,
|
||||||
) error {
|
) error {
|
||||||
if len(hs.hooks) == 0 {
|
if len(hs.hooks) == 0 {
|
||||||
|
err := hs.withContext(ctx, func() error {
|
||||||
return fn(ctx, cmds)
|
return fn(ctx, cmds)
|
||||||
|
})
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var hookIndex int
|
var hookIndex int
|
||||||
|
@ -94,7 +104,9 @@ func (hs hooks) processPipeline(
|
||||||
}
|
}
|
||||||
|
|
||||||
if retErr == nil {
|
if retErr == nil {
|
||||||
retErr = fn(ctx, cmds)
|
retErr = hs.withContext(ctx, func() error {
|
||||||
|
return fn(ctx, cmds)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
for hookIndex--; hookIndex >= 0; hookIndex-- {
|
for hookIndex--; hookIndex >= 0; hookIndex-- {
|
||||||
|
@ -114,6 +126,23 @@ func (hs hooks) processTxPipeline(
|
||||||
return hs.processPipeline(ctx, cmds, fn)
|
return hs.processPipeline(ctx, cmds, fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (hs hooks) withContext(ctx context.Context, fn func() error) error {
|
||||||
|
done := ctx.Done()
|
||||||
|
if done == nil {
|
||||||
|
return fn()
|
||||||
|
}
|
||||||
|
|
||||||
|
errc := make(chan error, 1)
|
||||||
|
go func() { errc <- fn() }()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return ctx.Err()
|
||||||
|
case err := <-errc:
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type baseClient struct {
|
type baseClient struct {
|
||||||
|
@ -283,15 +312,6 @@ func (c *baseClient) withConn(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
|
func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
|
||||||
err := c._process(ctx, cmd)
|
|
||||||
if err != nil {
|
|
||||||
cmd.SetErr(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *baseClient) _process(ctx context.Context, cmd Cmder) error {
|
|
||||||
var lastErr error
|
var lastErr error
|
||||||
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
|
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
|
||||||
attempt := attempt
|
attempt := attempt
|
||||||
|
@ -435,6 +455,7 @@ func (c *baseClient) pipelineProcessCmds(
|
||||||
func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
|
func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
|
||||||
for _, cmd := range cmds {
|
for _, cmd := range cmds {
|
||||||
err := cmd.readReply(rd)
|
err := cmd.readReply(rd)
|
||||||
|
cmd.SetErr(err)
|
||||||
if err != nil && !isRedisError(err) {
|
if err != nil && !isRedisError(err) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -389,3 +389,28 @@ var _ = Describe("Client OnConnect", func() {
|
||||||
Expect(name).To(Equal("on_connect"))
|
Expect(name).To(Equal("on_connect"))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
var _ = Describe("Client context cancelation", func() {
|
||||||
|
var opt *redis.Options
|
||||||
|
var client *redis.Client
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
opt = redisOptions()
|
||||||
|
opt.ReadTimeout = -1
|
||||||
|
opt.WriteTimeout = -1
|
||||||
|
client = redis.NewClient(opt)
|
||||||
|
})
|
||||||
|
|
||||||
|
AfterEach(func() {
|
||||||
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Blocking operation cancelation", func() {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
err := client.BLPop(ctx, 1*time.Second, "test").Err()
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err).To(BeIdenticalTo(context.Canceled))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
15
ring.go
15
ring.go
|
@ -588,15 +588,6 @@ func (c *Ring) cmdShard(ctx context.Context, cmd Cmder) (*ringShard, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Ring) process(ctx context.Context, cmd Cmder) error {
|
func (c *Ring) process(ctx context.Context, cmd Cmder) error {
|
||||||
err := c._process(ctx, cmd)
|
|
||||||
if err != nil {
|
|
||||||
cmd.SetErr(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Ring) _process(ctx context.Context, cmd Cmder) error {
|
|
||||||
var lastErr error
|
var lastErr error
|
||||||
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
|
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
|
||||||
if attempt > 0 {
|
if attempt > 0 {
|
||||||
|
@ -694,11 +685,9 @@ func (c *Ring) processShardPipeline(
|
||||||
}
|
}
|
||||||
|
|
||||||
if tx {
|
if tx {
|
||||||
err = shard.Client.processTxPipeline(ctx, cmds)
|
return shard.Client.processTxPipeline(ctx, cmds)
|
||||||
} else {
|
|
||||||
err = shard.Client.processPipeline(ctx, cmds)
|
|
||||||
}
|
}
|
||||||
return err
|
return shard.Client.processPipeline(ctx, cmds)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
|
func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
|
||||||
|
|
|
@ -224,6 +224,7 @@ func masterSlaveDialer(
|
||||||
// SentinelClient is a client for a Redis Sentinel.
|
// SentinelClient is a client for a Redis Sentinel.
|
||||||
type SentinelClient struct {
|
type SentinelClient struct {
|
||||||
*baseClient
|
*baseClient
|
||||||
|
hooks
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,7 +254,7 @@ func (c *SentinelClient) WithContext(ctx context.Context) *SentinelClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *SentinelClient) Process(ctx context.Context, cmd Cmder) error {
|
func (c *SentinelClient) Process(ctx context.Context, cmd Cmder) error {
|
||||||
return c.baseClient.process(ctx, cmd)
|
return c.hooks.process(ctx, cmd, c.baseClient.process)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *SentinelClient) pubSub() *PubSub {
|
func (c *SentinelClient) pubSub() *PubSub {
|
||||||
|
|
Loading…
Reference in New Issue