🎏 Fix a bug where invokers get stuck waiting for idle workers

Andy Pan 2019-08-18 02:45:32 +08:00
parent 4f88d413b4
commit af70ed0660
2 changed files with 50 additions and 24 deletions
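
What goes wrong without this patch is a lost wakeup: a caller that finds the pool at capacity parks itself in p.cond.Wait(), and a goroutine parked on a sync.Cond only resumes when someone calls Signal or Broadcast on it. Once periodicallyPurge has cleaned up every worker, nobody is left to hand a worker back, so nothing ever signals and the caller sleeps forever. The sketch below is not the ants source -- the names idleWorkers, running and woken are stand-ins -- but it shows the two halves of the fix: the purge path broadcasting when the worker list drains, and the woken caller treating "nothing is running any more" as its cue to spawn a fresh worker instead of waiting again.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	idleWorkers := 0 // no idle workers to hand out
	running := 1     // one busy worker keeps the pool at capacity

	woken := make(chan struct{})

	// A caller in the position of retrieveWorker's final branch: the pool is
	// full, so it parks on the condition variable until something changes.
	go func() {
		mu.Lock()
		for idleWorkers == 0 && running > 0 {
			cond.Wait() // atomically unlocks mu and sleeps; relocks on wake-up
		}
		if idleWorkers > 0 {
			idleWorkers-- // reuse a worker that was handed back
		}
		// else running == 0: nobody will ever return a worker, so the caller
		// must spawn a fresh one -- the behaviour the Running()==0 check adds.
		mu.Unlock()
		close(woken)
	}()

	// The purge path: the only running worker expires and is cleaned up.
	time.Sleep(100 * time.Millisecond)
	mu.Lock()
	running = 0
	if idleWorkers == 0 {
		cond.Broadcast() // without this, the goroutine above sleeps forever
	}
	mu.Unlock()

	select {
	case <-woken:
		fmt.Println("waiter woke up and can spawn a new worker")
	case <-time.After(time.Second):
		fmt.Println("waiter is still stuck (the pre-fix behaviour)")
	}
}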

pool.go (37 changed lines)

@@ -28,8 +28,7 @@ import (
"time"
)
// Pool accept the tasks from client,it limits the total
// of goroutines to a given number by recycling goroutines.
// Pool accepts the tasks from clients; it limits the total of goroutines to a given number by recycling goroutines.
type Pool struct {
// capacity of the pool.
capacity int32
@@ -89,6 +88,13 @@ func (p *Pool) periodicallyPurge() {
}
p.workers = idleWorkers[:m]
}
// There might be a situation where all workers have been cleaned up
// while some invokers are still stuck in "p.cond.Wait()",
// in which case all those invokers ought to be woken up.
if len(p.workers) == 0 {
p.cond.Broadcast()
}
p.lock.Unlock()
// Notify obsolete workers to stop.
@@ -207,6 +213,17 @@ func (p *Pool) decRunning() {
// retrieveWorker returns an available worker to run the tasks.
func (p *Pool) retrieveWorker() *Worker {
var w *Worker
spawnWorker := func() {
if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
w = cacheWorker.(*Worker)
} else {
w = &Worker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
w.run()
}
p.lock.Lock()
idleWorkers := p.workers
@@ -218,18 +235,14 @@ func (p *Pool) retrieveWorker() *Worker {
p.lock.Unlock()
} else if p.Running() < p.Cap() {
p.lock.Unlock()
if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
w = cacheWorker.(*Worker)
} else {
w = &Worker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
w.run()
spawnWorker()
} else {
Reentry:
p.cond.Wait()
if p.Running() == 0 {
spawnWorker()
return w
}
l := len(p.workers) - 1
if l < 0 {
goto Reentry
@@ -237,7 +250,7 @@ func (p *Pool) retrieveWorker() *Worker {
w = p.workers[l]
p.workers[l] = nil
p.workers = p.workers[:l]
p.lock.Unlock()
p.lock.Unlock()
}
return w
}
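
The retrieveWorker hunks above are largely a refactor in service of that fix: the duplicated "take a recycled worker from the cache or build a new one, then start it" sequence becomes the spawnWorker closure, so the new Running() == 0 branch after p.cond.Wait() can reuse it. A self-contained sketch of that cache-or-build pattern follows; the worker type and channel size here are assumptions for illustration, not the ants definitions.

package main

import (
	"fmt"
	"sync"
)

// worker is a stand-in for ants' Worker: a goroutine fed tasks over a channel.
type worker struct {
	task chan func()
}

func (w *worker) run() {
	go func() {
		for f := range w.task {
			f()
		}
	}()
}

func main() {
	var workerCache sync.Pool // recycles retired workers, like p.workerCache

	var w *worker
	spawnWorker := func() {
		if cached := workerCache.Get(); cached != nil {
			w = cached.(*worker) // reuse a recycled worker...
		} else {
			w = &worker{task: make(chan func(), 1)} // ...or build a fresh one
		}
		w.run()
	}

	// Both call sites -- "capacity still available" and "woken up with
	// nothing running" -- now share this one closure.
	spawnWorker()

	done := make(chan struct{})
	w.task <- func() { fmt.Println("task executed"); close(done) }
	<-done
	close(w.task) // let the worker goroutine exit
	// (ants recycles retired workers through p.workerCache, which is what the
	// Get call above relies on.)
}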

pool_func.go (37 changed lines)

@@ -28,8 +28,7 @@ import (
"time"
)
// PoolWithFunc accept the tasks from client,it limits the total
// of goroutines to a given number by recycling goroutines.
// PoolWithFunc accepts the tasks from clients; it limits the total of goroutines to a given number by recycling goroutines.
type PoolWithFunc struct {
// capacity of the pool.
capacity int32
@@ -92,6 +91,13 @@ func (p *PoolWithFunc) periodicallyPurge() {
}
p.workers = idleWorkers[:m]
}
// There might be a situation where all workers have been cleaned up
// while some invokers are still stuck in "p.cond.Wait()",
// in which case all those invokers ought to be woken up.
if len(p.workers) == 0 {
p.cond.Broadcast()
}
p.lock.Unlock()
// Notify obsolete workers to stop.
@@ -212,6 +218,17 @@ func (p *PoolWithFunc) decRunning() {
// retrieveWorker returns an available worker to run the tasks.
func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc {
var w *WorkerWithFunc
spawnWorker := func() {
if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
w = cacheWorker.(*WorkerWithFunc)
} else {
w = &WorkerWithFunc{
pool: p,
args: make(chan interface{}, workerChanCap),
}
}
w.run()
}
p.lock.Lock()
idleWorkers := p.workers
@@ -223,18 +240,14 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc {
p.lock.Unlock()
} else if p.Running() < p.Cap() {
p.lock.Unlock()
if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
w = cacheWorker.(*WorkerWithFunc)
} else {
w = &WorkerWithFunc{
pool: p,
args: make(chan interface{}, workerChanCap),
}
}
w.run()
spawnWorker()
} else {
Reentry:
p.cond.Wait()
if p.Running() == 0 {
spawnWorker()
return w
}
l := len(p.workers) - 1
if l < 0 {
goto Reentry
@@ -242,7 +255,7 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc {
w = p.workers[l]
p.workers[l] = nil
p.workers = p.workers[:l]
p.lock.Unlock()
p.lock.Unlock()
}
return w
}
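
The second file applies the identical change to PoolWithFunc, whose workers carry only arguments for a single pool-level function instead of arbitrary closures -- hence args chan interface{} where Worker has task chan func(). A minimal sketch of that worker shape, again with assumed types rather than the ants ones:

package main

import (
	"fmt"
	"sync"
)

// workerWithFunc mirrors the idea behind ants' WorkerWithFunc: it receives
// only arguments, and every worker in the pool invokes the same function.
type workerWithFunc struct {
	fn   func(interface{})
	args chan interface{}
	wg   *sync.WaitGroup
}

func (w *workerWithFunc) run() {
	go func() {
		defer w.wg.Done()
		for a := range w.args {
			w.fn(a) // the pool-level function, fixed at construction time
		}
	}()
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)

	w := &workerWithFunc{
		fn:   func(a interface{}) { fmt.Println("processing", a) },
		args: make(chan interface{}, 1),
		wg:   &wg,
	}
	w.run()

	w.args <- 42
	close(w.args)
	wg.Wait()
}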