feat: implement pool.Waiting() API

Fixes #157
This commit is contained in:
Andy Pan 2022-05-07 22:43:25 +08:00
parent 607d0390c6
commit 9310acdff2
6 changed files with 60 additions and 43 deletions

47
pool.go
View File

@ -56,8 +56,8 @@ type Pool struct {
// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker. // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
workerCache sync.Pool workerCache sync.Pool
// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock // waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
blockingNum int waiting int32
heartbeatDone int32 heartbeatDone int32
stopHeartbeat context.CancelFunc stopHeartbeat context.CancelFunc
@ -97,10 +97,11 @@ func (p *Pool) purgePeriodically(ctx context.Context) {
expiredWorkers[i] = nil expiredWorkers[i] = nil
} }
// There might be a situation that all workers have been cleaned up(no any worker is running) // There might be a situation where all workers have been cleaned up(no worker is running),
// or another case where the pool capacity has been Tuned up,
// while some invokers still get stuck in "p.cond.Wait()", // while some invokers still get stuck in "p.cond.Wait()",
// then it ought to wake all those invokers. // then it ought to wake all those invokers.
if p.Running() == 0 { if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) {
p.cond.Broadcast() p.cond.Broadcast()
} }
} }
@ -174,12 +175,12 @@ func (p *Pool) Submit(task func()) error {
return nil return nil
} }
// Running returns the amount of the currently running goroutines. // Running returns the number of workers currently running.
func (p *Pool) Running() int { func (p *Pool) Running() int {
return int(atomic.LoadInt32(&p.running)) return int(atomic.LoadInt32(&p.running))
} }
// Free returns the amount of available goroutines to work, -1 indicates this pool is unlimited. // Free returns the number of available goroutines to work, -1 indicates this pool is unlimited.
func (p *Pool) Free() int { func (p *Pool) Free() int {
c := p.Cap() c := p.Cap()
if c < 0 { if c < 0 {
@ -188,6 +189,11 @@ func (p *Pool) Free() int {
return c - p.Running() return c - p.Running()
} }
// Waiting returns the number of tasks which are waiting be executed.
func (p *Pool) Waiting() int {
return int(atomic.LoadInt32(&p.waiting))
}
// Cap returns the capacity of this pool. // Cap returns the capacity of this pool.
func (p *Pool) Cap() int { func (p *Pool) Cap() int {
return int(atomic.LoadInt32(&p.capacity)) return int(atomic.LoadInt32(&p.capacity))
@ -259,14 +265,12 @@ func (p *Pool) Reboot() {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// incRunning increases the number of the currently running goroutines. func (p *Pool) addRunning(delta int) {
func (p *Pool) incRunning() { atomic.AddInt32(&p.running, int32(delta))
atomic.AddInt32(&p.running, 1)
} }
// decRunning decreases the number of the currently running goroutines. func (p *Pool) addWaiting(delta int) {
func (p *Pool) decRunning() { atomic.AddInt32(&p.waiting, int32(delta))
atomic.AddInt32(&p.running, -1)
} }
// retrieveWorker returns an available worker to run the tasks. // retrieveWorker returns an available worker to run the tasks.
@ -292,30 +296,33 @@ func (p *Pool) retrieveWorker() (w *goWorker) {
return return
} }
retry: retry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks { if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
p.lock.Unlock() p.lock.Unlock()
return return
} }
p.blockingNum++ p.addWaiting(1)
p.cond.Wait() // block and wait for an available worker p.cond.Wait() // block and wait for an available worker
p.blockingNum-- p.addWaiting(-1)
if p.IsClosed() {
p.lock.Unlock()
return
}
var nw int var nw int
if nw = p.Running(); nw == 0 { // awakened by the scavenger if nw = p.Running(); nw == 0 { // awakened by the scavenger
p.lock.Unlock() p.lock.Unlock()
if !p.IsClosed() { spawnWorker()
spawnWorker()
}
return return
} }
if w = p.workers.detach(); w == nil { if w = p.workers.detach(); w == nil {
if nw < capacity { if nw < p.Cap() {
p.lock.Unlock() p.lock.Unlock()
spawnWorker() spawnWorker()
return return
} }
goto retry goto retry
} }
p.lock.Unlock() p.lock.Unlock()
} }
return return

View File

@ -58,8 +58,8 @@ type PoolWithFunc struct {
// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker. // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
workerCache sync.Pool workerCache sync.Pool
// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock // waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock
blockingNum int waiting int32
heartbeatDone int32 heartbeatDone int32
stopHeartbeat context.CancelFunc stopHeartbeat context.CancelFunc
@ -112,10 +112,11 @@ func (p *PoolWithFunc) purgePeriodically(ctx context.Context) {
expiredWorkers[i] = nil expiredWorkers[i] = nil
} }
// There might be a situation that all workers have been cleaned up(no worker is running) // There might be a situation where all workers have been cleaned up(no worker is running),
// or another case where the pool capacity has been Tuned up,
// while some invokers still get stuck in "p.cond.Wait()", // while some invokers still get stuck in "p.cond.Wait()",
// then it ought to wake all those invokers. // then it ought to wake all those invokers.
if p.Running() == 0 { if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) {
p.cond.Broadcast() p.cond.Broadcast()
} }
} }
@ -191,12 +192,12 @@ func (p *PoolWithFunc) Invoke(args interface{}) error {
return nil return nil
} }
// Running returns the amount of the currently running goroutines. // Running returns the number of workers currently running.
func (p *PoolWithFunc) Running() int { func (p *PoolWithFunc) Running() int {
return int(atomic.LoadInt32(&p.running)) return int(atomic.LoadInt32(&p.running))
} }
// Free returns the amount of available goroutines to work, -1 indicates this pool is unlimited. // Free returns the number of available goroutines to work, -1 indicates this pool is unlimited.
func (p *PoolWithFunc) Free() int { func (p *PoolWithFunc) Free() int {
c := p.Cap() c := p.Cap()
if c < 0 { if c < 0 {
@ -205,6 +206,11 @@ func (p *PoolWithFunc) Free() int {
return c - p.Running() return c - p.Running()
} }
// Waiting returns the number of tasks which are waiting be executed.
func (p *PoolWithFunc) Waiting() int {
return int(atomic.LoadInt32(&p.waiting))
}
// Cap returns the capacity of this pool. // Cap returns the capacity of this pool.
func (p *PoolWithFunc) Cap() int { func (p *PoolWithFunc) Cap() int {
return int(atomic.LoadInt32(&p.capacity)) return int(atomic.LoadInt32(&p.capacity))
@ -280,14 +286,12 @@ func (p *PoolWithFunc) Reboot() {
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
// incRunning increases the number of the currently running goroutines. func (p *PoolWithFunc) addRunning(delta int) {
func (p *PoolWithFunc) incRunning() { atomic.AddInt32(&p.running, int32(delta))
atomic.AddInt32(&p.running, 1)
} }
// decRunning decreases the number of the currently running goroutines. func (p *PoolWithFunc) addWaiting(delta int) {
func (p *PoolWithFunc) decRunning() { atomic.AddInt32(&p.waiting, int32(delta))
atomic.AddInt32(&p.running, -1)
} }
// retrieveWorker returns an available worker to run the tasks. // retrieveWorker returns an available worker to run the tasks.
@ -316,24 +320,28 @@ func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) {
return return
} }
retry: retry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks { if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
p.lock.Unlock() p.lock.Unlock()
return return
} }
p.blockingNum++ p.addWaiting(1)
p.cond.Wait() // block and wait for an available worker p.cond.Wait() // block and wait for an available worker
p.blockingNum-- p.addWaiting(-1)
if p.IsClosed() {
p.lock.Unlock()
return
}
var nw int var nw int
if nw = p.Running(); nw == 0 { // awakened by the scavenger if nw = p.Running(); nw == 0 { // awakened by the scavenger
p.lock.Unlock() p.lock.Unlock()
if !p.IsClosed() { spawnWorker()
spawnWorker()
}
return return
} }
l := len(p.workers) - 1 l := len(p.workers) - 1
if l < 0 { if l < 0 {
if nw < capacity { if nw < p.Cap() {
p.lock.Unlock() p.lock.Unlock()
spawnWorker() spawnWorker()
return return

View File

@ -44,10 +44,10 @@ type goWorker struct {
// run starts a goroutine to repeat the process // run starts a goroutine to repeat the process
// that performs the function calls. // that performs the function calls.
func (w *goWorker) run() { func (w *goWorker) run() {
w.pool.incRunning() w.pool.addRunning(1)
go func() { go func() {
defer func() { defer func() {
w.pool.decRunning() w.pool.addRunning(-1)
w.pool.workerCache.Put(w) w.pool.workerCache.Put(w)
if p := recover(); p != nil { if p := recover(); p != nil {
if ph := w.pool.options.PanicHandler; ph != nil { if ph := w.pool.options.PanicHandler; ph != nil {

View File

@ -44,10 +44,10 @@ type goWorkerWithFunc struct {
// run starts a goroutine to repeat the process // run starts a goroutine to repeat the process
// that performs the function calls. // that performs the function calls.
func (w *goWorkerWithFunc) run() { func (w *goWorkerWithFunc) run() {
w.pool.incRunning() w.pool.addRunning(1)
go func() { go func() {
defer func() { defer func() {
w.pool.decRunning() w.pool.addRunning(-1)
w.pool.workerCache.Put(w) w.pool.workerCache.Put(w)
if p := recover(); p != nil { if p := recover(); p != nil {
if ph := w.pool.options.PanicHandler; ph != nil { if ph := w.pool.options.PanicHandler; ph != nil {

View File

@ -1,3 +1,4 @@
//go:build !windows
// +build !windows // +build !windows
package ants package ants

View File

@ -1,3 +1,4 @@
//go:build !windows
// +build !windows // +build !windows
package ants package ants