Support unlimited pool

Fixes #90
Andy Pan 2020-05-08 20:13:35 +08:00
parent 88b5a85d64
commit 1c534853c8
3 changed files with 36 additions and 17 deletions
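
For context, a minimal usage sketch of what this commit enables, assuming the library's public v2 API (the github.com/panjf2000/ants/v2 import path, Submit, Running and Release come from the documented surface, not from this diff): a non-positive size now creates a pool with no capacity limit instead of returning ErrInvalidPoolSize, so a task can safely submit follow-up work to the same pool.

package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// With this commit, a non-positive size means "no capacity limit"
	// rather than ErrInvalidPoolSize.
	p, err := ants.NewPool(-1)
	if err != nil {
		panic(err)
	}
	defer p.Release()

	var wg sync.WaitGroup
	wg.Add(1)
	// Nested submission: the outer task hands a new task to the same pool.
	// An infinite pool simply spawns another worker instead of waiting for
	// a free slot that may never appear.
	_ = p.Submit(func() {
		_ = p.Submit(func() {
			defer wg.Done()
			fmt.Println("nested task ran")
		})
	})
	wg.Wait()
	fmt.Println("workers running:", p.Running())
}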

ants_test.go

@@ -535,6 +535,20 @@ func TestRebootNewPool(t *testing.T) {
	wg.Wait()
}

func TestInfinitePool(t *testing.T) {
	c := make(chan struct{})
	p, _ := NewPool(-1)
	_ = p.Submit(func() {
		_ = p.Submit(func() {
			<-c
		})
	})
	c <- struct{}{}
	if n := p.Running(); n != 2 {
		t.Errorf("expect 2 workers running, but got %d", n)
	}
}

func TestRestCodeCoverage(t *testing.T) {
	_, err := NewPool(-1, WithExpiryDuration(-1))
	t.Log(err)

pool.go

@@ -56,6 +56,11 @@ type Pool struct {
// blockingNum is the number of goroutines already blocked on pool.Submit, protected by pool.lock
blockingNum int
// infinite indicates whether the pool capacity is limitless. An infinite pool is used to avoid
// the potential endless blocking caused by nested usage of a pool: submitting a task to the pool
// which submits a new task to the same pool.
infinite bool
options *Options
}
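
To make the scenario in that comment concrete, here is a small sketch of the failure mode on a bounded pool. WithNonblocking is assumed from the library's public options and is used here only so the nested Submit returns an error instead of blocking; with the default blocking mode this pattern deadlocks, which is exactly what an infinite pool avoids.

package main

import (
	"fmt"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// Bounded pool with a single worker. Nonblocking mode makes the demo
	// fail fast instead of hanging forever on the nested Submit.
	p, _ := ants.NewPool(1, ants.WithNonblocking(true))
	defer p.Release()

	errCh := make(chan error, 1)
	_ = p.Submit(func() {
		// The only worker is busy running this outer task, so the pool
		// has no free slot for the nested submission.
		errCh <- p.Submit(func() {})
	})
	// Prints the overload error on a bounded pool; a pool built with
	// NewPool(-1) would run the nested task instead.
	fmt.Println(<-errCh)
}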
@@ -92,10 +97,6 @@ func (p *Pool) periodicallyPurge() {
// NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {
if size <= 0 {
return nil, ErrInvalidPoolSize
}
opts := loadOptions(options...)
if expiry := opts.ExpiryDuration; expiry < 0 {
@@ -119,6 +120,9 @@ func NewPool(size int, options ...Option) (*Pool, error) {
task: make(chan func(), workerChanCap),
}
}
if size <= 0 {
p.infinite = true
}
if p.options.PreAlloc {
p.workers = newWorkerArray(loopQueueType, size)
} else {
@@ -199,8 +203,7 @@ func (p *Pool) decRunning() {
}
// retrieveWorker returns an available worker to run the tasks.
func (p *Pool) retrieveWorker() *goWorker {
var w *goWorker
func (p *Pool) retrieveWorker() (w *goWorker) {
spawnWorker := func() {
w = p.workerCache.Get().(*goWorker)
w.run()
@@ -211,18 +214,21 @@ func (p *Pool) retrieveWorker() *goWorker {
w = p.workers.detach()
if w != nil {
p.lock.Unlock()
} else if p.infinite {
p.lock.Unlock()
spawnWorker()
} else if p.Running() < p.Cap() {
p.lock.Unlock()
spawnWorker()
} else {
if p.options.Nonblocking {
p.lock.Unlock()
return nil
return
}
Reentry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return nil
return
}
p.blockingNum++
p.cond.Wait()
@@ -230,7 +236,7 @@ func (p *Pool) retrieveWorker() *goWorker {
if p.Running() == 0 {
p.lock.Unlock()
spawnWorker()
return w
return
}
w = p.workers.detach()
@@ -240,12 +246,12 @@ func (p *Pool) retrieveWorker() *goWorker {
p.lock.Unlock()
}
return w
return
}
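
Pulling the retrieveWorker hunks together, the new dispatch order can be summarized as the condensed sketch below. This is not the literal function body (the blocking branch is elided); the identifiers are the ones that appear in the hunks above.

// Condensed sketch of the dispatch order after this change:
//   1. reuse an idle worker from the queue if one is available;
//   2. otherwise, if the pool is infinite, always spawn a new worker;
//   3. otherwise spawn only while Running() < Cap();
//   4. otherwise fail fast (Nonblocking) or wait on the cond var,
//      bounded by MaxBlockingTasks, as in the hunk above.
func (p *Pool) retrieveWorker() (w *goWorker) {
	p.lock.Lock()
	if w = p.workers.detach(); w != nil { // 1. idle worker available
		p.lock.Unlock()
	} else if p.infinite || p.Running() < p.Cap() { // 2. and 3.
		p.lock.Unlock()
		w = p.workerCache.Get().(*goWorker) // spawnWorker in the diff
		w.run()
	} else {
		// 4. Nonblocking / MaxBlockingTasks / cond.Wait() path (elided).
		p.lock.Unlock()
	}
	return
}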
// revertWorker puts a worker back into the free pool, recycling the goroutines.
func (p *Pool) revertWorker(worker *goWorker) bool {
if atomic.LoadInt32(&p.state) == CLOSED || p.Running() > p.Cap() {
if atomic.LoadInt32(&p.state) == CLOSED || (!p.infinite && p.Running() > p.Cap()) {
return false
}
worker.recycleTime = time.Now()

pool_func.go

@@ -223,8 +223,7 @@ func (p *PoolWithFunc) decRunning() {
}
// retrieveWorker returns an available worker to run the tasks.
func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc {
var w *goWorkerWithFunc
func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) {
spawnWorker := func() {
w = p.workerCache.Get().(*goWorkerWithFunc)
w.run()
@@ -244,12 +243,12 @@ func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc {
} else {
if p.options.Nonblocking {
p.lock.Unlock()
return nil
return
}
Reentry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return nil
return
}
p.blockingNum++
p.cond.Wait()
@@ -257,7 +256,7 @@ func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc {
if p.Running() == 0 {
p.lock.Unlock()
spawnWorker()
return w
return
}
l := len(p.workers) - 1
if l < 0 {
@@ -268,7 +267,7 @@ func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc {
p.workers = p.workers[:l]
p.lock.Unlock()
}
return w
return
}
// revertWorker puts a worker back into the free pool, recycling the goroutines.