forked from mirror/redis
Merge pull request #1496 from pendo-io/fix-tx-cmd-corruption
Fixes #1386; pipeline.Exec() sometimes returns spurious `StatusCmd`
This commit is contained in:
commit
fcb50dc64b
|
@ -1,6 +1,8 @@
|
||||||
package redis_test
|
package redis_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
|
@ -67,6 +69,21 @@ var _ = Describe("pipelining", func() {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
Expect(cmds).To(HaveLen(1))
|
Expect(cmds).To(HaveLen(1))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("handles large pipelines", func() {
|
||||||
|
for callCount := 1; callCount < 16; callCount++ {
|
||||||
|
for i := 1; i <= callCount; i++ {
|
||||||
|
pipe.SetNX(ctx, strconv.Itoa(i)+"_key", strconv.Itoa(i)+"_value", 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
cmds, err := pipe.Exec(ctx)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(cmds).To(HaveLen(callCount))
|
||||||
|
for _, cmd := range cmds {
|
||||||
|
Expect(cmd).To(BeAssignableToTypeOf(&redis.BoolCmd{}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
Describe("Pipeline", func() {
|
Describe("Pipeline", func() {
|
||||||
|
|
10
redis.go
10
redis.go
|
@ -492,11 +492,11 @@ func wrapMultiExec(ctx context.Context, cmds []Cmder) []Cmder {
|
||||||
if len(cmds) == 0 {
|
if len(cmds) == 0 {
|
||||||
panic("not reached")
|
panic("not reached")
|
||||||
}
|
}
|
||||||
cmds = append(cmds, make([]Cmder, 2)...)
|
cmdCopy := make([]Cmder, len(cmds)+2)
|
||||||
copy(cmds[1:], cmds[:len(cmds)-2])
|
cmdCopy[0] = NewStatusCmd(ctx, "multi")
|
||||||
cmds[0] = NewStatusCmd(ctx, "multi")
|
copy(cmdCopy[1:], cmds)
|
||||||
cmds[len(cmds)-1] = NewSliceCmd(ctx, "exec")
|
cmdCopy[len(cmdCopy)-1] = NewSliceCmd(ctx, "exec")
|
||||||
return cmds
|
return cmdCopy
|
||||||
}
|
}
|
||||||
|
|
||||||
func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error {
|
func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error {
|
||||||
|
|
Loading…
Reference in New Issue