This commit is contained in:
Vladimir Mihailenco 2020-09-23 10:29:13 +03:00
parent c89b69131d
commit a32502b1d9
2 changed files with 30 additions and 28 deletions

View File

@ -18,8 +18,8 @@ type Cmder interface {
Args() []interface{} Args() []interface{}
String() string String() string
stringArg(int) string stringArg(int) string
firstKeyPos() int firstKeyPos() int8
addKeyPos(int) setFirstKeyPos(int8)
readTimeout() *time.Duration readTimeout() *time.Duration
readReply(rd *proto.Reader) error readReply(rd *proto.Reader) error
@ -59,6 +59,10 @@ func writeCmd(wr *proto.Writer, cmd Cmder) error {
} }
func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int { func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int {
if pos := cmd.firstKeyPos(); pos != 0 {
return int(pos)
}
switch cmd.Name() { switch cmd.Name() {
case "eval", "evalsha": case "eval", "evalsha":
if cmd.stringArg(2) != "0" { if cmd.stringArg(2) != "0" {
@ -75,13 +79,10 @@ func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int {
} }
} }
if pos := cmd.firstKeyPos(); pos != 0 { if info != nil {
return pos return int(info.FirstKeyPos)
} }
if info == nil { return 0
return 0
}
return int(info.FirstKeyPos)
} }
func cmdString(cmd Cmder, val interface{}) string { func cmdString(cmd Cmder, val interface{}) string {
@ -108,10 +109,10 @@ func cmdString(cmd Cmder, val interface{}) string {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type baseCmd struct { type baseCmd struct {
ctx context.Context ctx context.Context
args []interface{} args []interface{}
err error err error
keyPos int keyPos int8
_readTimeout *time.Duration _readTimeout *time.Duration
} }
@ -153,12 +154,12 @@ func (cmd *baseCmd) stringArg(pos int) string {
return s return s
} }
func (cmd *baseCmd) firstKeyPos() int { func (cmd *baseCmd) firstKeyPos() int8 {
return cmd.keyPos return cmd.keyPos
} }
func (cmd *baseCmd) addKeyPos(offset int) { func (cmd *baseCmd) setFirstKeyPos(keyPos int8) {
cmd.keyPos += offset cmd.keyPos = keyPos
} }
func (cmd *baseCmd) SetErr(e error) { func (cmd *baseCmd) SetErr(e error) {

View File

@ -1505,19 +1505,19 @@ func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
args := make([]interface{}, 0, 5+len(a.Streams)) args := make([]interface{}, 0, 5+len(a.Streams))
args = append(args, "xread") args = append(args, "xread")
offset := 1 keyPos := int8(1)
if a.Count > 0 { if a.Count > 0 {
args = append(args, "count") args = append(args, "count")
args = append(args, a.Count) args = append(args, a.Count)
offset += 2 keyPos += 2
} }
if a.Block >= 0 { if a.Block >= 0 {
args = append(args, "block") args = append(args, "block")
args = append(args, int64(a.Block/time.Millisecond)) args = append(args, int64(a.Block/time.Millisecond))
offset += 2 keyPos += 2
} }
args = append(args, "streams") args = append(args, "streams")
offset += 1 keyPos += 1
for _, s := range a.Streams { for _, s := range a.Streams {
args = append(args, s) args = append(args, s)
} }
@ -1526,7 +1526,7 @@ func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
if a.Block >= 0 { if a.Block >= 0 {
cmd.setReadTimeout(a.Block) cmd.setReadTimeout(a.Block)
} }
cmd.addKeyPos(offset) cmd.setFirstKeyPos(keyPos)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
@ -1581,21 +1581,21 @@ func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSlic
args := make([]interface{}, 0, 8+len(a.Streams)) args := make([]interface{}, 0, 8+len(a.Streams))
args = append(args, "xreadgroup", "group", a.Group, a.Consumer) args = append(args, "xreadgroup", "group", a.Group, a.Consumer)
offset := 1 keyPos := int8(1)
if a.Count > 0 { if a.Count > 0 {
args = append(args, "count", a.Count) args = append(args, "count", a.Count)
offset += 2 keyPos += 2
} }
if a.Block >= 0 { if a.Block >= 0 {
args = append(args, "block", int64(a.Block/time.Millisecond)) args = append(args, "block", int64(a.Block/time.Millisecond))
offset += 2 keyPos += 2
} }
if a.NoAck { if a.NoAck {
args = append(args, "noack") args = append(args, "noack")
offset += 1 keyPos += 1
} }
args = append(args, "streams") args = append(args, "streams")
offset += 1 keyPos += 1
for _, s := range a.Streams { for _, s := range a.Streams {
args = append(args, s) args = append(args, s)
} }
@ -1604,7 +1604,7 @@ func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSlic
if a.Block >= 0 { if a.Block >= 0 {
cmd.setReadTimeout(a.Block) cmd.setReadTimeout(a.Block)
} }
cmd.addKeyPos(offset) cmd.setFirstKeyPos(keyPos)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
@ -1883,7 +1883,7 @@ func (c cmdable) ZInterStore(ctx context.Context, destination string, store *ZSt
args = append(args, "aggregate", store.Aggregate) args = append(args, "aggregate", store.Aggregate)
} }
cmd := NewIntCmd(ctx, args...) cmd := NewIntCmd(ctx, args...)
cmd.addKeyPos(3) cmd.setFirstKeyPos(3)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }
@ -2118,8 +2118,9 @@ func (c cmdable) ZUnionStore(ctx context.Context, dest string, store *ZStore) *I
if store.Aggregate != "" { if store.Aggregate != "" {
args = append(args, "aggregate", store.Aggregate) args = append(args, "aggregate", store.Aggregate)
} }
cmd := NewIntCmd(ctx, args...) cmd := NewIntCmd(ctx, args...)
cmd.addKeyPos(3) cmd.setFirstKeyPos(3)
_ = c(ctx, cmd) _ = c(ctx, cmd)
return cmd return cmd
} }