mirror of https://github.com/go-redis/redis.git
Merge pull request #2359 from go-redis/fix/pipeline-mutext
fix: remove mutex from pipeline
This commit is contained in:
commit
0b709c7773
27
CHANGELOG.md
27
CHANGELOG.md
|
@ -1,28 +1,9 @@
|
||||||
# [9.0.0-rc.2](https://github.com/go-redis/redis/compare/v9.0.0-rc.1...v9.0.0-rc.2) (2022-11-26)
|
|
||||||
|
|
||||||
|
|
||||||
### Bug Fixes
|
|
||||||
|
|
||||||
* capture error correctly in withConn ([d1bfaba](https://github.com/go-redis/redis/commit/d1bfaba549fe380d269c26cea0a0183ed1520a85))
|
|
||||||
* fixes ring.SetAddrs and rebalance race ([#2283](https://github.com/go-redis/redis/issues/2283)) ([d83436b](https://github.com/go-redis/redis/commit/d83436b321cd9ed52ba33c3edbe8f63bb0444c59))
|
|
||||||
* read in route_randomly query param correctly ([f236053](https://github.com/go-redis/redis/commit/f236053735d10aec5e6e31fc3ced1b2e53292554))
|
|
||||||
* reduce `SetAddrs` shards lock contention ([6c05a9f](https://github.com/go-redis/redis/commit/6c05a9f6b17f8e32593d3f7d594f82ba3dbcafb1)), closes [/github.com/go-redis/redis/pull/2190#discussion_r953040289](https://github.com//github.com/go-redis/redis/pull/2190/issues/discussion_r953040289) [#2077](https://github.com/go-redis/redis/issues/2077)
|
|
||||||
* wrap cmds in Conn.TxPipeline ([5053db2](https://github.com/go-redis/redis/commit/5053db2f9c8b3ca25f497a75f70012c7ad6cd775))
|
|
||||||
|
|
||||||
|
|
||||||
### Features
|
|
||||||
|
|
||||||
* add HasErrorPrefix ([d3d8002](https://github.com/go-redis/redis/commit/d3d8002e894a1eab5bab2c9fff13439527e330d8))
|
|
||||||
* add support for SINTERCARD command ([bc51c61](https://github.com/go-redis/redis/commit/bc51c61a458d1bc4fb4424c7c3e912325ef980cc))
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
## v9 UNRELEASED
|
## v9 UNRELEASED
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|
||||||
- Added support for [RESP3](https://github.com/antirez/RESP3/blob/master/spec.md) protocol.
|
- Added support for [RESP3](https://github.com/antirez/RESP3/blob/master/spec.md) protocol. It was
|
||||||
Contributed by @monkey92t who has done a lot of work recently.
|
contributed by @monkey92t who has done the majority of work in this release.
|
||||||
- Added `ContextTimeoutEnabled` option that controls whether the client respects context timeouts
|
- Added `ContextTimeoutEnabled` option that controls whether the client respects context timeouts
|
||||||
and deadlines. See
|
and deadlines. See
|
||||||
[Redis Timeouts](https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts) for details.
|
[Redis Timeouts](https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts) for details.
|
||||||
|
@ -30,6 +11,7 @@
|
||||||
`redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791`.
|
`redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791`.
|
||||||
- Added metrics instrumentation using `redisotel.IstrumentMetrics`. See
|
- Added metrics instrumentation using `redisotel.IstrumentMetrics`. See
|
||||||
[documentation](https://redis.uptrace.dev/guide/go-redis-monitoring.html)
|
[documentation](https://redis.uptrace.dev/guide/go-redis-monitoring.html)
|
||||||
|
- Added `redis.HasErrorPrefix` to help working with errors.
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
|
|
||||||
|
@ -48,8 +30,9 @@
|
||||||
- Removed `Pipeline.Close` since there is no real need to explicitly manage pipeline resources and
|
- Removed `Pipeline.Close` since there is no real need to explicitly manage pipeline resources and
|
||||||
it can be safely reused via `sync.Pool` etc. `Pipeline.Discard` is still available if you want to
|
it can be safely reused via `sync.Pool` etc. `Pipeline.Discard` is still available if you want to
|
||||||
reset commands for some reason.
|
reset commands for some reason.
|
||||||
|
- Changed Pipelines to not be thread-safe any more.
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
|
||||||
- Improved and fixed pipeline retries.
|
- Improved and fixed pipeline retries.
|
||||||
- As usual, added more commands and fixed some bugs.
|
- As usually, added support for more commands and fixed some bugs.
|
||||||
|
|
19
pipeline.go
19
pipeline.go
|
@ -2,7 +2,6 @@ package redis
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type pipelineExecer func(context.Context, []Cmder) error
|
type pipelineExecer func(context.Context, []Cmder) error
|
||||||
|
@ -32,15 +31,13 @@ type Pipeliner interface {
|
||||||
var _ Pipeliner = (*Pipeline)(nil)
|
var _ Pipeliner = (*Pipeline)(nil)
|
||||||
|
|
||||||
// Pipeline implements pipelining as described in
|
// Pipeline implements pipelining as described in
|
||||||
// http://redis.io/topics/pipelining. It's safe for concurrent use
|
// http://redis.io/topics/pipelining.
|
||||||
// by multiple goroutines.
|
// Please note: it is not safe for concurrent use by multiple goroutines.
|
||||||
type Pipeline struct {
|
type Pipeline struct {
|
||||||
cmdable
|
cmdable
|
||||||
statefulCmdable
|
statefulCmdable
|
||||||
|
|
||||||
exec pipelineExecer
|
exec pipelineExecer
|
||||||
|
|
||||||
mu sync.Mutex
|
|
||||||
cmds []Cmder
|
cmds []Cmder
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,10 +48,7 @@ func (c *Pipeline) init() {
|
||||||
|
|
||||||
// Len returns the number of queued commands.
|
// Len returns the number of queued commands.
|
||||||
func (c *Pipeline) Len() int {
|
func (c *Pipeline) Len() int {
|
||||||
c.mu.Lock()
|
return len(c.cmds)
|
||||||
ln := len(c.cmds)
|
|
||||||
c.mu.Unlock()
|
|
||||||
return ln
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do queues the custom command for later execution.
|
// Do queues the custom command for later execution.
|
||||||
|
@ -66,17 +60,13 @@ func (c *Pipeline) Do(ctx context.Context, args ...interface{}) *Cmd {
|
||||||
|
|
||||||
// Process queues the cmd for later execution.
|
// Process queues the cmd for later execution.
|
||||||
func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error {
|
func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error {
|
||||||
c.mu.Lock()
|
|
||||||
c.cmds = append(c.cmds, cmd)
|
c.cmds = append(c.cmds, cmd)
|
||||||
c.mu.Unlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Discard resets the pipeline and discards queued commands.
|
// Discard resets the pipeline and discards queued commands.
|
||||||
func (c *Pipeline) Discard() {
|
func (c *Pipeline) Discard() {
|
||||||
c.mu.Lock()
|
|
||||||
c.cmds = c.cmds[:0]
|
c.cmds = c.cmds[:0]
|
||||||
c.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exec executes all previously queued commands using one
|
// Exec executes all previously queued commands using one
|
||||||
|
@ -85,9 +75,6 @@ func (c *Pipeline) Discard() {
|
||||||
// Exec always returns list of commands and error of the first failed
|
// Exec always returns list of commands and error of the first failed
|
||||||
// command if any.
|
// command if any.
|
||||||
func (c *Pipeline) Exec(ctx context.Context) ([]Cmder, error) {
|
func (c *Pipeline) Exec(ctx context.Context) ([]Cmder, error) {
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
|
|
||||||
if len(c.cmds) == 0 {
|
if len(c.cmds) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
47
race_test.go
47
race_test.go
|
@ -214,53 +214,6 @@ var _ = Describe("races", func() {
|
||||||
Expect(val).To(Equal(int64(C * N)))
|
Expect(val).To(Equal(int64(C * N)))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should Pipeline", func() {
|
|
||||||
perform(C, func(id int) {
|
|
||||||
pipe := client.Pipeline()
|
|
||||||
for i := 0; i < N; i++ {
|
|
||||||
pipe.Echo(ctx, fmt.Sprint(i))
|
|
||||||
}
|
|
||||||
|
|
||||||
cmds, err := pipe.Exec(ctx)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(cmds).To(HaveLen(N))
|
|
||||||
|
|
||||||
for i := 0; i < N; i++ {
|
|
||||||
Expect(cmds[i].(*redis.StringCmd).Val()).To(Equal(fmt.Sprint(i)))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should Pipeline", func() {
|
|
||||||
pipe := client.Pipeline()
|
|
||||||
perform(N, func(id int) {
|
|
||||||
pipe.Incr(ctx, "key")
|
|
||||||
})
|
|
||||||
|
|
||||||
cmds, err := pipe.Exec(ctx)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(cmds).To(HaveLen(N))
|
|
||||||
|
|
||||||
n, err := client.Get(ctx, "key").Int64()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(n).To(Equal(int64(N)))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("should TxPipeline", func() {
|
|
||||||
pipe := client.TxPipeline()
|
|
||||||
perform(N, func(id int) {
|
|
||||||
pipe.Incr(ctx, "key")
|
|
||||||
})
|
|
||||||
|
|
||||||
cmds, err := pipe.Exec(ctx)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(cmds).To(HaveLen(N))
|
|
||||||
|
|
||||||
n, err := client.Get(ctx, "key").Int64()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(n).To(Equal(int64(N)))
|
|
||||||
})
|
|
||||||
|
|
||||||
PIt("should BLPop", func() {
|
PIt("should BLPop", func() {
|
||||||
var received uint32
|
var received uint32
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue