fix: remove mutex from pipeline

This commit is contained in:
Vladimir Mihailenco 2023-01-21 10:41:51 +02:00
parent e314cd9846
commit 6525bbbaa1
3 changed files with 6 additions and 83 deletions

View File

@ -1,28 +1,9 @@
# [9.0.0-rc.2](https://github.com/go-redis/redis/compare/v9.0.0-rc.1...v9.0.0-rc.2) (2022-11-26)
### Bug Fixes
* capture error correctly in withConn ([d1bfaba](https://github.com/go-redis/redis/commit/d1bfaba549fe380d269c26cea0a0183ed1520a85))
* fixes ring.SetAddrs and rebalance race ([#2283](https://github.com/go-redis/redis/issues/2283)) ([d83436b](https://github.com/go-redis/redis/commit/d83436b321cd9ed52ba33c3edbe8f63bb0444c59))
* read in route_randomly query param correctly ([f236053](https://github.com/go-redis/redis/commit/f236053735d10aec5e6e31fc3ced1b2e53292554))
* reduce `SetAddrs` shards lock contention ([6c05a9f](https://github.com/go-redis/redis/commit/6c05a9f6b17f8e32593d3f7d594f82ba3dbcafb1)), closes [/github.com/go-redis/redis/pull/2190#discussion_r953040289](https://github.com//github.com/go-redis/redis/pull/2190/issues/discussion_r953040289) [#2077](https://github.com/go-redis/redis/issues/2077)
* wrap cmds in Conn.TxPipeline ([5053db2](https://github.com/go-redis/redis/commit/5053db2f9c8b3ca25f497a75f70012c7ad6cd775))
### Features
* add HasErrorPrefix ([d3d8002](https://github.com/go-redis/redis/commit/d3d8002e894a1eab5bab2c9fff13439527e330d8))
* add support for SINTERCARD command ([bc51c61](https://github.com/go-redis/redis/commit/bc51c61a458d1bc4fb4424c7c3e912325ef980cc))
## v9 UNRELEASED ## v9 UNRELEASED
### Added ### Added
- Added support for [RESP3](https://github.com/antirez/RESP3/blob/master/spec.md) protocol. - Added support for [RESP3](https://github.com/antirez/RESP3/blob/master/spec.md) protocol. It was
Contributed by @monkey92t who has done a lot of work recently. contributed by @monkey92t who has done the majority of work in this release.
- Added `ContextTimeoutEnabled` option that controls whether the client respects context timeouts - Added `ContextTimeoutEnabled` option that controls whether the client respects context timeouts
and deadlines. See and deadlines. See
[Redis Timeouts](https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts) for details. [Redis Timeouts](https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts) for details.
@ -30,6 +11,7 @@
`redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791`. `redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791`.
- Added metrics instrumentation using `redisotel.IstrumentMetrics`. See - Added metrics instrumentation using `redisotel.IstrumentMetrics`. See
[documentation](https://redis.uptrace.dev/guide/go-redis-monitoring.html) [documentation](https://redis.uptrace.dev/guide/go-redis-monitoring.html)
- Added `redis.HasErrorPrefix` to help working with errors.
### Changed ### Changed
@ -48,8 +30,9 @@
- Removed `Pipeline.Close` since there is no real need to explicitly manage pipeline resources and - Removed `Pipeline.Close` since there is no real need to explicitly manage pipeline resources and
it can be safely reused via `sync.Pool` etc. `Pipeline.Discard` is still available if you want to it can be safely reused via `sync.Pool` etc. `Pipeline.Discard` is still available if you want to
reset commands for some reason. reset commands for some reason.
- Changed Pipelines to not be thread-safe any more.
### Fixed ### Fixed
- Improved and fixed pipeline retries. - Improved and fixed pipeline retries.
- As usual, added more commands and fixed some bugs. - As usually, added support for more commands and fixed some bugs.

View File

@ -2,7 +2,6 @@ package redis
import ( import (
"context" "context"
"sync"
) )
type pipelineExecer func(context.Context, []Cmder) error type pipelineExecer func(context.Context, []Cmder) error
@ -39,8 +38,6 @@ type Pipeline struct {
statefulCmdable statefulCmdable
exec pipelineExecer exec pipelineExecer
mu sync.Mutex
cmds []Cmder cmds []Cmder
} }
@ -51,10 +48,7 @@ func (c *Pipeline) init() {
// Len returns the number of queued commands. // Len returns the number of queued commands.
func (c *Pipeline) Len() int { func (c *Pipeline) Len() int {
c.mu.Lock() return len(c.cmds)
ln := len(c.cmds)
c.mu.Unlock()
return ln
} }
// Do queues the custom command for later execution. // Do queues the custom command for later execution.
@ -66,17 +60,13 @@ func (c *Pipeline) Do(ctx context.Context, args ...interface{}) *Cmd {
// Process queues the cmd for later execution. // Process queues the cmd for later execution.
func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error { func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error {
c.mu.Lock()
c.cmds = append(c.cmds, cmd) c.cmds = append(c.cmds, cmd)
c.mu.Unlock()
return nil return nil
} }
// Discard resets the pipeline and discards queued commands. // Discard resets the pipeline and discards queued commands.
func (c *Pipeline) Discard() { func (c *Pipeline) Discard() {
c.mu.Lock()
c.cmds = c.cmds[:0] c.cmds = c.cmds[:0]
c.mu.Unlock()
} }
// Exec executes all previously queued commands using one // Exec executes all previously queued commands using one
@ -85,9 +75,6 @@ func (c *Pipeline) Discard() {
// Exec always returns list of commands and error of the first failed // Exec always returns list of commands and error of the first failed
// command if any. // command if any.
func (c *Pipeline) Exec(ctx context.Context) ([]Cmder, error) { func (c *Pipeline) Exec(ctx context.Context) ([]Cmder, error) {
c.mu.Lock()
defer c.mu.Unlock()
if len(c.cmds) == 0 { if len(c.cmds) == 0 {
return nil, nil return nil, nil
} }

View File

@ -214,53 +214,6 @@ var _ = Describe("races", func() {
Expect(val).To(Equal(int64(C * N))) Expect(val).To(Equal(int64(C * N)))
}) })
It("should Pipeline", func() {
perform(C, func(id int) {
pipe := client.Pipeline()
for i := 0; i < N; i++ {
pipe.Echo(ctx, fmt.Sprint(i))
}
cmds, err := pipe.Exec(ctx)
Expect(err).NotTo(HaveOccurred())
Expect(cmds).To(HaveLen(N))
for i := 0; i < N; i++ {
Expect(cmds[i].(*redis.StringCmd).Val()).To(Equal(fmt.Sprint(i)))
}
})
})
It("should Pipeline", func() {
pipe := client.Pipeline()
perform(N, func(id int) {
pipe.Incr(ctx, "key")
})
cmds, err := pipe.Exec(ctx)
Expect(err).NotTo(HaveOccurred())
Expect(cmds).To(HaveLen(N))
n, err := client.Get(ctx, "key").Int64()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(N)))
})
It("should TxPipeline", func() {
pipe := client.TxPipeline()
perform(N, func(id int) {
pipe.Incr(ctx, "key")
})
cmds, err := pipe.Exec(ctx)
Expect(err).NotTo(HaveOccurred())
Expect(cmds).To(HaveLen(N))
n, err := client.Get(ctx, "key").Int64()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(N)))
})
PIt("should BLPop", func() { PIt("should BLPop", func() {
var received uint32 var received uint32