refactor: refine the code in `retrieveWorker` to make it more readable (#295)

This commit is contained in:
POABOB 2023-09-17 22:06:25 +08:00 committed by GitHub
parent 1ce814699d
commit aee9c2e2da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 58 additions and 78 deletions

44
pool.go
View File

@ -333,31 +333,31 @@ func (p *Pool) addWaiting(delta int) {
// retrieveWorker returns an available worker to run the tasks. // retrieveWorker returns an available worker to run the tasks.
func (p *Pool) retrieveWorker() (w worker) { func (p *Pool) retrieveWorker() (w worker) {
spawnWorker := func() { p.lock.Lock()
retry:
// First try to fetch the worker from the queue.
if w = p.workers.detach(); w != nil {
p.lock.Unlock()
return
}
// If the worker queue is empty and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()
w = p.workerCache.Get().(*goWorker) w = p.workerCache.Get().(*goWorker)
w.run() w.run()
}
p.lock.Lock()
w = p.workers.detach()
if w != nil { // first try to fetch the worker from the queue
p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
// if the worker queue is empty and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
p.lock.Unlock()
spawnWorker()
} else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
if p.options.Nonblocking {
p.lock.Unlock()
return return
} }
retry:
if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { // Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
p.lock.Unlock() p.lock.Unlock()
return return
} }
// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
p.addWaiting(1) p.addWaiting(1)
p.cond.Wait() // block and wait for an available worker p.cond.Wait() // block and wait for an available worker
p.addWaiting(-1) p.addWaiting(-1)
@ -367,17 +367,7 @@ func (p *Pool) retrieveWorker() (w worker) {
return return
} }
if w = p.workers.detach(); w == nil {
if p.Free() > 0 {
p.lock.Unlock()
spawnWorker()
return
}
goto retry goto retry
}
p.lock.Unlock()
}
return
} }
// revertWorker puts a worker back into free pool, recycling the goroutines. // revertWorker puts a worker back into free pool, recycling the goroutines.

View File

@ -339,31 +339,31 @@ func (p *PoolWithFunc) addWaiting(delta int) {
// retrieveWorker returns an available worker to run the tasks. // retrieveWorker returns an available worker to run the tasks.
func (p *PoolWithFunc) retrieveWorker() (w worker) { func (p *PoolWithFunc) retrieveWorker() (w worker) {
spawnWorker := func() { p.lock.Lock()
retry:
// First try to fetch the worker from the queue.
if w = p.workers.detach(); w != nil {
p.lock.Unlock()
return
}
// If the worker queue is empty and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()
w = p.workerCache.Get().(*goWorkerWithFunc) w = p.workerCache.Get().(*goWorkerWithFunc)
w.run() w.run()
}
p.lock.Lock()
w = p.workers.detach()
if w != nil { // first try to fetch the worker from the queue
p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
// if the worker queue is empty and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
p.lock.Unlock()
spawnWorker()
} else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
if p.options.Nonblocking {
p.lock.Unlock()
return return
} }
retry:
if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { // Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
p.lock.Unlock() p.lock.Unlock()
return return
} }
// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
p.addWaiting(1) p.addWaiting(1)
p.cond.Wait() // block and wait for an available worker p.cond.Wait() // block and wait for an available worker
p.addWaiting(-1) p.addWaiting(-1)
@ -373,17 +373,7 @@ func (p *PoolWithFunc) retrieveWorker() (w worker) {
return return
} }
if w = p.workers.detach(); w == nil {
if p.Free() > 0 {
p.lock.Unlock()
spawnWorker()
return
}
goto retry goto retry
}
p.lock.Unlock()
}
return
} }
// revertWorker puts a worker back into free pool, recycling the goroutines. // revertWorker puts a worker back into free pool, recycling the goroutines.