forked from mirror/redis
Merge pull request #1688 from monkey92t/data_race
fix ring test `Process hook` data race
This commit is contained in:
commit
7633aded81
225
ring_test.go
225
ring_test.go
|
@ -188,153 +188,162 @@ var _ = Describe("Redis Ring", func() {
|
|||
})
|
||||
})
|
||||
|
||||
It("supports Process hook", func() {
|
||||
err := ring.Ping(ctx).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
var stack []string
|
||||
|
||||
ring.AddHook(&hook{
|
||||
beforeProcess: func(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
|
||||
Expect(cmd.String()).To(Equal("ping: "))
|
||||
stack = append(stack, "ring.BeforeProcess")
|
||||
return ctx, nil
|
||||
},
|
||||
afterProcess: func(ctx context.Context, cmd redis.Cmder) error {
|
||||
Expect(cmd.String()).To(Equal("ping: PONG"))
|
||||
stack = append(stack, "ring.AfterProcess")
|
||||
return nil
|
||||
},
|
||||
Describe("Process hook", func() {
|
||||
BeforeEach(func() {
|
||||
//the health check leads to data race for variable "stack []string".
|
||||
//here, the health check time is set to 72 hours to avoid health check
|
||||
opt := redisRingOptions()
|
||||
opt.HeartbeatFrequency = 72 * time.Hour
|
||||
ring = redis.NewRing(opt)
|
||||
})
|
||||
It("supports Process hook", func() {
|
||||
err := ring.Ping(ctx).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
|
||||
shard.AddHook(&hook{
|
||||
var stack []string
|
||||
|
||||
ring.AddHook(&hook{
|
||||
beforeProcess: func(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
|
||||
Expect(cmd.String()).To(Equal("ping: "))
|
||||
stack = append(stack, "shard.BeforeProcess")
|
||||
stack = append(stack, "ring.BeforeProcess")
|
||||
return ctx, nil
|
||||
},
|
||||
afterProcess: func(ctx context.Context, cmd redis.Cmder) error {
|
||||
Expect(cmd.String()).To(Equal("ping: PONG"))
|
||||
stack = append(stack, "shard.AfterProcess")
|
||||
stack = append(stack, "ring.AfterProcess")
|
||||
return nil
|
||||
},
|
||||
})
|
||||
return nil
|
||||
})
|
||||
|
||||
err = ring.Ping(ctx).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(stack).To(Equal([]string{
|
||||
"ring.BeforeProcess",
|
||||
"shard.BeforeProcess",
|
||||
"shard.AfterProcess",
|
||||
"ring.AfterProcess",
|
||||
}))
|
||||
})
|
||||
|
||||
It("supports Pipeline hook", func() {
|
||||
err := ring.Ping(ctx).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
var stack []string
|
||||
|
||||
ring.AddHook(&hook{
|
||||
beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
|
||||
Expect(cmds).To(HaveLen(1))
|
||||
Expect(cmds[0].String()).To(Equal("ping: "))
|
||||
stack = append(stack, "ring.BeforeProcessPipeline")
|
||||
return ctx, nil
|
||||
},
|
||||
afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error {
|
||||
Expect(cmds).To(HaveLen(1))
|
||||
Expect(cmds[0].String()).To(Equal("ping: PONG"))
|
||||
stack = append(stack, "ring.AfterProcessPipeline")
|
||||
ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
|
||||
shard.AddHook(&hook{
|
||||
beforeProcess: func(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
|
||||
Expect(cmd.String()).To(Equal("ping: "))
|
||||
stack = append(stack, "shard.BeforeProcess")
|
||||
return ctx, nil
|
||||
},
|
||||
afterProcess: func(ctx context.Context, cmd redis.Cmder) error {
|
||||
Expect(cmd.String()).To(Equal("ping: PONG"))
|
||||
stack = append(stack, "shard.AfterProcess")
|
||||
return nil
|
||||
},
|
||||
})
|
||||
return nil
|
||||
},
|
||||
})
|
||||
|
||||
err = ring.Ping(ctx).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(stack).To(Equal([]string{
|
||||
"ring.BeforeProcess",
|
||||
"shard.BeforeProcess",
|
||||
"shard.AfterProcess",
|
||||
"ring.AfterProcess",
|
||||
}))
|
||||
})
|
||||
|
||||
ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
|
||||
shard.AddHook(&hook{
|
||||
It("supports Pipeline hook", func() {
|
||||
err := ring.Ping(ctx).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
var stack []string
|
||||
|
||||
ring.AddHook(&hook{
|
||||
beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
|
||||
Expect(cmds).To(HaveLen(1))
|
||||
Expect(cmds[0].String()).To(Equal("ping: "))
|
||||
stack = append(stack, "shard.BeforeProcessPipeline")
|
||||
stack = append(stack, "ring.BeforeProcessPipeline")
|
||||
return ctx, nil
|
||||
},
|
||||
afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error {
|
||||
Expect(cmds).To(HaveLen(1))
|
||||
Expect(cmds[0].String()).To(Equal("ping: PONG"))
|
||||
stack = append(stack, "shard.AfterProcessPipeline")
|
||||
stack = append(stack, "ring.AfterProcessPipeline")
|
||||
return nil
|
||||
},
|
||||
})
|
||||
return nil
|
||||
})
|
||||
|
||||
_, err = ring.Pipelined(ctx, func(pipe redis.Pipeliner) error {
|
||||
pipe.Ping(ctx)
|
||||
return nil
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(stack).To(Equal([]string{
|
||||
"ring.BeforeProcessPipeline",
|
||||
"shard.BeforeProcessPipeline",
|
||||
"shard.AfterProcessPipeline",
|
||||
"ring.AfterProcessPipeline",
|
||||
}))
|
||||
})
|
||||
|
||||
It("supports TxPipeline hook", func() {
|
||||
err := ring.Ping(ctx).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
var stack []string
|
||||
|
||||
ring.AddHook(&hook{
|
||||
beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
|
||||
Expect(cmds).To(HaveLen(1))
|
||||
Expect(cmds[0].String()).To(Equal("ping: "))
|
||||
stack = append(stack, "ring.BeforeProcessPipeline")
|
||||
return ctx, nil
|
||||
},
|
||||
afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error {
|
||||
Expect(cmds).To(HaveLen(1))
|
||||
Expect(cmds[0].String()).To(Equal("ping: PONG"))
|
||||
stack = append(stack, "ring.AfterProcessPipeline")
|
||||
ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
|
||||
shard.AddHook(&hook{
|
||||
beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
|
||||
Expect(cmds).To(HaveLen(1))
|
||||
Expect(cmds[0].String()).To(Equal("ping: "))
|
||||
stack = append(stack, "shard.BeforeProcessPipeline")
|
||||
return ctx, nil
|
||||
},
|
||||
afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error {
|
||||
Expect(cmds).To(HaveLen(1))
|
||||
Expect(cmds[0].String()).To(Equal("ping: PONG"))
|
||||
stack = append(stack, "shard.AfterProcessPipeline")
|
||||
return nil
|
||||
},
|
||||
})
|
||||
return nil
|
||||
},
|
||||
})
|
||||
|
||||
_, err = ring.Pipelined(ctx, func(pipe redis.Pipeliner) error {
|
||||
pipe.Ping(ctx)
|
||||
return nil
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(stack).To(Equal([]string{
|
||||
"ring.BeforeProcessPipeline",
|
||||
"shard.BeforeProcessPipeline",
|
||||
"shard.AfterProcessPipeline",
|
||||
"ring.AfterProcessPipeline",
|
||||
}))
|
||||
})
|
||||
|
||||
ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
|
||||
shard.AddHook(&hook{
|
||||
It("supports TxPipeline hook", func() {
|
||||
err := ring.Ping(ctx).Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
var stack []string
|
||||
|
||||
ring.AddHook(&hook{
|
||||
beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
|
||||
Expect(cmds).To(HaveLen(3))
|
||||
Expect(cmds[1].String()).To(Equal("ping: "))
|
||||
stack = append(stack, "shard.BeforeProcessPipeline")
|
||||
Expect(cmds).To(HaveLen(1))
|
||||
Expect(cmds[0].String()).To(Equal("ping: "))
|
||||
stack = append(stack, "ring.BeforeProcessPipeline")
|
||||
return ctx, nil
|
||||
},
|
||||
afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error {
|
||||
Expect(cmds).To(HaveLen(3))
|
||||
Expect(cmds[1].String()).To(Equal("ping: PONG"))
|
||||
stack = append(stack, "shard.AfterProcessPipeline")
|
||||
Expect(cmds).To(HaveLen(1))
|
||||
Expect(cmds[0].String()).To(Equal("ping: PONG"))
|
||||
stack = append(stack, "ring.AfterProcessPipeline")
|
||||
return nil
|
||||
},
|
||||
})
|
||||
return nil
|
||||
})
|
||||
|
||||
_, err = ring.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
|
||||
pipe.Ping(ctx)
|
||||
return nil
|
||||
ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
|
||||
shard.AddHook(&hook{
|
||||
beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
|
||||
Expect(cmds).To(HaveLen(3))
|
||||
Expect(cmds[1].String()).To(Equal("ping: "))
|
||||
stack = append(stack, "shard.BeforeProcessPipeline")
|
||||
return ctx, nil
|
||||
},
|
||||
afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error {
|
||||
Expect(cmds).To(HaveLen(3))
|
||||
Expect(cmds[1].String()).To(Equal("ping: PONG"))
|
||||
stack = append(stack, "shard.AfterProcessPipeline")
|
||||
return nil
|
||||
},
|
||||
})
|
||||
return nil
|
||||
})
|
||||
|
||||
_, err = ring.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
|
||||
pipe.Ping(ctx)
|
||||
return nil
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(stack).To(Equal([]string{
|
||||
"ring.BeforeProcessPipeline",
|
||||
"shard.BeforeProcessPipeline",
|
||||
"shard.AfterProcessPipeline",
|
||||
"ring.AfterProcessPipeline",
|
||||
}))
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(stack).To(Equal([]string{
|
||||
"ring.BeforeProcessPipeline",
|
||||
"shard.BeforeProcessPipeline",
|
||||
"shard.AfterProcessPipeline",
|
||||
"ring.AfterProcessPipeline",
|
||||
}))
|
||||
})
|
||||
})
|
||||
|
||||
|
|
Loading…
Reference in New Issue