From f8d71bb27645e72dc1ebc07ac169eb2c50258c95 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sat, 26 Jan 2019 15:29:05 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=AF=20Refactor=20variable=20"release"?= =?UTF-8?q?=20with=20atomic=20operation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pool.go | 17 ++++++----------- pool_func.go | 17 ++++++----------- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/pool.go b/pool.go index e8fd74e..a5b9a15 100644 --- a/pool.go +++ b/pool.go @@ -46,7 +46,7 @@ type Pool struct { workers []*Worker // release is used to notice the pool to closed itself. - release bool + release int32 // lock for synchronous operation. lock sync.Mutex @@ -70,7 +70,7 @@ func (p *Pool) periodicallyPurge() { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers - if len(idleWorkers) == 0 && p.Running() == 0 && p.release { + if len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 { p.lock.Unlock() return } @@ -120,12 +120,11 @@ func NewTimingPool(size, expiry int) (*Pool, error) { // Submit submits a task to this pool. func (p *Pool) Submit(task f) error { - if worker := p.getWorker(); worker != nil { - worker.task <- task - return nil - } else { + if 1 == atomic.LoadInt32(&p.release) { return ErrPoolClosed } + p.getWorker().task <- task + return nil } // Running returns the number of the currently running goroutines. @@ -158,8 +157,8 @@ func (p *Pool) ReSize(size int) { // Release Closes this pool. func (p *Pool) Release() error { p.once.Do(func() { + atomic.StoreInt32(&p.release, 1) p.lock.Lock() - p.release = true idleWorkers := p.workers for i, w := range idleWorkers { w.task <- nil @@ -191,10 +190,6 @@ func (p *Pool) getWorker() *Worker { p.lock.Lock() defer p.lock.Unlock() - if p.release { - return nil - } - idleWorkers := p.workers n := len(idleWorkers) - 1 if n < 0 { diff --git a/pool_func.go b/pool_func.go index f3609e9..a1750f5 100644 --- a/pool_func.go +++ b/pool_func.go @@ -46,7 +46,7 @@ type PoolWithFunc struct { workers []*WorkerWithFunc // release is used to notice the pool to closed itself. - release bool + release int32 // lock for synchronous operation. lock sync.Mutex @@ -73,7 +73,7 @@ func (p *PoolWithFunc) periodicallyPurge() { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers - if len(idleWorkers) == 0 && p.Running() == 0 && p.release { + if len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 { p.lock.Unlock() return } @@ -124,12 +124,11 @@ func NewTimingPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) { // Serve submits a task to pool. func (p *PoolWithFunc) Serve(args interface{}) error { - if worker := p.getWorker(); worker != nil { - worker.args <- args - return nil - } else { + if 1 == atomic.LoadInt32(&p.release) { return ErrPoolClosed } + p.getWorker().args <- args + return nil } // Running returns the number of the currently running goroutines. @@ -162,8 +161,8 @@ func (p *PoolWithFunc) ReSize(size int) { // Release Closed this pool. func (p *PoolWithFunc) Release() error { p.once.Do(func() { + atomic.StoreInt32(&p.release, 1) p.lock.Lock() - p.release = true idleWorkers := p.workers for i, w := range idleWorkers { w.args <- nil @@ -195,10 +194,6 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { p.lock.Lock() defer p.lock.Unlock() - if p.release { - return nil - } - idleWorkers := p.workers n := len(idleWorkers) - 1 if n < 0 {