diff --git a/pool.go b/pool.go index bde1137..cfd3a20 100644 --- a/pool.go +++ b/pool.go @@ -28,8 +28,7 @@ import ( "time" ) -// Pool accept the tasks from client,it limits the total -// of goroutines to a given number by recycling goroutines. +// Pool accept the tasks from client, it limits the total of goroutines to a given number by recycling goroutines. type Pool struct { // capacity of the pool. capacity int32 @@ -89,6 +88,13 @@ func (p *Pool) periodicallyPurge() { } p.workers = idleWorkers[:m] } + + // There might be a situation that all workers have been cleaned up + // while some invokers still get stuck in "p.cond.Wait()", + // then it ought to wakes all those invokers. + if len(p.workers) == 0 { + p.cond.Broadcast() + } p.lock.Unlock() // Notify obsolete workers to stop. @@ -207,6 +213,17 @@ func (p *Pool) decRunning() { // retrieveWorker returns a available worker to run the tasks. func (p *Pool) retrieveWorker() *Worker { var w *Worker + spawnWorker := func() { + if cacheWorker := p.workerCache.Get(); cacheWorker != nil { + w = cacheWorker.(*Worker) + } else { + w = &Worker{ + pool: p, + task: make(chan func(), workerChanCap), + } + } + w.run() + } p.lock.Lock() idleWorkers := p.workers @@ -218,18 +235,14 @@ func (p *Pool) retrieveWorker() *Worker { p.lock.Unlock() } else if p.Running() < p.Cap() { p.lock.Unlock() - if cacheWorker := p.workerCache.Get(); cacheWorker != nil { - w = cacheWorker.(*Worker) - } else { - w = &Worker{ - pool: p, - task: make(chan func(), workerChanCap), - } - } - w.run() + spawnWorker() } else { Reentry: p.cond.Wait() + if p.Running() == 0 { + spawnWorker() + return w + } l := len(p.workers) - 1 if l < 0 { goto Reentry @@ -237,7 +250,7 @@ func (p *Pool) retrieveWorker() *Worker { w = p.workers[l] p.workers[l] = nil p.workers = p.workers[:l] - p.lock.Unlock() + p.lock.Unlock() } return w } diff --git a/pool_func.go b/pool_func.go index 9301397..fd2e225 100644 --- a/pool_func.go +++ b/pool_func.go @@ -28,8 +28,7 @@ import ( "time" ) -// PoolWithFunc accept the tasks from client,it limits the total -// of goroutines to a given number by recycling goroutines. +// PoolWithFunc accept the tasks from client, it limits the total of goroutines to a given number by recycling goroutines. type PoolWithFunc struct { // capacity of the pool. capacity int32 @@ -92,6 +91,13 @@ func (p *PoolWithFunc) periodicallyPurge() { } p.workers = idleWorkers[:m] } + + // There might be a situation that all workers have been cleaned up + // while some invokers still get stuck in "p.cond.Wait()", + // then it ought to wakes all those invokers. + if len(p.workers) == 0 { + p.cond.Broadcast() + } p.lock.Unlock() // Notify obsolete workers to stop. @@ -212,6 +218,17 @@ func (p *PoolWithFunc) decRunning() { // retrieveWorker returns a available worker to run the tasks. func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc { var w *WorkerWithFunc + spawnWorker := func() { + if cacheWorker := p.workerCache.Get(); cacheWorker != nil { + w = cacheWorker.(*WorkerWithFunc) + } else { + w = &WorkerWithFunc{ + pool: p, + args: make(chan interface{}, workerChanCap), + } + } + w.run() + } p.lock.Lock() idleWorkers := p.workers @@ -223,18 +240,14 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc { p.lock.Unlock() } else if p.Running() < p.Cap() { p.lock.Unlock() - if cacheWorker := p.workerCache.Get(); cacheWorker != nil { - w = cacheWorker.(*WorkerWithFunc) - } else { - w = &WorkerWithFunc{ - pool: p, - args: make(chan interface{}, workerChanCap), - } - } - w.run() + spawnWorker() } else { Reentry: p.cond.Wait() + if p.Running() == 0 { + spawnWorker() + return w + } l := len(p.workers) - 1 if l < 0 { goto Reentry @@ -242,7 +255,7 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc { w = p.workers[l] p.workers[l] = nil p.workers = p.workers[:l] - p.lock.Unlock() + p.lock.Unlock() } return w }