mirror of https://github.com/panjf2000/ants.git
parent
1d11f39375
commit
ad86bfa6d2
7
ants.go
7
ants.go
|
@ -51,7 +51,8 @@ var (
|
||||||
// Error types for the Ants API.
|
// Error types for the Ants API.
|
||||||
//---------------------------------------------------------------------------
|
//---------------------------------------------------------------------------
|
||||||
|
|
||||||
// ErrInvalidPoolSize will be returned when setting a negative number as pool capacity.
|
// ErrInvalidPoolSize will be returned when setting a negative number as pool capacity, this error will be only used
|
||||||
|
// by pool with func because pool without func can be infinite by setting up a negative capacity.
|
||||||
ErrInvalidPoolSize = errors.New("invalid size for pool")
|
ErrInvalidPoolSize = errors.New("invalid size for pool")
|
||||||
|
|
||||||
// ErrLackPoolFunc will be returned when invokers don't provide function for pool.
|
// ErrLackPoolFunc will be returned when invokers don't provide function for pool.
|
||||||
|
@ -65,6 +66,10 @@ var (
|
||||||
|
|
||||||
// ErrPoolOverload will be returned when the pool is full and no workers available.
|
// ErrPoolOverload will be returned when the pool is full and no workers available.
|
||||||
ErrPoolOverload = errors.New("too many goroutines blocked on submit or Nonblocking is set")
|
ErrPoolOverload = errors.New("too many goroutines blocked on submit or Nonblocking is set")
|
||||||
|
|
||||||
|
// ErrInvalidPreAllocSize will be returned when trying to set up a negative capacity under PreAlloc mode.
|
||||||
|
ErrInvalidPreAllocSize = errors.New("can not set up a negative capacity under PreAlloc mode")
|
||||||
|
|
||||||
//---------------------------------------------------------------------------
|
//---------------------------------------------------------------------------
|
||||||
|
|
||||||
// workerChanCap determines whether the channel of a worker should be a buffered channel
|
// workerChanCap determines whether the channel of a worker should be a buffered channel
|
||||||
|
|
|
@ -547,6 +547,15 @@ func TestInfinitePool(t *testing.T) {
|
||||||
if n := p.Running(); n != 2 {
|
if n := p.Running(); n != 2 {
|
||||||
t.Errorf("expect 2 workers running, but got %d", n)
|
t.Errorf("expect 2 workers running, but got %d", n)
|
||||||
}
|
}
|
||||||
|
p.Tune(10)
|
||||||
|
if capacity := p.Cap(); capacity != -1 {
|
||||||
|
t.Fatalf("expect capacity: -1 but got %d", capacity)
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
p, err = NewPool(-1, WithPreAlloc(true))
|
||||||
|
if err != ErrInvalidPreAllocSize {
|
||||||
|
t.Errorf("expect ErrInvalidPreAllocSize but got %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRestCodeCoverage(t *testing.T) {
|
func TestRestCodeCoverage(t *testing.T) {
|
||||||
|
|
29
pool.go
29
pool.go
|
@ -32,7 +32,9 @@ import (
|
||||||
|
|
||||||
// Pool accepts the tasks from client, it limits the total of goroutines to a given number by recycling goroutines.
|
// Pool accepts the tasks from client, it limits the total of goroutines to a given number by recycling goroutines.
|
||||||
type Pool struct {
|
type Pool struct {
|
||||||
// capacity of the pool.
|
// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
|
||||||
|
// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
|
||||||
|
// which submits a new task to the same pool.
|
||||||
capacity int32
|
capacity int32
|
||||||
|
|
||||||
// running is the number of the currently running goroutines.
|
// running is the number of the currently running goroutines.
|
||||||
|
@ -56,11 +58,6 @@ type Pool struct {
|
||||||
// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
|
// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
|
||||||
blockingNum int
|
blockingNum int
|
||||||
|
|
||||||
// infinite indicates whether the pool capacity is limitless, an infinite pool is used to avoid
|
|
||||||
// potential the endless blocking caused by nested usage of a pool: submitting a task to pool
|
|
||||||
// which submits a new task to the same pool.
|
|
||||||
infinite bool
|
|
||||||
|
|
||||||
options *Options
|
options *Options
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,6 +96,10 @@ func (p *Pool) periodicallyPurge() {
|
||||||
func NewPool(size int, options ...Option) (*Pool, error) {
|
func NewPool(size int, options ...Option) (*Pool, error) {
|
||||||
opts := loadOptions(options...)
|
opts := loadOptions(options...)
|
||||||
|
|
||||||
|
if size <= 0 {
|
||||||
|
size = -1
|
||||||
|
}
|
||||||
|
|
||||||
if expiry := opts.ExpiryDuration; expiry < 0 {
|
if expiry := opts.ExpiryDuration; expiry < 0 {
|
||||||
return nil, ErrInvalidPoolExpiry
|
return nil, ErrInvalidPoolExpiry
|
||||||
} else if expiry == 0 {
|
} else if expiry == 0 {
|
||||||
|
@ -120,10 +121,10 @@ func NewPool(size int, options ...Option) (*Pool, error) {
|
||||||
task: make(chan func(), workerChanCap),
|
task: make(chan func(), workerChanCap),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if size <= 0 {
|
|
||||||
p.infinite = true
|
|
||||||
}
|
|
||||||
if p.options.PreAlloc {
|
if p.options.PreAlloc {
|
||||||
|
if size == -1 {
|
||||||
|
return nil, ErrInvalidPreAllocSize
|
||||||
|
}
|
||||||
p.workers = newWorkerArray(loopQueueType, size)
|
p.workers = newWorkerArray(loopQueueType, size)
|
||||||
} else {
|
} else {
|
||||||
p.workers = newWorkerArray(stackType, 0)
|
p.workers = newWorkerArray(stackType, 0)
|
||||||
|
@ -167,9 +168,9 @@ func (p *Pool) Cap() int {
|
||||||
return int(atomic.LoadInt32(&p.capacity))
|
return int(atomic.LoadInt32(&p.capacity))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tune changes the capacity of this pool.
|
// Tune changes the capacity of this pool, this method is noneffective to the infinite pool.
|
||||||
func (p *Pool) Tune(size int) {
|
func (p *Pool) Tune(size int) {
|
||||||
if size < 0 || p.Cap() == size || p.options.PreAlloc {
|
if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
atomic.StoreInt32(&p.capacity, int32(size))
|
atomic.StoreInt32(&p.capacity, int32(size))
|
||||||
|
@ -214,10 +215,10 @@ func (p *Pool) retrieveWorker() (w *goWorker) {
|
||||||
w = p.workers.detach()
|
w = p.workers.detach()
|
||||||
if w != nil {
|
if w != nil {
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
} else if p.infinite {
|
} else if capacity := p.Cap(); capacity == -1 {
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
spawnWorker()
|
spawnWorker()
|
||||||
} else if p.Running() < p.Cap() {
|
} else if p.Running() < capacity {
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
spawnWorker()
|
spawnWorker()
|
||||||
} else {
|
} else {
|
||||||
|
@ -251,7 +252,7 @@ func (p *Pool) retrieveWorker() (w *goWorker) {
|
||||||
|
|
||||||
// revertWorker puts a worker back into free pool, recycling the goroutines.
|
// revertWorker puts a worker back into free pool, recycling the goroutines.
|
||||||
func (p *Pool) revertWorker(worker *goWorker) bool {
|
func (p *Pool) revertWorker(worker *goWorker) bool {
|
||||||
if atomic.LoadInt32(&p.state) == CLOSED || (!p.infinite && p.Running() > p.Cap()) {
|
if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || atomic.LoadInt32(&p.state) == CLOSED {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
worker.recycleTime = time.Now()
|
worker.recycleTime = time.Now()
|
||||||
|
|
|
@ -185,7 +185,7 @@ func (p *PoolWithFunc) Cap() int {
|
||||||
|
|
||||||
// Tune changes the capacity of this pool.
|
// Tune changes the capacity of this pool.
|
||||||
func (p *PoolWithFunc) Tune(size int) {
|
func (p *PoolWithFunc) Tune(size int) {
|
||||||
if size < 0 || p.Cap() == size || p.options.PreAlloc {
|
if size <= 0 || size == p.Cap() || p.options.PreAlloc {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
atomic.StoreInt32(&p.capacity, int32(size))
|
atomic.StoreInt32(&p.capacity, int32(size))
|
||||||
|
|
Loading…
Reference in New Issue