Simplify sync in pipeline.

This commit is contained in:
Vladimir Mihailenco 2016-10-13 12:11:58 +03:00
parent 0c5e085895
commit 8d2fb6e09b
1 changed files with 18 additions and 17 deletions

View File

@ -2,7 +2,6 @@ package redis
import ( import (
"sync" "sync"
"sync/atomic"
"gopkg.in/redis.v5/internal" "gopkg.in/redis.v5/internal"
"gopkg.in/redis.v5/internal/pool" "gopkg.in/redis.v5/internal/pool"
@ -17,10 +16,9 @@ type Pipeline struct {
exec func([]Cmder) error exec func([]Cmder) error
mu sync.Mutex // protects cmds mu sync.Mutex
cmds []Cmder cmds []Cmder
closed bool
closed int32
} }
func (c *Pipeline) Process(cmd Cmder) error { func (c *Pipeline) Process(cmd Cmder) error {
@ -32,20 +30,23 @@ func (c *Pipeline) Process(cmd Cmder) error {
// Close closes the pipeline, releasing any open resources. // Close closes the pipeline, releasing any open resources.
func (c *Pipeline) Close() error { func (c *Pipeline) Close() error {
atomic.StoreInt32(&c.closed, 1) c.mu.Lock()
c.Discard() c.discard()
c.closed = true
c.mu.Unlock()
return nil return nil
} }
func (c *Pipeline) isClosed() bool {
return atomic.LoadInt32(&c.closed) == 1
}
// Discard resets the pipeline and discards queued commands. // Discard resets the pipeline and discards queued commands.
func (c *Pipeline) Discard() error { func (c *Pipeline) Discard() error {
defer c.mu.Unlock()
c.mu.Lock() c.mu.Lock()
if c.isClosed() { err := c.discard()
c.mu.Unlock()
return err
}
func (c *Pipeline) discard() error {
if c.closed {
return pool.ErrClosed return pool.ErrClosed
} }
c.cmds = c.cmds[:0] c.cmds = c.cmds[:0]
@ -58,13 +59,13 @@ func (c *Pipeline) Discard() error {
// Exec always returns list of commands and error of the first failed // Exec always returns list of commands and error of the first failed
// command if any. // command if any.
func (c *Pipeline) Exec() ([]Cmder, error) { func (c *Pipeline) Exec() ([]Cmder, error) {
if c.isClosed() {
return nil, pool.ErrClosed
}
defer c.mu.Unlock() defer c.mu.Unlock()
c.mu.Lock() c.mu.Lock()
if c.closed {
return nil, pool.ErrClosed
}
if len(c.cmds) == 0 { if len(c.cmds) == 0 {
return c.cmds, nil return c.cmds, nil
} }