opt: fall back to LeastTasks when RoundRobin can't find a worker (#306)

Besides, update a few comments and add new benchmarks for multi-pool
This commit is contained in:
Andy Pan 2023-11-21 13:22:02 +08:00 committed by GitHub
parent 19bd1ea02b
commit fb82167503
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 64 additions and 20 deletions

View File

@ -138,6 +138,24 @@ func BenchmarkAntsPool(b *testing.B) {
} }
} }
func BenchmarkAntsMultiPool(b *testing.B) {
var wg sync.WaitGroup
p, _ := NewMultiPool(10, PoolCap/10, RoundRobin, WithExpiryDuration(DefaultExpiredTime))
defer p.ReleaseTimeout(DefaultExpiredTime) //nolint:errcheck
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(RunTimes)
for j := 0; j < RunTimes; j++ {
_ = p.Submit(func() {
demoFunc()
wg.Done()
})
}
wg.Wait()
}
}
func BenchmarkGoroutinesThroughput(b *testing.B) { func BenchmarkGoroutinesThroughput(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ { for j := 0; j < RunTimes; j++ {
@ -170,3 +188,15 @@ func BenchmarkAntsPoolThroughput(b *testing.B) {
} }
} }
} }
func BenchmarkAntsMultiPoolThroughput(b *testing.B) {
p, _ := NewMultiPool(10, PoolCap/10, RoundRobin, WithExpiryDuration(DefaultExpiredTime))
defer p.ReleaseTimeout(DefaultExpiredTime) //nolint:errcheck
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ {
_ = p.Submit(demoFunc)
}
}
}

View File

@ -44,7 +44,7 @@ const (
// MultiPool consists of multiple pools, from which you will benefit the // MultiPool consists of multiple pools, from which you will benefit the
// performance improvement on basis of the fine-grained locking that reduces // performance improvement on basis of the fine-grained locking that reduces
// the lock contention. // the lock contention.
// MultiPool is a good fit for the scenario that you have a large number of // MultiPool is a good fit for the scenario where you have a large number of
// tasks to submit, and you don't want the single pool to be the bottleneck. // tasks to submit, and you don't want the single pool to be the bottleneck.
type MultiPool struct { type MultiPool struct {
pools []*Pool pools []*Pool
@ -70,8 +70,8 @@ func NewMultiPool(size, sizePerPool int, lbs LoadBalancingStrategy, options ...O
return &MultiPool{pools: pools, lbs: lbs}, nil return &MultiPool{pools: pools, lbs: lbs}, nil
} }
func (mp *MultiPool) next() (idx int) { func (mp *MultiPool) next(lbs LoadBalancingStrategy) (idx int) {
switch mp.lbs { switch lbs {
case RoundRobin: case RoundRobin:
if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 { if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 {
idx = 0 idx = 0
@ -91,11 +91,17 @@ func (mp *MultiPool) next() (idx int) {
} }
// Submit submits a task to a pool selected by the load-balancing strategy. // Submit submits a task to a pool selected by the load-balancing strategy.
func (mp *MultiPool) Submit(task func()) error { func (mp *MultiPool) Submit(task func()) (err error) {
if mp.IsClosed() { if mp.IsClosed() {
return ErrPoolClosed return ErrPoolClosed
} }
return mp.pools[mp.next()].Submit(task) if err = mp.pools[mp.next(mp.lbs)].Submit(task); err == nil {
return
}
if err == ErrPoolOverload && mp.lbs == RoundRobin {
return mp.pools[mp.next(LeastTasks)].Submit(task)
}
return
} }
// Running returns the number of the currently running workers across all pools. // Running returns the number of the currently running workers across all pools.

View File

@ -33,7 +33,7 @@ import (
// MultiPoolWithFunc consists of multiple pools, from which you will benefit the // MultiPoolWithFunc consists of multiple pools, from which you will benefit the
// performance improvement on basis of the fine-grained locking that reduces // performance improvement on basis of the fine-grained locking that reduces
// the lock contention. // the lock contention.
// MultiPoolWithFunc is a good fit for the scenario that you have a large number of // MultiPoolWithFunc is a good fit for the scenario where you have a large number of
// tasks to submit, and you don't want the single pool to be the bottleneck. // tasks to submit, and you don't want the single pool to be the bottleneck.
type MultiPoolWithFunc struct { type MultiPoolWithFunc struct {
pools []*PoolWithFunc pools []*PoolWithFunc
@ -59,8 +59,8 @@ func NewMultiPoolWithFunc(size, sizePerPool int, fn func(interface{}), lbs LoadB
return &MultiPoolWithFunc{pools: pools, lbs: lbs}, nil return &MultiPoolWithFunc{pools: pools, lbs: lbs}, nil
} }
func (mp *MultiPoolWithFunc) next() (idx int) { func (mp *MultiPoolWithFunc) next(lbs LoadBalancingStrategy) (idx int) {
switch mp.lbs { switch lbs {
case RoundRobin: case RoundRobin:
if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 { if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 {
idx = 0 idx = 0
@ -80,11 +80,18 @@ func (mp *MultiPoolWithFunc) next() (idx int) {
} }
// Invoke submits a task to a pool selected by the load-balancing strategy. // Invoke submits a task to a pool selected by the load-balancing strategy.
func (mp *MultiPoolWithFunc) Invoke(args interface{}) error { func (mp *MultiPoolWithFunc) Invoke(args interface{}) (err error) {
if mp.IsClosed() { if mp.IsClosed() {
return ErrPoolClosed return ErrPoolClosed
} }
return mp.pools[mp.next()].Invoke(args)
if err = mp.pools[mp.next(mp.lbs)].Invoke(args); err == nil {
return
}
if err == ErrPoolOverload && mp.lbs == RoundRobin {
return mp.pools[mp.next(LeastTasks)].Invoke(args)
}
return
} }
// Running returns the number of the currently running workers across all pools. // Running returns the number of the currently running workers across all pools.

11
pool.go
View File

@ -31,7 +31,8 @@ import (
syncx "github.com/panjf2000/ants/v2/internal/sync" syncx "github.com/panjf2000/ants/v2/internal/sync"
) )
// Pool accepts the tasks from client, it limits the total of goroutines to a given number by recycling goroutines. // Pool accepts the tasks and process them concurrently,
// it limits the total of goroutines to a given number by recycling goroutines.
type Pool struct { type Pool struct {
// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to // capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool // avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
@ -210,7 +211,7 @@ func NewPool(size int, options ...Option) (*Pool, error) {
// Submit submits a task to this pool. // Submit submits a task to this pool.
// //
// Note that you are allowed to call Pool.Submit() from the current Pool.Submit(), // Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
// but what calls for special attention is that you will get blocked with the latest // but what calls for special attention is that you will get blocked with the last
// Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this, // Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a Pool with ants.WithNonblocking(true). // you should instantiate a Pool with ants.WithNonblocking(true).
func (p *Pool) Submit(task func()) error { func (p *Pool) Submit(task func()) error {
@ -230,7 +231,7 @@ func (p *Pool) Running() int {
return int(atomic.LoadInt32(&p.running)) return int(atomic.LoadInt32(&p.running))
} }
// Free returns the number of available goroutines to work, -1 indicates this pool is unlimited. // Free returns the number of available workers, -1 indicates this pool is unlimited.
func (p *Pool) Free() int { func (p *Pool) Free() int {
c := p.Cap() c := p.Cap()
if c < 0 { if c < 0 {
@ -239,7 +240,7 @@ func (p *Pool) Free() int {
return c - p.Running() return c - p.Running()
} }
// Waiting returns the number of tasks which are waiting be executed. // Waiting returns the number of tasks waiting to be executed.
func (p *Pool) Waiting() int { func (p *Pool) Waiting() int {
return int(atomic.LoadInt32(&p.waiting)) return int(atomic.LoadInt32(&p.waiting))
} }
@ -339,7 +340,7 @@ retry:
return return
} }
// If the worker queue is empty and we don't run out of the pool capacity, // If the worker queue is empty, and we don't run out of the pool capacity,
// then just spawn a new worker goroutine. // then just spawn a new worker goroutine.
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock() p.lock.Unlock()

View File

@ -31,7 +31,7 @@ import (
syncx "github.com/panjf2000/ants/v2/internal/sync" syncx "github.com/panjf2000/ants/v2/internal/sync"
) )
// PoolWithFunc accepts the tasks from client, // PoolWithFunc accepts the tasks and process them concurrently,
// it limits the total of goroutines to a given number by recycling goroutines. // it limits the total of goroutines to a given number by recycling goroutines.
type PoolWithFunc struct { type PoolWithFunc struct {
// capacity of the pool. // capacity of the pool.
@ -216,7 +216,7 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
// Invoke submits a task to pool. // Invoke submits a task to pool.
// //
// Note that you are allowed to call Pool.Invoke() from the current Pool.Invoke(), // Note that you are allowed to call Pool.Invoke() from the current Pool.Invoke(),
// but what calls for special attention is that you will get blocked with the latest // but what calls for special attention is that you will get blocked with the last
// Pool.Invoke() call once the current Pool runs out of its capacity, and to avoid this, // Pool.Invoke() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a PoolWithFunc with ants.WithNonblocking(true). // you should instantiate a PoolWithFunc with ants.WithNonblocking(true).
func (p *PoolWithFunc) Invoke(args interface{}) error { func (p *PoolWithFunc) Invoke(args interface{}) error {
@ -236,7 +236,7 @@ func (p *PoolWithFunc) Running() int {
return int(atomic.LoadInt32(&p.running)) return int(atomic.LoadInt32(&p.running))
} }
// Free returns the number of available goroutines to work, -1 indicates this pool is unlimited. // Free returns the number of available workers, -1 indicates this pool is unlimited.
func (p *PoolWithFunc) Free() int { func (p *PoolWithFunc) Free() int {
c := p.Cap() c := p.Cap()
if c < 0 { if c < 0 {
@ -245,7 +245,7 @@ func (p *PoolWithFunc) Free() int {
return c - p.Running() return c - p.Running()
} }
// Waiting returns the number of tasks which are waiting be executed. // Waiting returns the number of tasks waiting to be executed.
func (p *PoolWithFunc) Waiting() int { func (p *PoolWithFunc) Waiting() int {
return int(atomic.LoadInt32(&p.waiting)) return int(atomic.LoadInt32(&p.waiting))
} }
@ -345,7 +345,7 @@ retry:
return return
} }
// If the worker queue is empty and we don't run out of the pool capacity, // If the worker queue is empty, and we don't run out of the pool capacity,
// then just spawn a new worker goroutine. // then just spawn a new worker goroutine.
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock() p.lock.Unlock()