From 6525bbbaa157eaea40e363c462057a3ad29536a9 Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Sat, 21 Jan 2023 10:41:51 +0200 Subject: [PATCH] fix: remove mutex from pipeline --- CHANGELOG.md | 27 +++++---------------------- pipeline.go | 15 +-------------- race_test.go | 47 ----------------------------------------------- 3 files changed, 6 insertions(+), 83 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b117894..525c5abf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,28 +1,9 @@ -# [9.0.0-rc.2](https://github.com/go-redis/redis/compare/v9.0.0-rc.1...v9.0.0-rc.2) (2022-11-26) - - -### Bug Fixes - -* capture error correctly in withConn ([d1bfaba](https://github.com/go-redis/redis/commit/d1bfaba549fe380d269c26cea0a0183ed1520a85)) -* fixes ring.SetAddrs and rebalance race ([#2283](https://github.com/go-redis/redis/issues/2283)) ([d83436b](https://github.com/go-redis/redis/commit/d83436b321cd9ed52ba33c3edbe8f63bb0444c59)) -* read in route_randomly query param correctly ([f236053](https://github.com/go-redis/redis/commit/f236053735d10aec5e6e31fc3ced1b2e53292554)) -* reduce `SetAddrs` shards lock contention ([6c05a9f](https://github.com/go-redis/redis/commit/6c05a9f6b17f8e32593d3f7d594f82ba3dbcafb1)), closes [/github.com/go-redis/redis/pull/2190#discussion_r953040289](https://github.com//github.com/go-redis/redis/pull/2190/issues/discussion_r953040289) [#2077](https://github.com/go-redis/redis/issues/2077) -* wrap cmds in Conn.TxPipeline ([5053db2](https://github.com/go-redis/redis/commit/5053db2f9c8b3ca25f497a75f70012c7ad6cd775)) - - -### Features - -* add HasErrorPrefix ([d3d8002](https://github.com/go-redis/redis/commit/d3d8002e894a1eab5bab2c9fff13439527e330d8)) -* add support for SINTERCARD command ([bc51c61](https://github.com/go-redis/redis/commit/bc51c61a458d1bc4fb4424c7c3e912325ef980cc)) - - - ## v9 UNRELEASED ### Added -- Added support for [RESP3](https://github.com/antirez/RESP3/blob/master/spec.md) protocol. - Contributed by @monkey92t who has done a lot of work recently. +- Added support for [RESP3](https://github.com/antirez/RESP3/blob/master/spec.md) protocol. It was + contributed by @monkey92t who has done the majority of work in this release. - Added `ContextTimeoutEnabled` option that controls whether the client respects context timeouts and deadlines. See [Redis Timeouts](https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts) for details. @@ -30,6 +11,7 @@ `redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791`. - Added metrics instrumentation using `redisotel.IstrumentMetrics`. See [documentation](https://redis.uptrace.dev/guide/go-redis-monitoring.html) +- Added `redis.HasErrorPrefix` to help working with errors. ### Changed @@ -48,8 +30,9 @@ - Removed `Pipeline.Close` since there is no real need to explicitly manage pipeline resources and it can be safely reused via `sync.Pool` etc. `Pipeline.Discard` is still available if you want to reset commands for some reason. +- Changed Pipelines to not be thread-safe any more. ### Fixed - Improved and fixed pipeline retries. -- As usual, added more commands and fixed some bugs. +- As usually, added support for more commands and fixed some bugs. diff --git a/pipeline.go b/pipeline.go index 52bd7213..f6522486 100644 --- a/pipeline.go +++ b/pipeline.go @@ -2,7 +2,6 @@ package redis import ( "context" - "sync" ) type pipelineExecer func(context.Context, []Cmder) error @@ -39,8 +38,6 @@ type Pipeline struct { statefulCmdable exec pipelineExecer - - mu sync.Mutex cmds []Cmder } @@ -51,10 +48,7 @@ func (c *Pipeline) init() { // Len returns the number of queued commands. func (c *Pipeline) Len() int { - c.mu.Lock() - ln := len(c.cmds) - c.mu.Unlock() - return ln + return len(c.cmds) } // Do queues the custom command for later execution. @@ -66,17 +60,13 @@ func (c *Pipeline) Do(ctx context.Context, args ...interface{}) *Cmd { // Process queues the cmd for later execution. func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error { - c.mu.Lock() c.cmds = append(c.cmds, cmd) - c.mu.Unlock() return nil } // Discard resets the pipeline and discards queued commands. func (c *Pipeline) Discard() { - c.mu.Lock() c.cmds = c.cmds[:0] - c.mu.Unlock() } // Exec executes all previously queued commands using one @@ -85,9 +75,6 @@ func (c *Pipeline) Discard() { // Exec always returns list of commands and error of the first failed // command if any. func (c *Pipeline) Exec(ctx context.Context) ([]Cmder, error) { - c.mu.Lock() - defer c.mu.Unlock() - if len(c.cmds) == 0 { return nil, nil } diff --git a/race_test.go b/race_test.go index 22653660..c657dbe5 100644 --- a/race_test.go +++ b/race_test.go @@ -214,53 +214,6 @@ var _ = Describe("races", func() { Expect(val).To(Equal(int64(C * N))) }) - It("should Pipeline", func() { - perform(C, func(id int) { - pipe := client.Pipeline() - for i := 0; i < N; i++ { - pipe.Echo(ctx, fmt.Sprint(i)) - } - - cmds, err := pipe.Exec(ctx) - Expect(err).NotTo(HaveOccurred()) - Expect(cmds).To(HaveLen(N)) - - for i := 0; i < N; i++ { - Expect(cmds[i].(*redis.StringCmd).Val()).To(Equal(fmt.Sprint(i))) - } - }) - }) - - It("should Pipeline", func() { - pipe := client.Pipeline() - perform(N, func(id int) { - pipe.Incr(ctx, "key") - }) - - cmds, err := pipe.Exec(ctx) - Expect(err).NotTo(HaveOccurred()) - Expect(cmds).To(HaveLen(N)) - - n, err := client.Get(ctx, "key").Int64() - Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(int64(N))) - }) - - It("should TxPipeline", func() { - pipe := client.TxPipeline() - perform(N, func(id int) { - pipe.Incr(ctx, "key") - }) - - cmds, err := pipe.Exec(ctx) - Expect(err).NotTo(HaveOccurred()) - Expect(cmds).To(HaveLen(N)) - - n, err := client.Get(ctx, "key").Int64() - Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(int64(N))) - }) - PIt("should BLPop", func() { var received uint32