diff --git a/pool.go b/pool.go index b0481bb..bdf3454 100644 --- a/pool.go +++ b/pool.go @@ -333,51 +333,41 @@ func (p *Pool) addWaiting(delta int) { // retrieveWorker returns an available worker to run the tasks. func (p *Pool) retrieveWorker() (w worker) { - spawnWorker := func() { + p.lock.Lock() + +retry: + // First try to fetch the worker from the queue. + if w = p.workers.detach(); w != nil { + p.lock.Unlock() + return + } + + // If the worker queue is empty and we don't run out of the pool capacity, + // then just spawn a new worker goroutine. + if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { + p.lock.Unlock() w = p.workerCache.Get().(*goWorker) w.run() + return } - p.lock.Lock() - w = p.workers.detach() - if w != nil { // first try to fetch the worker from the queue - p.lock.Unlock() - } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { - // if the worker queue is empty and we don't run out of the pool capacity, - // then just spawn a new worker goroutine. - p.lock.Unlock() - spawnWorker() - } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. - if p.options.Nonblocking { - p.lock.Unlock() - return - } - retry: - if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { - p.lock.Unlock() - return - } - - p.addWaiting(1) - p.cond.Wait() // block and wait for an available worker - p.addWaiting(-1) - - if p.IsClosed() { - p.lock.Unlock() - return - } - - if w = p.workers.detach(); w == nil { - if p.Free() > 0 { - p.lock.Unlock() - spawnWorker() - return - } - goto retry - } + // Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value. + if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) { p.lock.Unlock() + return } - return + + // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. + p.addWaiting(1) + p.cond.Wait() // block and wait for an available worker + p.addWaiting(-1) + + if p.IsClosed() { + p.lock.Unlock() + return + } + + goto retry } // revertWorker puts a worker back into free pool, recycling the goroutines. diff --git a/pool_func.go b/pool_func.go index 69d3c86..e41ae0e 100644 --- a/pool_func.go +++ b/pool_func.go @@ -339,51 +339,41 @@ func (p *PoolWithFunc) addWaiting(delta int) { // retrieveWorker returns an available worker to run the tasks. func (p *PoolWithFunc) retrieveWorker() (w worker) { - spawnWorker := func() { + p.lock.Lock() + +retry: + // First try to fetch the worker from the queue. + if w = p.workers.detach(); w != nil { + p.lock.Unlock() + return + } + + // If the worker queue is empty and we don't run out of the pool capacity, + // then just spawn a new worker goroutine. + if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { + p.lock.Unlock() w = p.workerCache.Get().(*goWorkerWithFunc) w.run() + return } - p.lock.Lock() - w = p.workers.detach() - if w != nil { // first try to fetch the worker from the queue - p.lock.Unlock() - } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { - // if the worker queue is empty and we don't run out of the pool capacity, - // then just spawn a new worker goroutine. - p.lock.Unlock() - spawnWorker() - } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. - if p.options.Nonblocking { - p.lock.Unlock() - return - } - retry: - if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { - p.lock.Unlock() - return - } - - p.addWaiting(1) - p.cond.Wait() // block and wait for an available worker - p.addWaiting(-1) - - if p.IsClosed() { - p.lock.Unlock() - return - } - - if w = p.workers.detach(); w == nil { - if p.Free() > 0 { - p.lock.Unlock() - spawnWorker() - return - } - goto retry - } + // Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value. + if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) { p.lock.Unlock() + return } - return + + // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. + p.addWaiting(1) + p.cond.Wait() // block and wait for an available worker + p.addWaiting(-1) + + if p.IsClosed() { + p.lock.Unlock() + return + } + + goto retry } // revertWorker puts a worker back into free pool, recycling the goroutines.