From af70ed06608fd1885efa56ba406b06476bf7060f Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sun, 18 Aug 2019 02:45:32 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=8FFix=20a=20bug=20where=20invokers=20?= =?UTF-8?q?get=20stuck=20in=20waiting=20idle=20workers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pool.go | 37 +++++++++++++++++++++++++------------ pool_func.go | 37 +++++++++++++++++++++++++------------ 2 files changed, 50 insertions(+), 24 deletions(-) diff --git a/pool.go b/pool.go index bde1137..cfd3a20 100644 --- a/pool.go +++ b/pool.go @@ -28,8 +28,7 @@ import ( "time" ) -// Pool accept the tasks from client,it limits the total -// of goroutines to a given number by recycling goroutines. +// Pool accept the tasks from client, it limits the total of goroutines to a given number by recycling goroutines. type Pool struct { // capacity of the pool. capacity int32 @@ -89,6 +88,13 @@ func (p *Pool) periodicallyPurge() { } p.workers = idleWorkers[:m] } + + // There might be a situation that all workers have been cleaned up + // while some invokers still get stuck in "p.cond.Wait()", + // then it ought to wakes all those invokers. + if len(p.workers) == 0 { + p.cond.Broadcast() + } p.lock.Unlock() // Notify obsolete workers to stop. @@ -207,6 +213,17 @@ func (p *Pool) decRunning() { // retrieveWorker returns a available worker to run the tasks. func (p *Pool) retrieveWorker() *Worker { var w *Worker + spawnWorker := func() { + if cacheWorker := p.workerCache.Get(); cacheWorker != nil { + w = cacheWorker.(*Worker) + } else { + w = &Worker{ + pool: p, + task: make(chan func(), workerChanCap), + } + } + w.run() + } p.lock.Lock() idleWorkers := p.workers @@ -218,18 +235,14 @@ func (p *Pool) retrieveWorker() *Worker { p.lock.Unlock() } else if p.Running() < p.Cap() { p.lock.Unlock() - if cacheWorker := p.workerCache.Get(); cacheWorker != nil { - w = cacheWorker.(*Worker) - } else { - w = &Worker{ - pool: p, - task: make(chan func(), workerChanCap), - } - } - w.run() + spawnWorker() } else { Reentry: p.cond.Wait() + if p.Running() == 0 { + spawnWorker() + return w + } l := len(p.workers) - 1 if l < 0 { goto Reentry @@ -237,7 +250,7 @@ func (p *Pool) retrieveWorker() *Worker { w = p.workers[l] p.workers[l] = nil p.workers = p.workers[:l] - p.lock.Unlock() + p.lock.Unlock() } return w } diff --git a/pool_func.go b/pool_func.go index 9301397..fd2e225 100644 --- a/pool_func.go +++ b/pool_func.go @@ -28,8 +28,7 @@ import ( "time" ) -// PoolWithFunc accept the tasks from client,it limits the total -// of goroutines to a given number by recycling goroutines. +// PoolWithFunc accept the tasks from client, it limits the total of goroutines to a given number by recycling goroutines. type PoolWithFunc struct { // capacity of the pool. capacity int32 @@ -92,6 +91,13 @@ func (p *PoolWithFunc) periodicallyPurge() { } p.workers = idleWorkers[:m] } + + // There might be a situation that all workers have been cleaned up + // while some invokers still get stuck in "p.cond.Wait()", + // then it ought to wakes all those invokers. + if len(p.workers) == 0 { + p.cond.Broadcast() + } p.lock.Unlock() // Notify obsolete workers to stop. @@ -212,6 +218,17 @@ func (p *PoolWithFunc) decRunning() { // retrieveWorker returns a available worker to run the tasks. func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc { var w *WorkerWithFunc + spawnWorker := func() { + if cacheWorker := p.workerCache.Get(); cacheWorker != nil { + w = cacheWorker.(*WorkerWithFunc) + } else { + w = &WorkerWithFunc{ + pool: p, + args: make(chan interface{}, workerChanCap), + } + } + w.run() + } p.lock.Lock() idleWorkers := p.workers @@ -223,18 +240,14 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc { p.lock.Unlock() } else if p.Running() < p.Cap() { p.lock.Unlock() - if cacheWorker := p.workerCache.Get(); cacheWorker != nil { - w = cacheWorker.(*WorkerWithFunc) - } else { - w = &WorkerWithFunc{ - pool: p, - args: make(chan interface{}, workerChanCap), - } - } - w.run() + spawnWorker() } else { Reentry: p.cond.Wait() + if p.Running() == 0 { + spawnWorker() + return w + } l := len(p.workers) - 1 if l < 0 { goto Reentry @@ -242,7 +255,7 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc { w = p.workers[l] p.workers[l] = nil p.workers = p.workers[:l] - p.lock.Unlock() + p.lock.Unlock() } return w }