opt: awake blocking goroutines more precisely in purgeStaleWorkers

Fixes #272
This commit is contained in:
Andy Pan 2023-04-15 23:47:55 +08:00
parent 73defa0289
commit b32591f8bd
2 changed files with 12 additions and 22 deletions

17
pool.go
View File

@ -90,8 +90,11 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) {
break break
} }
var isDormant bool
p.lock.Lock() p.lock.Lock()
staleWorkers := p.workers.refresh(p.options.ExpiryDuration) staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
n := p.Running()
isDormant = n == 0 || n == len(staleWorkers)
p.lock.Unlock() p.lock.Unlock()
// Notify obsolete workers to stop. // Notify obsolete workers to stop.
@ -104,10 +107,8 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) {
} }
// There might be a situation where all workers have been cleaned up(no worker is running), // There might be a situation where all workers have been cleaned up(no worker is running),
// or another case where the pool capacity has been Tuned up, // while some invokers still are stuck in "p.cond.Wait()", then we need to awake those invokers.
// while some invokers still get stuck in "p.cond.Wait()", if isDormant && p.Waiting() > 0 {
// then it ought to wake all those invokers.
if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) {
p.cond.Broadcast() p.cond.Broadcast()
} }
} }
@ -365,14 +366,8 @@ func (p *Pool) retrieveWorker() (w worker) {
return return
} }
var nw int
if nw = p.Running(); nw == 0 { // awakened by the scavenger
p.lock.Unlock()
spawnWorker()
return
}
if w = p.workers.detach(); w == nil { if w = p.workers.detach(); w == nil {
if nw < p.Cap() { if p.Free() > 0 {
p.lock.Unlock() p.lock.Unlock()
spawnWorker() spawnWorker()
return return

View File

@ -91,8 +91,11 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
break break
} }
var isDormant bool
p.lock.Lock() p.lock.Lock()
staleWorkers := p.workers.refresh(p.options.ExpiryDuration) staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
n := p.Running()
isDormant = n == 0 || n == len(staleWorkers)
p.lock.Unlock() p.lock.Unlock()
// Notify obsolete workers to stop. // Notify obsolete workers to stop.
@ -105,10 +108,8 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
} }
// There might be a situation where all workers have been cleaned up(no worker is running), // There might be a situation where all workers have been cleaned up(no worker is running),
// or another case where the pool capacity has been Tuned up, // while some invokers still are stuck in "p.cond.Wait()", then we need to awake those invokers.
// while some invokers still get stuck in "p.cond.Wait()", if isDormant && p.Waiting() > 0 {
// then it ought to wake all those invokers.
if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) {
p.cond.Broadcast() p.cond.Broadcast()
} }
} }
@ -371,14 +372,8 @@ func (p *PoolWithFunc) retrieveWorker() (w worker) {
return return
} }
var nw int
if nw = p.Running(); nw == 0 { // awakened by the scavenger
p.lock.Unlock()
spawnWorker()
return
}
if w = p.workers.detach(); w == nil { if w = p.workers.detach(); w == nil {
if nw < p.Cap() { if p.Free() > 0 {
p.lock.Unlock() p.lock.Unlock()
spawnWorker() spawnWorker()
return return