From 1c534853c887e0a7b0d56d51303bf09f201e26a0 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Fri, 8 May 2020 20:13:35 +0800 Subject: [PATCH] Support unlimited pool Fixes #90 --- ants_test.go | 14 ++++++++++++++ pool.go | 28 +++++++++++++++++----------- pool_func.go | 11 +++++------ 3 files changed, 36 insertions(+), 17 deletions(-) diff --git a/ants_test.go b/ants_test.go index 2ff430c..5672f19 100644 --- a/ants_test.go +++ b/ants_test.go @@ -535,6 +535,20 @@ func TestRebootNewPool(t *testing.T) { wg.Wait() } +func TestInfinitePool(t *testing.T) { + c := make(chan struct{}) + p, _ := NewPool(-1) + _ = p.Submit(func() { + _ = p.Submit(func() { + <-c + }) + }) + c <- struct{}{} + if n := p.Running(); n != 2 { + t.Errorf("expect 2 workers running, but got %d", n) + } +} + func TestRestCodeCoverage(t *testing.T) { _, err := NewPool(-1, WithExpiryDuration(-1)) t.Log(err) diff --git a/pool.go b/pool.go index 02dbdce..d54f7ac 100644 --- a/pool.go +++ b/pool.go @@ -56,6 +56,11 @@ type Pool struct { // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock blockingNum int + // infinite indicates whether the pool capacity is limitless, an infinite pool is used to avoid + // potential the endless blocking caused by nested usage of a pool: submitting a task to pool + // which submits a new task to the same pool. + infinite bool + options *Options } @@ -92,10 +97,6 @@ func (p *Pool) periodicallyPurge() { // NewPool generates an instance of ants pool. func NewPool(size int, options ...Option) (*Pool, error) { - if size <= 0 { - return nil, ErrInvalidPoolSize - } - opts := loadOptions(options...) if expiry := opts.ExpiryDuration; expiry < 0 { @@ -119,6 +120,9 @@ func NewPool(size int, options ...Option) (*Pool, error) { task: make(chan func(), workerChanCap), } } + if size <= 0 { + p.infinite = true + } if p.options.PreAlloc { p.workers = newWorkerArray(loopQueueType, size) } else { @@ -199,8 +203,7 @@ func (p *Pool) decRunning() { } // retrieveWorker returns a available worker to run the tasks. -func (p *Pool) retrieveWorker() *goWorker { - var w *goWorker +func (p *Pool) retrieveWorker() (w *goWorker) { spawnWorker := func() { w = p.workerCache.Get().(*goWorker) w.run() @@ -211,18 +214,21 @@ func (p *Pool) retrieveWorker() *goWorker { w = p.workers.detach() if w != nil { p.lock.Unlock() + } else if p.infinite { + p.lock.Unlock() + spawnWorker() } else if p.Running() < p.Cap() { p.lock.Unlock() spawnWorker() } else { if p.options.Nonblocking { p.lock.Unlock() - return nil + return } Reentry: if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks { p.lock.Unlock() - return nil + return } p.blockingNum++ p.cond.Wait() @@ -230,7 +236,7 @@ func (p *Pool) retrieveWorker() *goWorker { if p.Running() == 0 { p.lock.Unlock() spawnWorker() - return w + return } w = p.workers.detach() @@ -240,12 +246,12 @@ func (p *Pool) retrieveWorker() *goWorker { p.lock.Unlock() } - return w + return } // revertWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) revertWorker(worker *goWorker) bool { - if atomic.LoadInt32(&p.state) == CLOSED || p.Running() > p.Cap() { + if atomic.LoadInt32(&p.state) == CLOSED || (!p.infinite && p.Running() > p.Cap()) { return false } worker.recycleTime = time.Now() diff --git a/pool_func.go b/pool_func.go index 8752396..f7fa44b 100644 --- a/pool_func.go +++ b/pool_func.go @@ -223,8 +223,7 @@ func (p *PoolWithFunc) decRunning() { } // retrieveWorker returns a available worker to run the tasks. -func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc { - var w *goWorkerWithFunc +func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) { spawnWorker := func() { w = p.workerCache.Get().(*goWorkerWithFunc) w.run() @@ -244,12 +243,12 @@ func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc { } else { if p.options.Nonblocking { p.lock.Unlock() - return nil + return } Reentry: if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks { p.lock.Unlock() - return nil + return } p.blockingNum++ p.cond.Wait() @@ -257,7 +256,7 @@ func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc { if p.Running() == 0 { p.lock.Unlock() spawnWorker() - return w + return } l := len(p.workers) - 1 if l < 0 { @@ -268,7 +267,7 @@ func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc { p.workers = p.workers[:l] p.lock.Unlock() } - return w + return } // revertWorker puts a worker back into free pool, recycling the goroutines.