From 51c0008356f635f6e9658d5d0bc76b6de7b23b96 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sat, 27 Jul 2019 12:05:44 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9EFix=20a=20bug=20that=20blocks=20get?= =?UTF-8?q?ting=20worker=20from=20pool?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pool.go | 12 ++++++++---- pool_func.go | 12 ++++++++---- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/pool.go b/pool.go index 65bde25..1acda9d 100644 --- a/pool.go +++ b/pool.go @@ -70,7 +70,7 @@ func (p *Pool) periodicallyPurge() { var expiredWorkers []*Worker for range heartbeat.C { - if CLOSED == atomic.LoadInt32(&p.release) { + if atomic.LoadInt32(&p.release) == CLOSED { break } currentTime := time.Now() @@ -142,7 +142,7 @@ func NewUltimatePool(size, expiry int, preAlloc bool) (*Pool, error) { // Submit submits a task to this pool. func (p *Pool) Submit(task func()) error { - if CLOSED == atomic.LoadInt32(&p.release) { + if atomic.LoadInt32(&p.release) == CLOSED { return ErrPoolClosed } p.retrieveWorker().task <- task @@ -166,7 +166,7 @@ func (p *Pool) Cap() int { // Tune changes the capacity of this pool. func (p *Pool) Tune(size int) { - if size == p.Cap() { + if p.Cap() == size { return } atomic.StoreInt32(&p.capacity, int32(size)) @@ -211,6 +211,7 @@ func (p *Pool) retrieveWorker() *Worker { p.lock.Lock() idleWorkers := p.workers n := len(idleWorkers) - 1 +RESUME: if n >= 0 { w = idleWorkers[n] idleWorkers[n] = nil @@ -229,6 +230,9 @@ func (p *Pool) retrieveWorker() *Worker { w.run() } else { for { + if p.Running() == 0 { + goto RESUME + } p.cond.Wait() l := len(p.workers) - 1 if l < 0 { @@ -246,7 +250,7 @@ func (p *Pool) retrieveWorker() *Worker { // revertWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) revertWorker(worker *Worker) bool { - if CLOSED == atomic.LoadInt32(&p.release) { + if atomic.LoadInt32(&p.release) == CLOSED { return false } worker.recycleTime = time.Now() diff --git a/pool_func.go b/pool_func.go index 5085e78..e93d838 100644 --- a/pool_func.go +++ b/pool_func.go @@ -73,7 +73,7 @@ func (p *PoolWithFunc) periodicallyPurge() { var expiredWorkers []*WorkerWithFunc for range heartbeat.C { - if CLOSED == atomic.LoadInt32(&p.release) { + if atomic.LoadInt32(&p.release) == CLOSED { break } currentTime := time.Now() @@ -147,7 +147,7 @@ func NewUltimatePoolWithFunc(size, expiry int, pf func(interface{}), preAlloc bo // Invoke submits a task to pool. func (p *PoolWithFunc) Invoke(args interface{}) error { - if CLOSED == atomic.LoadInt32(&p.release) { + if atomic.LoadInt32(&p.release) == CLOSED { return ErrPoolClosed } p.retrieveWorker().args <- args @@ -171,7 +171,7 @@ func (p *PoolWithFunc) Cap() int { // Tune change the capacity of this pool. func (p *PoolWithFunc) Tune(size int) { - if size == p.Cap() { + if p.Cap() == size { return } atomic.StoreInt32(&p.capacity, int32(size)) @@ -216,6 +216,7 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc { p.lock.Lock() idleWorkers := p.workers n := len(idleWorkers) - 1 +RESUME: if n >= 0 { w = idleWorkers[n] idleWorkers[n] = nil @@ -234,6 +235,9 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc { w.run() } else { for { + if p.Running() == 0 { + goto RESUME + } p.cond.Wait() l := len(p.workers) - 1 if l < 0 { @@ -251,7 +255,7 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc { // revertWorker puts a worker back into free pool, recycling the goroutines. func (p *PoolWithFunc) revertWorker(worker *WorkerWithFunc) bool { - if CLOSED == atomic.LoadInt32(&p.release) { + if atomic.LoadInt32(&p.release) == CLOSED { return false } worker.recycleTime = time.Now()