diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go
index 8f1280b..83cd920 100644
--- a/ants_benchmark_test.go
+++ b/ants_benchmark_test.go
@@ -138,6 +138,24 @@ func BenchmarkAntsPool(b *testing.B) {
 	}
 }
 
+func BenchmarkAntsMultiPool(b *testing.B) {
+	var wg sync.WaitGroup
+	p, _ := NewMultiPool(10, PoolCap/10, RoundRobin, WithExpiryDuration(DefaultExpiredTime))
+	defer p.ReleaseTimeout(DefaultExpiredTime) //nolint:errcheck
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		wg.Add(RunTimes)
+		for j := 0; j < RunTimes; j++ {
+			_ = p.Submit(func() {
+				demoFunc()
+				wg.Done()
+			})
+		}
+		wg.Wait()
+	}
+}
+
 func BenchmarkGoroutinesThroughput(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		for j := 0; j < RunTimes; j++ {
@@ -170,3 +188,15 @@ func BenchmarkAntsPoolThroughput(b *testing.B) {
 		}
 	}
 }
+
+func BenchmarkAntsMultiPoolThroughput(b *testing.B) {
+	p, _ := NewMultiPool(10, PoolCap/10, RoundRobin, WithExpiryDuration(DefaultExpiredTime))
+	defer p.ReleaseTimeout(DefaultExpiredTime) //nolint:errcheck
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		for j := 0; j < RunTimes; j++ {
+			_ = p.Submit(demoFunc)
+		}
+	}
+}
diff --git a/multipool.go b/multipool.go
index 4e17f27..1e8b27b 100644
--- a/multipool.go
+++ b/multipool.go
@@ -44,7 +44,7 @@ const (
 // MultiPool consists of multiple pools, from which you will benefit the
 // performance improvement on basis of the fine-grained locking that reduces
 // the lock contention.
-// MultiPool is a good fit for the scenario that you have a large number of
+// MultiPool is a good fit for the scenario where you have a large number of
 // tasks to submit, and you don't want the single pool to be the bottleneck.
 type MultiPool struct {
 	pools []*Pool
@@ -70,8 +70,8 @@ func NewMultiPool(size, sizePerPool int, lbs LoadBalancingStrategy, options ...O
 	return &MultiPool{pools: pools, lbs: lbs}, nil
 }
 
-func (mp *MultiPool) next() (idx int) {
-	switch mp.lbs {
+func (mp *MultiPool) next(lbs LoadBalancingStrategy) (idx int) {
+	switch lbs {
 	case RoundRobin:
 		if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 {
 			idx = 0
@@ -91,11 +91,17 @@ func (mp *MultiPool) next() (idx int) {
 }
 
 // Submit submits a task to a pool selected by the load-balancing strategy.
-func (mp *MultiPool) Submit(task func()) error {
+func (mp *MultiPool) Submit(task func()) (err error) {
 	if mp.IsClosed() {
 		return ErrPoolClosed
 	}
-	return mp.pools[mp.next()].Submit(task)
+	if err = mp.pools[mp.next(mp.lbs)].Submit(task); err == nil {
+		return
+	}
+	if err == ErrPoolOverload && mp.lbs == RoundRobin {
+		return mp.pools[mp.next(LeastTasks)].Submit(task)
+	}
+	return
 }
 
 // Running returns the number of the currently running workers across all pools.
diff --git a/multipool_func.go b/multipool_func.go
index 627ab78..8101d31 100644
--- a/multipool_func.go
+++ b/multipool_func.go
@@ -33,7 +33,7 @@ import (
 // MultiPoolWithFunc consists of multiple pools, from which you will benefit the
 // performance improvement on basis of the fine-grained locking that reduces
 // the lock contention.
-// MultiPoolWithFunc is a good fit for the scenario that you have a large number of
+// MultiPoolWithFunc is a good fit for the scenario where you have a large number of
 // tasks to submit, and you don't want the single pool to be the bottleneck.
 type MultiPoolWithFunc struct {
 	pools []*PoolWithFunc
@@ -59,8 +59,8 @@ func NewMultiPoolWithFunc(size, sizePerPool int, fn func(interface{}), lbs LoadB
 	return &MultiPoolWithFunc{pools: pools, lbs: lbs}, nil
 }
 
-func (mp *MultiPoolWithFunc) next() (idx int) {
-	switch mp.lbs {
+func (mp *MultiPoolWithFunc) next(lbs LoadBalancingStrategy) (idx int) {
+	switch lbs {
 	case RoundRobin:
 		if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 {
 			idx = 0
@@ -80,11 +80,18 @@ func (mp *MultiPoolWithFunc) next() (idx int) {
 }
 
 // Invoke submits a task to a pool selected by the load-balancing strategy.
-func (mp *MultiPoolWithFunc) Invoke(args interface{}) error {
+func (mp *MultiPoolWithFunc) Invoke(args interface{}) (err error) {
 	if mp.IsClosed() {
 		return ErrPoolClosed
 	}
-	return mp.pools[mp.next()].Invoke(args)
+
+	if err = mp.pools[mp.next(mp.lbs)].Invoke(args); err == nil {
+		return
+	}
+	if err == ErrPoolOverload && mp.lbs == RoundRobin {
+		return mp.pools[mp.next(LeastTasks)].Invoke(args)
+	}
+	return
 }
 
 // Running returns the number of the currently running workers across all pools.
diff --git a/pool.go b/pool.go
index 0475430..c69dde5 100644
--- a/pool.go
+++ b/pool.go
@@ -31,7 +31,8 @@ import (
 	syncx "github.com/panjf2000/ants/v2/internal/sync"
 )
 
-// Pool accepts the tasks from client, it limits the total of goroutines to a given number by recycling goroutines.
+// Pool accepts the tasks and process them concurrently,
+// it limits the total of goroutines to a given number by recycling goroutines.
 type Pool struct {
 	// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
 	// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
@@ -210,7 +211,7 @@ func NewPool(size int, options ...Option) (*Pool, error) {
 // Submit submits a task to this pool.
 //
 // Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
-// but what calls for special attention is that you will get blocked with the latest
+// but what calls for special attention is that you will get blocked with the last
 // Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this,
 // you should instantiate a Pool with ants.WithNonblocking(true).
 func (p *Pool) Submit(task func()) error {
@@ -230,7 +231,7 @@ func (p *Pool) Running() int {
 	return int(atomic.LoadInt32(&p.running))
 }
 
-// Free returns the number of available goroutines to work, -1 indicates this pool is unlimited.
+// Free returns the number of available workers, -1 indicates this pool is unlimited.
 func (p *Pool) Free() int {
 	c := p.Cap()
 	if c < 0 {
@@ -239,7 +240,7 @@ func (p *Pool) Free() int {
 	return c - p.Running()
 }
 
-// Waiting returns the number of tasks which are waiting be executed.
+// Waiting returns the number of tasks waiting to be executed.
 func (p *Pool) Waiting() int {
 	return int(atomic.LoadInt32(&p.waiting))
 }
@@ -339,7 +340,7 @@ retry:
 		return
 	}
 
-	// If the worker queue is empty and we don't run out of the pool capacity,
+	// If the worker queue is empty, and we don't run out of the pool capacity,
 	// then just spawn a new worker goroutine.
 	if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
 		p.lock.Unlock()
diff --git a/pool_func.go b/pool_func.go
index 08fd31c..6840384 100644
--- a/pool_func.go
+++ b/pool_func.go
@@ -31,7 +31,7 @@ import (
 	syncx "github.com/panjf2000/ants/v2/internal/sync"
 )
 
-// PoolWithFunc accepts the tasks from client,
+// PoolWithFunc accepts the tasks and process them concurrently,
 // it limits the total of goroutines to a given number by recycling goroutines.
 type PoolWithFunc struct {
 	// capacity of the pool.
@@ -216,7 +216,7 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
 // Invoke submits a task to pool.
 //
 // Note that you are allowed to call Pool.Invoke() from the current Pool.Invoke(),
-// but what calls for special attention is that you will get blocked with the latest
+// but what calls for special attention is that you will get blocked with the last
 // Pool.Invoke() call once the current Pool runs out of its capacity, and to avoid this,
 // you should instantiate a PoolWithFunc with ants.WithNonblocking(true).
 func (p *PoolWithFunc) Invoke(args interface{}) error {
@@ -236,7 +236,7 @@ func (p *PoolWithFunc) Running() int {
 	return int(atomic.LoadInt32(&p.running))
 }
 
-// Free returns the number of available goroutines to work, -1 indicates this pool is unlimited.
+// Free returns the number of available workers, -1 indicates this pool is unlimited.
 func (p *PoolWithFunc) Free() int {
 	c := p.Cap()
 	if c < 0 {
@@ -245,7 +245,7 @@ func (p *PoolWithFunc) Free() int {
 	return c - p.Running()
 }
 
-// Waiting returns the number of tasks which are waiting be executed.
+// Waiting returns the number of tasks waiting to be executed.
 func (p *PoolWithFunc) Waiting() int {
 	return int(atomic.LoadInt32(&p.waiting))
 }
@@ -345,7 +345,7 @@ retry:
 		return
 	}
 
-	// If the worker queue is empty and we don't run out of the pool capacity,
+	// If the worker queue is empty, and we don't run out of the pool capacity,
 	// then just spawn a new worker goroutine.
 	if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
 		p.lock.Unlock()
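
For context on the MultiPool API exercised by the new benchmarks above, here is a minimal usage sketch. It is not part of this patch; it assumes the ants v2 module path and a release that ships MultiPool, and the task body, pool sizes, and timeouts are made up for illustration.

// Usage sketch (not part of this patch): spread tasks over a MultiPool.
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// 10 sub-pools with 100 workers each; tasks are dispatched round-robin.
	mp, err := ants.NewMultiPool(10, 100, ants.RoundRobin,
		ants.WithExpiryDuration(10*time.Second))
	if err != nil {
		panic(err)
	}
	defer mp.ReleaseTimeout(5 * time.Second) //nolint:errcheck

	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		// With this patch, a round-robin pick that hits an overloaded
		// sub-pool is retried once on the least-loaded one before failing.
		if err := mp.Submit(func() {
			defer wg.Done()
			time.Sleep(time.Millisecond)
		}); err != nil {
			wg.Done()
			fmt.Println("submit failed:", err)
		}
	}
	wg.Wait()
	fmt.Println("workers still running:", mp.Running())
}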
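
The next() methods now take the strategy as a parameter so that Submit/Invoke can pick with RoundRobin first and fall back to LeastTasks on overload. Below is a standalone sketch of the two selection strategies that next(lbs) switches on; fakePool, load, and picker are illustrative stand-ins, not ants types.

// Standalone sketch of the two load-balancing strategies.
package main

import (
	"fmt"
	"sync/atomic"
)

type fakePool struct{ load int32 } // load stands in for a pool's pending tasks

type picker struct {
	pools []*fakePool
	index uint32
}

// roundRobin mirrors the RoundRobin branch: post-increment an atomic
// counter and wrap it around the number of pools.
func (p *picker) roundRobin() int {
	return int((atomic.AddUint32(&p.index, 1) - 1) % uint32(len(p.pools)))
}

// leastTasks mirrors the LeastTasks branch: linear scan for the pool
// with the smallest load.
func (p *picker) leastTasks() int {
	idx := 0
	for i, pool := range p.pools {
		if pool.load < p.pools[idx].load {
			idx = i
		}
	}
	return idx
}

func main() {
	p := &picker{pools: []*fakePool{{load: 3}, {load: 1}, {load: 5}}}
	fmt.Println(p.roundRobin(), p.roundRobin(), p.roundRobin(), p.roundRobin()) // 0 1 2 0
	fmt.Println(p.leastTasks())                                                 // 1
}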
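
And a sketch of the behavior the new error-handling branch in Submit/Invoke is meant to produce: with non-blocking sub-pools, ErrPoolOverload from the round-robin pick triggers one retry on the least-loaded sub-pool, and only if that also fails does the error reach the caller. It assumes an ants release that includes this patch; the tiny capacities are deliberate, to force the overload path.

// Sketch: force ErrPoolOverload so the RoundRobin -> LeastTasks retry runs.
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// 2 sub-pools, 1 worker each, non-blocking: a busy sub-pool rejects
	// Submit with ErrPoolOverload instead of blocking the caller.
	mp, _ := ants.NewMultiPool(2, 1, ants.RoundRobin, ants.WithNonblocking(true))
	defer mp.ReleaseTimeout(time.Second) //nolint:errcheck

	block := func() { time.Sleep(200 * time.Millisecond) }
	for i := 0; i < 4; i++ {
		err := mp.Submit(block)
		// Once both workers are busy, the round-robin pick is overloaded;
		// Submit retries on the least-loaded sub-pool and only surfaces
		// the error when that one is overloaded too.
		if errors.Is(err, ants.ErrPoolOverload) {
			fmt.Println("task", i, "rejected: all sub-pools overloaded")
		}
	}
	time.Sleep(300 * time.Millisecond)
}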