diff --git a/pipeline.go b/pipeline.go index 1cbc00f3..f3be4d5d 100644 --- a/pipeline.go +++ b/pipeline.go @@ -2,7 +2,6 @@ package redis import ( "sync" - "sync/atomic" "gopkg.in/redis.v5/internal" "gopkg.in/redis.v5/internal/pool" @@ -17,10 +16,9 @@ type Pipeline struct { exec func([]Cmder) error - mu sync.Mutex // protects cmds - cmds []Cmder - - closed int32 + mu sync.Mutex + cmds []Cmder + closed bool } func (c *Pipeline) Process(cmd Cmder) error { @@ -32,20 +30,23 @@ func (c *Pipeline) Process(cmd Cmder) error { // Close closes the pipeline, releasing any open resources. func (c *Pipeline) Close() error { - atomic.StoreInt32(&c.closed, 1) - c.Discard() + c.mu.Lock() + c.discard() + c.closed = true + c.mu.Unlock() return nil } -func (c *Pipeline) isClosed() bool { - return atomic.LoadInt32(&c.closed) == 1 -} - // Discard resets the pipeline and discards queued commands. func (c *Pipeline) Discard() error { - defer c.mu.Unlock() c.mu.Lock() - if c.isClosed() { + err := c.discard() + c.mu.Unlock() + return err +} + +func (c *Pipeline) discard() error { + if c.closed { return pool.ErrClosed } c.cmds = c.cmds[:0] @@ -58,13 +59,13 @@ func (c *Pipeline) Discard() error { // Exec always returns list of commands and error of the first failed // command if any. func (c *Pipeline) Exec() ([]Cmder, error) { - if c.isClosed() { - return nil, pool.ErrClosed - } - defer c.mu.Unlock() c.mu.Lock() + if c.closed { + return nil, pool.ErrClosed + } + if len(c.cmds) == 0 { return c.cmds, nil }