add miss Pipeline and TxPipeline

Signed-off-by: monkey <golang@88.com>
This commit is contained in:
monkey 2021-03-07 17:34:40 +08:00
parent 783725f3f7
commit 4a7d03359b
1 changed files with 117 additions and 114 deletions

View File

@ -188,159 +188,162 @@ var _ = Describe("Redis Ring", func() {
}) })
}) })
It("supports Process hook", func() { Describe("Process hook", func() {
//the health check leads to data race for variable "stack []string". BeforeEach(func() {
//here, the health check time is set to 72 hours to avoid health check //the health check leads to data race for variable "stack []string".
opt := redisRingOptions() //here, the health check time is set to 72 hours to avoid health check
opt.HeartbeatFrequency = 72 * time.Hour opt := redisRingOptions()
ring = redis.NewRing(opt) opt.HeartbeatFrequency = 72 * time.Hour
ring = redis.NewRing(opt)
err := ring.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred())
var stack []string
ring.AddHook(&hook{
beforeProcess: func(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
Expect(cmd.String()).To(Equal("ping: "))
stack = append(stack, "ring.BeforeProcess")
return ctx, nil
},
afterProcess: func(ctx context.Context, cmd redis.Cmder) error {
Expect(cmd.String()).To(Equal("ping: PONG"))
stack = append(stack, "ring.AfterProcess")
return nil
},
}) })
It("supports Process hook", func() {
err := ring.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred())
ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error { var stack []string
shard.AddHook(&hook{
ring.AddHook(&hook{
beforeProcess: func(ctx context.Context, cmd redis.Cmder) (context.Context, error) { beforeProcess: func(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
Expect(cmd.String()).To(Equal("ping: ")) Expect(cmd.String()).To(Equal("ping: "))
stack = append(stack, "shard.BeforeProcess") stack = append(stack, "ring.BeforeProcess")
return ctx, nil return ctx, nil
}, },
afterProcess: func(ctx context.Context, cmd redis.Cmder) error { afterProcess: func(ctx context.Context, cmd redis.Cmder) error {
Expect(cmd.String()).To(Equal("ping: PONG")) Expect(cmd.String()).To(Equal("ping: PONG"))
stack = append(stack, "shard.AfterProcess") stack = append(stack, "ring.AfterProcess")
return nil return nil
}, },
}) })
return nil
})
err = ring.Ping(ctx).Err() ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
Expect(err).NotTo(HaveOccurred()) shard.AddHook(&hook{
Expect(stack).To(Equal([]string{ beforeProcess: func(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
"ring.BeforeProcess", Expect(cmd.String()).To(Equal("ping: "))
"shard.BeforeProcess", stack = append(stack, "shard.BeforeProcess")
"shard.AfterProcess", return ctx, nil
"ring.AfterProcess", },
})) afterProcess: func(ctx context.Context, cmd redis.Cmder) error {
}) Expect(cmd.String()).To(Equal("ping: PONG"))
stack = append(stack, "shard.AfterProcess")
It("supports Pipeline hook", func() { return nil
err := ring.Ping(ctx).Err() },
Expect(err).NotTo(HaveOccurred()) })
var stack []string
ring.AddHook(&hook{
beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("ping: "))
stack = append(stack, "ring.BeforeProcessPipeline")
return ctx, nil
},
afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("ping: PONG"))
stack = append(stack, "ring.AfterProcessPipeline")
return nil return nil
}, })
err = ring.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred())
Expect(stack).To(Equal([]string{
"ring.BeforeProcess",
"shard.BeforeProcess",
"shard.AfterProcess",
"ring.AfterProcess",
}))
}) })
ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error { It("supports Pipeline hook", func() {
shard.AddHook(&hook{ err := ring.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred())
var stack []string
ring.AddHook(&hook{
beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) { beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
Expect(cmds).To(HaveLen(1)) Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("ping: ")) Expect(cmds[0].String()).To(Equal("ping: "))
stack = append(stack, "shard.BeforeProcessPipeline") stack = append(stack, "ring.BeforeProcessPipeline")
return ctx, nil return ctx, nil
}, },
afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error { afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(1)) Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("ping: PONG")) Expect(cmds[0].String()).To(Equal("ping: PONG"))
stack = append(stack, "shard.AfterProcessPipeline") stack = append(stack, "ring.AfterProcessPipeline")
return nil return nil
}, },
}) })
return nil
})
_, err = ring.Pipelined(ctx, func(pipe redis.Pipeliner) error { ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
pipe.Ping(ctx) shard.AddHook(&hook{
return nil beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
}) Expect(cmds).To(HaveLen(1))
Expect(err).NotTo(HaveOccurred()) Expect(cmds[0].String()).To(Equal("ping: "))
Expect(stack).To(Equal([]string{ stack = append(stack, "shard.BeforeProcessPipeline")
"ring.BeforeProcessPipeline", return ctx, nil
"shard.BeforeProcessPipeline", },
"shard.AfterProcessPipeline", afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error {
"ring.AfterProcessPipeline", Expect(cmds).To(HaveLen(1))
})) Expect(cmds[0].String()).To(Equal("ping: PONG"))
}) stack = append(stack, "shard.AfterProcessPipeline")
return nil
It("supports TxPipeline hook", func() { },
err := ring.Ping(ctx).Err() })
Expect(err).NotTo(HaveOccurred())
var stack []string
ring.AddHook(&hook{
beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("ping: "))
stack = append(stack, "ring.BeforeProcessPipeline")
return ctx, nil
},
afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("ping: PONG"))
stack = append(stack, "ring.AfterProcessPipeline")
return nil return nil
}, })
_, err = ring.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Ping(ctx)
return nil
})
Expect(err).NotTo(HaveOccurred())
Expect(stack).To(Equal([]string{
"ring.BeforeProcessPipeline",
"shard.BeforeProcessPipeline",
"shard.AfterProcessPipeline",
"ring.AfterProcessPipeline",
}))
}) })
ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error { It("supports TxPipeline hook", func() {
shard.AddHook(&hook{ err := ring.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred())
var stack []string
ring.AddHook(&hook{
beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) { beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
Expect(cmds).To(HaveLen(3)) Expect(cmds).To(HaveLen(1))
Expect(cmds[1].String()).To(Equal("ping: ")) Expect(cmds[0].String()).To(Equal("ping: "))
stack = append(stack, "shard.BeforeProcessPipeline") stack = append(stack, "ring.BeforeProcessPipeline")
return ctx, nil return ctx, nil
}, },
afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error { afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(3)) Expect(cmds).To(HaveLen(1))
Expect(cmds[1].String()).To(Equal("ping: PONG")) Expect(cmds[0].String()).To(Equal("ping: PONG"))
stack = append(stack, "shard.AfterProcessPipeline") stack = append(stack, "ring.AfterProcessPipeline")
return nil return nil
}, },
}) })
return nil
})
_, err = ring.TxPipelined(ctx, func(pipe redis.Pipeliner) error { ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
pipe.Ping(ctx) shard.AddHook(&hook{
return nil beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
Expect(cmds).To(HaveLen(3))
Expect(cmds[1].String()).To(Equal("ping: "))
stack = append(stack, "shard.BeforeProcessPipeline")
return ctx, nil
},
afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(3))
Expect(cmds[1].String()).To(Equal("ping: PONG"))
stack = append(stack, "shard.AfterProcessPipeline")
return nil
},
})
return nil
})
_, err = ring.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Ping(ctx)
return nil
})
Expect(err).NotTo(HaveOccurred())
Expect(stack).To(Equal([]string{
"ring.BeforeProcessPipeline",
"shard.BeforeProcessPipeline",
"shard.AfterProcessPipeline",
"ring.AfterProcessPipeline",
}))
}) })
Expect(err).NotTo(HaveOccurred())
Expect(stack).To(Equal([]string{
"ring.BeforeProcessPipeline",
"shard.BeforeProcessPipeline",
"shard.AfterProcessPipeline",
"ring.AfterProcessPipeline",
}))
}) })
}) })