diff --git a/ants.go b/ants.go index ed14312..d00d190 100644 --- a/ants.go +++ b/ants.go @@ -51,7 +51,8 @@ var ( // Error types for the Ants API. //--------------------------------------------------------------------------- - // ErrInvalidPoolSize will be returned when setting a negative number as pool capacity. + // ErrInvalidPoolSize will be returned when setting a negative number as pool capacity, this error will be only used + // by pool with func because pool without func can be infinite by setting up a negative capacity. ErrInvalidPoolSize = errors.New("invalid size for pool") // ErrLackPoolFunc will be returned when invokers don't provide function for pool. @@ -65,6 +66,10 @@ var ( // ErrPoolOverload will be returned when the pool is full and no workers available. ErrPoolOverload = errors.New("too many goroutines blocked on submit or Nonblocking is set") + + // ErrInvalidPreAllocSize will be returned when trying to set up a negative capacity under PreAlloc mode. + ErrInvalidPreAllocSize = errors.New("can not set up a negative capacity under PreAlloc mode") + //--------------------------------------------------------------------------- // workerChanCap determines whether the channel of a worker should be a buffered channel diff --git a/ants_test.go b/ants_test.go index 5672f19..cd74d2d 100644 --- a/ants_test.go +++ b/ants_test.go @@ -547,6 +547,15 @@ func TestInfinitePool(t *testing.T) { if n := p.Running(); n != 2 { t.Errorf("expect 2 workers running, but got %d", n) } + p.Tune(10) + if capacity := p.Cap(); capacity != -1 { + t.Fatalf("expect capacity: -1 but got %d", capacity) + } + var err error + p, err = NewPool(-1, WithPreAlloc(true)) + if err != ErrInvalidPreAllocSize { + t.Errorf("expect ErrInvalidPreAllocSize but got %v", err) + } } func TestRestCodeCoverage(t *testing.T) { diff --git a/pool.go b/pool.go index d54f7ac..1f72f77 100644 --- a/pool.go +++ b/pool.go @@ -32,7 +32,9 @@ import ( // Pool accepts the tasks from client, it limits the total of goroutines to a given number by recycling goroutines. type Pool struct { - // capacity of the pool. + // capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to + // avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool + // which submits a new task to the same pool. capacity int32 // running is the number of the currently running goroutines. @@ -56,11 +58,6 @@ type Pool struct { // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock blockingNum int - // infinite indicates whether the pool capacity is limitless, an infinite pool is used to avoid - // potential the endless blocking caused by nested usage of a pool: submitting a task to pool - // which submits a new task to the same pool. - infinite bool - options *Options } @@ -99,6 +96,10 @@ func (p *Pool) periodicallyPurge() { func NewPool(size int, options ...Option) (*Pool, error) { opts := loadOptions(options...) + if size <= 0 { + size = -1 + } + if expiry := opts.ExpiryDuration; expiry < 0 { return nil, ErrInvalidPoolExpiry } else if expiry == 0 { @@ -120,10 +121,10 @@ func NewPool(size int, options ...Option) (*Pool, error) { task: make(chan func(), workerChanCap), } } - if size <= 0 { - p.infinite = true - } if p.options.PreAlloc { + if size == -1 { + return nil, ErrInvalidPreAllocSize + } p.workers = newWorkerArray(loopQueueType, size) } else { p.workers = newWorkerArray(stackType, 0) @@ -167,9 +168,9 @@ func (p *Pool) Cap() int { return int(atomic.LoadInt32(&p.capacity)) } -// Tune changes the capacity of this pool. +// Tune changes the capacity of this pool, this method is noneffective to the infinite pool. func (p *Pool) Tune(size int) { - if size < 0 || p.Cap() == size || p.options.PreAlloc { + if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { return } atomic.StoreInt32(&p.capacity, int32(size)) @@ -214,10 +215,10 @@ func (p *Pool) retrieveWorker() (w *goWorker) { w = p.workers.detach() if w != nil { p.lock.Unlock() - } else if p.infinite { + } else if capacity := p.Cap(); capacity == -1 { p.lock.Unlock() spawnWorker() - } else if p.Running() < p.Cap() { + } else if p.Running() < capacity { p.lock.Unlock() spawnWorker() } else { @@ -251,7 +252,7 @@ func (p *Pool) retrieveWorker() (w *goWorker) { // revertWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) revertWorker(worker *goWorker) bool { - if atomic.LoadInt32(&p.state) == CLOSED || (!p.infinite && p.Running() > p.Cap()) { + if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || atomic.LoadInt32(&p.state) == CLOSED { return false } worker.recycleTime = time.Now() diff --git a/pool_func.go b/pool_func.go index f7fa44b..099040d 100644 --- a/pool_func.go +++ b/pool_func.go @@ -185,7 +185,7 @@ func (p *PoolWithFunc) Cap() int { // Tune changes the capacity of this pool. func (p *PoolWithFunc) Tune(size int) { - if size < 0 || p.Cap() == size || p.options.PreAlloc { + if size <= 0 || size == p.Cap() || p.options.PreAlloc { return } atomic.StoreInt32(&p.capacity, int32(size))