From a2ad870d2dd95931970ebf33c3442915645c0997 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 27 Apr 2021 08:14:28 +0800 Subject: [PATCH] Returns -1 from Free() by unlimited pool Fixes #152 --- ants_test.go | 32 +++++++++++++++++++++++++++++++- pool.go | 13 +++++++------ pool_func.go | 22 +++++++++++++++------- 3 files changed, 53 insertions(+), 14 deletions(-) diff --git a/ants_test.go b/ants_test.go index 48c4577..1193aff 100644 --- a/ants_test.go +++ b/ants_test.go @@ -548,12 +548,42 @@ func TestInfinitePool(t *testing.T) { if n := p.Running(); n != 2 { t.Errorf("expect 2 workers running, but got %d", n) } + if n := p.Free(); n != -1 { + t.Errorf("expect -1 of free workers by unlimited pool, but got %d", n) + } p.Tune(10) if capacity := p.Cap(); capacity != -1 { t.Fatalf("expect capacity: -1 but got %d", capacity) } var err error - p, err = NewPool(-1, WithPreAlloc(true)) + _, err = NewPool(-1, WithPreAlloc(true)) + if err != ErrInvalidPreAllocSize { + t.Errorf("expect ErrInvalidPreAllocSize but got %v", err) + } +} + +func TestInfinitePoolWithFunc(t *testing.T) { + c := make(chan struct{}) + p, _ := NewPoolWithFunc(-1, func(i interface{}) { + demoPoolFunc(i) + <-c + }) + _ = p.Invoke(10) + _ = p.Invoke(10) + c <- struct{}{} + c <- struct{}{} + if n := p.Running(); n != 2 { + t.Errorf("expect 2 workers running, but got %d", n) + } + if n := p.Free(); n != -1 { + t.Errorf("expect -1 of free workers by unlimited pool, but got %d", n) + } + p.Tune(10) + if capacity := p.Cap(); capacity != -1 { + t.Fatalf("expect capacity: -1 but got %d", capacity) + } + var err error + _, err = NewPoolWithFunc(-1, demoPoolFunc, WithPreAlloc(true)) if err != ErrInvalidPreAllocSize { t.Errorf("expect ErrInvalidPreAllocSize but got %v", err) } diff --git a/pool.go b/pool.go index 37151cc..751ea65 100644 --- a/pool.go +++ b/pool.go @@ -159,9 +159,13 @@ func (p *Pool) Running() int { return int(atomic.LoadInt32(&p.running)) } -// Free returns the available goroutines to work. +// Free returns the available goroutines to work, -1 indicates this pool is unlimited. func (p *Pool) Free() int { - return p.Cap() - p.Running() + c := p.Cap() + if c < 0 { + return -1 + } + return c - p.Running() } // Cap returns the capacity of this pool. @@ -224,10 +228,7 @@ func (p *Pool) retrieveWorker() (w *goWorker) { w = p.workers.detach() if w != nil { p.lock.Unlock() - } else if capacity := p.Cap(); capacity == -1 { - p.lock.Unlock() - spawnWorker() - } else if p.Running() < capacity { + } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { p.lock.Unlock() spawnWorker() } else { diff --git a/pool_func.go b/pool_func.go index b14be42..6add14e 100644 --- a/pool_func.go +++ b/pool_func.go @@ -111,7 +111,7 @@ func (p *PoolWithFunc) purgePeriodically() { // NewPoolWithFunc generates an instance of ants pool with a specific function. func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) { if size <= 0 { - return nil, ErrInvalidPoolSize + size = -1 } if pf == nil { @@ -143,6 +143,9 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi } } if p.options.PreAlloc { + if size == -1 { + return nil, ErrInvalidPreAllocSize + } p.workers = make([]*goWorkerWithFunc, 0, size) } p.cond = sync.NewCond(p.lock) @@ -173,9 +176,13 @@ func (p *PoolWithFunc) Running() int { return int(atomic.LoadInt32(&p.running)) } -// Free returns a available goroutines to work. +// Free returns a available goroutines to work, -1 indicates this pool is unlimited. func (p *PoolWithFunc) Free() int { - return p.Cap() - p.Running() + c := p.Cap() + if c < 0 { + return -1 + } + return c - p.Running() } // Cap returns the capacity of this pool. @@ -185,7 +192,7 @@ func (p *PoolWithFunc) Cap() int { // Tune changes the capacity of this pool. func (p *PoolWithFunc) Tune(size int) { - if size <= 0 || size == p.Cap() || p.options.PreAlloc { + if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { return } atomic.StoreInt32(&p.capacity, int32(size)) @@ -245,7 +252,7 @@ func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) { idleWorkers[n] = nil p.workers = idleWorkers[:n] p.lock.Unlock() - } else if p.Running() < p.Cap() { + } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { p.lock.Unlock() spawnWorker() } else { @@ -261,7 +268,8 @@ func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) { p.blockingNum++ p.cond.Wait() p.blockingNum-- - if p.Running() == 0 { + var nw int + if nw = p.Running(); nw == 0 { p.lock.Unlock() if !p.IsClosed() { spawnWorker() @@ -270,7 +278,7 @@ func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) { } l := len(p.workers) - 1 if l < 0 { - if p.Running() < p.Cap() { + if nw < capacity { p.lock.Unlock() spawnWorker() return