Merge pull request #307 from panjf2000/dev

ver: release 2.9.0
Andy Pan 2023-11-21 13:43:56 +08:00 committed by GitHub
commit 5cecad0e71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 587 additions and 39 deletions

.github/PULL_REQUEST_TEMPLATE.md

@@ -1,13 +1,5 @@
----
-name: Pull request
-about: Propose changes to the code
-title: ''
-labels: ''
-assignees: ''
----
<!--
-Thank you for contributing to `ants`! Please fill this out to help us make the most of your pull request.
+Thank you for contributing to `ants`! Please fill this out to help us review your pull request more efficiently.
Was this change discussed in an issue first? That can help save time in case the change is not a good fit for the project. Not all pull requests get merged.

ants.go

@@ -66,6 +66,12 @@ var (
// ErrTimeout will be returned after the operations timed out.
ErrTimeout = errors.New("operation timed out")

// ErrInvalidPoolIndex will be returned when trying to retrieve a pool with an invalid index.
ErrInvalidPoolIndex = errors.New("invalid pool index")

// ErrInvalidLoadBalancingStrategy will be returned when trying to create a MultiPool with an invalid load-balancing strategy.
ErrInvalidLoadBalancingStrategy = errors.New("invalid load-balancing strategy")

// workerChanCap determines whether the channel of a worker should be a buffered channel
// to get the best performance. Inspired by fasthttp at
// https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
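Both sentinel errors compare by identity, so callers can detect them with errors.Is. A minimal sketch of checking the new index error (the pool sizes and the out-of-range index here are illustrative, not from the commit):

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// 4 pools of 8 workers each; both sizes are arbitrary for this demo.
	mp, err := ants.NewMultiPool(4, 8, ants.RoundRobin)
	if err != nil {
		panic(err)
	}
	defer mp.ReleaseTimeout(time.Second) //nolint:errcheck

	// Index 42 is out of range for 4 pools, so the sentinel error comes back.
	if _, err := mp.RunningByIndex(42); errors.Is(err, ants.ErrInvalidPoolIndex) {
		fmt.Println("invalid index:", err)
	}
}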

ants_benchmark_test.go

@@ -25,6 +25,7 @@ package ants
import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
@@ -47,18 +48,22 @@ func demoPoolFunc(args interface{}) {
time.Sleep(time.Duration(n) * time.Millisecond)
}

var stopLongRunningFunc int32

func longRunningFunc() {
-for {
+for atomic.LoadInt32(&stopLongRunningFunc) == 0 {
runtime.Gosched()
}
}

var stopLongRunningPoolFunc int32

func longRunningPoolFunc(arg interface{}) {
if ch, ok := arg.(chan struct{}); ok {
<-ch
return
}
-for {
+for atomic.LoadInt32(&stopLongRunningPoolFunc) == 0 {
runtime.Gosched()
}
}
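These stop flags let the previously endless spin loops exit, so the tests can actually release the pools. The pattern in isolation (a self-contained sketch; the names are generic, not the test's):

package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
	"time"
)

var stop int32 // 0 = keep spinning, 1 = exit

func spinner(done chan<- struct{}) {
	// Yield the processor until another goroutine raises the flag.
	for atomic.LoadInt32(&stop) == 0 {
		runtime.Gosched()
	}
	close(done)
}

func main() {
	done := make(chan struct{})
	go spinner(done)

	time.Sleep(10 * time.Millisecond)
	atomic.StoreInt32(&stop, 1) // signal the spinner to return
	<-done
	fmt.Println("spinner exited")
}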
@@ -133,6 +138,24 @@ func BenchmarkAntsPool(b *testing.B) {
}
}
func BenchmarkAntsMultiPool(b *testing.B) {
var wg sync.WaitGroup
p, _ := NewMultiPool(10, PoolCap/10, RoundRobin, WithExpiryDuration(DefaultExpiredTime))
defer p.ReleaseTimeout(DefaultExpiredTime) //nolint:errcheck
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(RunTimes)
for j := 0; j < RunTimes; j++ {
_ = p.Submit(func() {
demoFunc()
wg.Done()
})
}
wg.Wait()
}
}
func BenchmarkGoroutinesThroughput(b *testing.B) {
for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ {
@@ -165,3 +188,15 @@ func BenchmarkAntsPoolThroughput(b *testing.B) {
}
}
}
func BenchmarkAntsMultiPoolThroughput(b *testing.B) {
p, _ := NewMultiPool(10, PoolCap/10, RoundRobin, WithExpiryDuration(DefaultExpiredTime))
defer p.ReleaseTimeout(DefaultExpiredTime) //nolint:errcheck
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ {
_ = p.Submit(demoFunc)
}
}
}

ants_test.go

@@ -985,3 +985,113 @@ func TestDefaultPoolReleaseTimeout(t *testing.T) {
err := ReleaseTimeout(2 * time.Second)
assert.NoError(t, err)
}
func TestMultiPool(t *testing.T) {
_, err := NewMultiPool(10, -1, 8)
assert.ErrorIs(t, err, ErrInvalidLoadBalancingStrategy)
mp, err := NewMultiPool(10, 5, RoundRobin)
testFn := func() {
for i := 0; i < 50; i++ {
err = mp.Submit(longRunningFunc)
assert.NoError(t, err)
}
assert.EqualValues(t, mp.Waiting(), 0)
_, err = mp.WaitingByIndex(-1)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
_, err = mp.WaitingByIndex(11)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
assert.EqualValues(t, 50, mp.Running())
_, err = mp.RunningByIndex(-1)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
_, err = mp.RunningByIndex(11)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
assert.EqualValues(t, 0, mp.Free())
_, err = mp.FreeByIndex(-1)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
_, err = mp.FreeByIndex(11)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
assert.EqualValues(t, 50, mp.Cap())
assert.False(t, mp.IsClosed())
for i := 0; i < 10; i++ {
n, _ := mp.WaitingByIndex(i)
assert.EqualValues(t, 0, n)
n, _ = mp.RunningByIndex(i)
assert.EqualValues(t, 5, n)
n, _ = mp.FreeByIndex(i)
assert.EqualValues(t, 0, n)
}
atomic.StoreInt32(&stopLongRunningFunc, 1)
assert.NoError(t, mp.ReleaseTimeout(3*time.Second))
assert.Zero(t, mp.Running())
assert.True(t, mp.IsClosed())
atomic.StoreInt32(&stopLongRunningFunc, 0)
}
testFn()
mp.Reboot()
testFn()
mp, err = NewMultiPool(10, 5, LeastTasks)
testFn()
mp.Reboot()
testFn()
mp.Tune(10)
}
func TestMultiPoolWithFunc(t *testing.T) {
_, err := NewMultiPoolWithFunc(10, -1, longRunningPoolFunc, 8)
assert.ErrorIs(t, err, ErrInvalidLoadBalancingStrategy)
mp, err := NewMultiPoolWithFunc(10, 5, longRunningPoolFunc, RoundRobin)
testFn := func() {
for i := 0; i < 50; i++ {
err = mp.Invoke(i)
assert.NoError(t, err)
}
assert.EqualValues(t, mp.Waiting(), 0)
_, err = mp.WaitingByIndex(-1)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
_, err = mp.WaitingByIndex(11)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
assert.EqualValues(t, 50, mp.Running())
_, err = mp.RunningByIndex(-1)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
_, err = mp.RunningByIndex(11)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
assert.EqualValues(t, 0, mp.Free())
_, err = mp.FreeByIndex(-1)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
_, err = mp.FreeByIndex(11)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
assert.EqualValues(t, 50, mp.Cap())
assert.False(t, mp.IsClosed())
for i := 0; i < 10; i++ {
n, _ := mp.WaitingByIndex(i)
assert.EqualValues(t, 0, n)
n, _ = mp.RunningByIndex(i)
assert.EqualValues(t, 5, n)
n, _ = mp.FreeByIndex(i)
assert.EqualValues(t, 0, n)
}
atomic.StoreInt32(&stopLongRunningPoolFunc, 1)
assert.NoError(t, mp.ReleaseTimeout(3*time.Second))
assert.Zero(t, mp.Running())
assert.True(t, mp.IsClosed())
atomic.StoreInt32(&stopLongRunningPoolFunc, 0)
}
testFn()
mp.Reboot()
testFn()
mp, err = NewMultiPoolWithFunc(10, 5, longRunningPoolFunc, LeastTasks)
testFn()
mp.Reboot()
testFn()
mp.Tune(10)
}

multipool.go (new file, 211 lines)

@@ -0,0 +1,211 @@
// MIT License
// Copyright (c) 2023 Andy Pan
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package ants
import (
"errors"
"fmt"
"strings"
"sync/atomic"
"time"
)
// LoadBalancingStrategy represents the type of load-balancing algorithm.
type LoadBalancingStrategy int
const (
// RoundRobin distributes tasks to a list of pools in rotation.
RoundRobin LoadBalancingStrategy = 1 << (iota + 1)
// LeastTasks always selects the pool with the least number of pending tasks.
LeastTasks
)
// MultiPool consists of multiple pools; it improves performance through
// fine-grained locking, which reduces lock contention compared to a single
// large pool.
// MultiPool is a good fit for the scenario where you have a large number of
// tasks to submit and you don't want a single pool to become the bottleneck.
type MultiPool struct {
pools []*Pool
index uint32
state int32
lbs LoadBalancingStrategy
}
// NewMultiPool instantiates a MultiPool with the number of pools,
// the capacity of each pool, and the load-balancing strategy.
func NewMultiPool(size, sizePerPool int, lbs LoadBalancingStrategy, options ...Option) (*MultiPool, error) {
pools := make([]*Pool, size)
for i := 0; i < size; i++ {
pool, err := NewPool(sizePerPool, options...)
if err != nil {
return nil, err
}
pools[i] = pool
}
if lbs != RoundRobin && lbs != LeastTasks {
return nil, ErrInvalidLoadBalancingStrategy
}
return &MultiPool{pools: pools, lbs: lbs}, nil
}
func (mp *MultiPool) next(lbs LoadBalancingStrategy) (idx int) {
switch lbs {
case RoundRobin:
if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 {
idx = 0
}
return
case LeastTasks:
leastTasks := 1<<31 - 1
for i, pool := range mp.pools {
if n := pool.Running(); n < leastTasks {
leastTasks = n
idx = i
}
}
return
}
return -1
}
// Submit submits a task to a pool selected by the load-balancing strategy.
func (mp *MultiPool) Submit(task func()) (err error) {
if mp.IsClosed() {
return ErrPoolClosed
}
if err = mp.pools[mp.next(mp.lbs)].Submit(task); err == nil {
return
}
if err == ErrPoolOverload && mp.lbs == RoundRobin {
return mp.pools[mp.next(LeastTasks)].Submit(task)
}
return
}
// Running returns the number of the currently running workers across all pools.
func (mp *MultiPool) Running() (n int) {
for _, pool := range mp.pools {
n += pool.Running()
}
return
}
// RunningByIndex returns the number of the currently running workers in the specific pool.
func (mp *MultiPool) RunningByIndex(idx int) (int, error) {
if idx < 0 || idx >= len(mp.pools) {
return -1, ErrInvalidPoolIndex
}
return mp.pools[idx].Running(), nil
}
// Free returns the number of available workers across all pools.
func (mp *MultiPool) Free() (n int) {
for _, pool := range mp.pools {
n += pool.Free()
}
return
}
// FreeByIndex returns the number of available workers in the specific pool.
func (mp *MultiPool) FreeByIndex(idx int) (int, error) {
if idx < 0 || idx >= len(mp.pools) {
return -1, ErrInvalidPoolIndex
}
return mp.pools[idx].Free(), nil
}
// Waiting returns the number of the currently waiting tasks across all pools.
func (mp *MultiPool) Waiting() (n int) {
for _, pool := range mp.pools {
n += pool.Waiting()
}
return
}
// WaitingByIndex returns the number of the currently waiting tasks in the specific pool.
func (mp *MultiPool) WaitingByIndex(idx int) (int, error) {
if idx < 0 || idx >= len(mp.pools) {
return -1, ErrInvalidPoolIndex
}
return mp.pools[idx].Waiting(), nil
}
// Cap returns the capacity of this multi-pool.
func (mp *MultiPool) Cap() (n int) {
for _, pool := range mp.pools {
n += pool.Cap()
}
return
}
// Tune resizes each pool in the multi-pool.
//
// Note that the size applies to each individual pool,
// not to the overall capacity of the multi-pool.
func (mp *MultiPool) Tune(size int) {
for _, pool := range mp.pools {
pool.Tune(size)
}
}
// IsClosed indicates whether the multi-pool is closed.
func (mp *MultiPool) IsClosed() bool {
return atomic.LoadInt32(&mp.state) == CLOSED
}
// ReleaseTimeout closes the multi-pool with a timeout,
// it waits all pools to be closed before timing out.
func (mp *MultiPool) ReleaseTimeout(timeout time.Duration) error {
if !atomic.CompareAndSwapInt32(&mp.state, OPENED, CLOSED) {
return ErrPoolClosed
}
var errStr strings.Builder
for i, pool := range mp.pools {
if err := pool.ReleaseTimeout(timeout); err != nil {
if errStr.Len() > 0 {
errStr.WriteString(" | ")
}
errStr.WriteString(fmt.Sprintf("pool %d: %v", i, err))
}
}
if errStr.Len() == 0 {
return nil
}
return errors.New(errStr.String())
}
// Reboot reboots a released multi-pool.
func (mp *MultiPool) Reboot() {
if atomic.CompareAndSwapInt32(&mp.state, CLOSED, OPENED) {
atomic.StoreUint32(&mp.index, 0)
for _, pool := range mp.pools {
pool.Reboot()
}
}
}
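Putting the new API together, a minimal end-to-end sketch of MultiPool usage (the sizes and the task body are illustrative):

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// 4 pools x 25 workers each, tasks dispatched round-robin across pools.
	mp, err := ants.NewMultiPool(4, 25, ants.RoundRobin)
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		_ = mp.Submit(func() {
			defer wg.Done()
			// ... do some work ...
		})
	}
	wg.Wait()

	// Wait up to 3 seconds for all four pools to release their workers.
	if err := mp.ReleaseTimeout(3 * time.Second); err != nil {
		fmt.Println("release failed:", err)
	}
}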

multipool_func.go (new file, 201 lines)

@@ -0,0 +1,201 @@
// MIT License
// Copyright (c) 2023 Andy Pan
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package ants
import (
"errors"
"fmt"
"strings"
"sync/atomic"
"time"
)
// MultiPoolWithFunc consists of multiple pools; it improves performance through
// fine-grained locking, which reduces lock contention compared to a single
// large pool.
// MultiPoolWithFunc is a good fit for the scenario where you have a large number of
// tasks to submit and you don't want a single pool to become the bottleneck.
type MultiPoolWithFunc struct {
pools []*PoolWithFunc
index uint32
state int32
lbs LoadBalancingStrategy
}
// NewMultiPoolWithFunc instantiates a MultiPoolWithFunc with the number of pools,
// the capacity of each pool, the shared task function, and the load-balancing strategy.
func NewMultiPoolWithFunc(size, sizePerPool int, fn func(interface{}), lbs LoadBalancingStrategy, options ...Option) (*MultiPoolWithFunc, error) {
pools := make([]*PoolWithFunc, size)
for i := 0; i < size; i++ {
pool, err := NewPoolWithFunc(sizePerPool, fn, options...)
if err != nil {
return nil, err
}
pools[i] = pool
}
if lbs != RoundRobin && lbs != LeastTasks {
return nil, ErrInvalidLoadBalancingStrategy
}
return &MultiPoolWithFunc{pools: pools, lbs: lbs}, nil
}
func (mp *MultiPoolWithFunc) next(lbs LoadBalancingStrategy) (idx int) {
switch lbs {
case RoundRobin:
if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 {
idx = 0
}
return
case LeastTasks:
leastTasks := 1<<31 - 1
for i, pool := range mp.pools {
if n := pool.Running(); n < leastTasks {
leastTasks = n
idx = i
}
}
return
}
return -1
}
// Invoke submits a task to a pool selected by the load-balancing strategy.
func (mp *MultiPoolWithFunc) Invoke(args interface{}) (err error) {
if mp.IsClosed() {
return ErrPoolClosed
}
if err = mp.pools[mp.next(mp.lbs)].Invoke(args); err == nil {
return
}
if err == ErrPoolOverload && mp.lbs == RoundRobin {
return mp.pools[mp.next(LeastTasks)].Invoke(args)
}
return
}
// Running returns the number of the currently running workers across all pools.
func (mp *MultiPoolWithFunc) Running() (n int) {
for _, pool := range mp.pools {
n += pool.Running()
}
return
}
// RunningByIndex returns the number of the currently running workers in the specific pool.
func (mp *MultiPoolWithFunc) RunningByIndex(idx int) (int, error) {
if idx < 0 || idx >= len(mp.pools) {
return -1, ErrInvalidPoolIndex
}
return mp.pools[idx].Running(), nil
}
// Free returns the number of available workers across all pools.
func (mp *MultiPoolWithFunc) Free() (n int) {
for _, pool := range mp.pools {
n += pool.Free()
}
return
}
// FreeByIndex returns the number of available workers in the specific pool.
func (mp *MultiPoolWithFunc) FreeByIndex(idx int) (int, error) {
if idx < 0 || idx >= len(mp.pools) {
return -1, ErrInvalidPoolIndex
}
return mp.pools[idx].Free(), nil
}
// Waiting returns the number of the currently waiting tasks across all pools.
func (mp *MultiPoolWithFunc) Waiting() (n int) {
for _, pool := range mp.pools {
n += pool.Waiting()
}
return
}
// WaitingByIndex returns the number of the currently waiting tasks in the specific pool.
func (mp *MultiPoolWithFunc) WaitingByIndex(idx int) (int, error) {
if idx < 0 || idx >= len(mp.pools) {
return -1, ErrInvalidPoolIndex
}
return mp.pools[idx].Waiting(), nil
}
// Cap returns the capacity of this multi-pool.
func (mp *MultiPoolWithFunc) Cap() (n int) {
for _, pool := range mp.pools {
n += pool.Cap()
}
return
}
// Tune resizes each pool in the multi-pool.
//
// Note that the size applies to each individual pool,
// not to the overall capacity of the multi-pool.
func (mp *MultiPoolWithFunc) Tune(size int) {
for _, pool := range mp.pools {
pool.Tune(size)
}
}
// IsClosed indicates whether the multi-pool is closed.
func (mp *MultiPoolWithFunc) IsClosed() bool {
return atomic.LoadInt32(&mp.state) == CLOSED
}
// ReleaseTimeout closes the multi-pool with a timeout;
// it waits for all pools to be closed before timing out.
func (mp *MultiPoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
if !atomic.CompareAndSwapInt32(&mp.state, OPENED, CLOSED) {
return ErrPoolClosed
}
var errStr strings.Builder
for i, pool := range mp.pools {
if err := pool.ReleaseTimeout(timeout); err != nil {
if errStr.Len() > 0 {
errStr.WriteString(" | ")
}
errStr.WriteString(fmt.Sprintf("pool %d: %v", i, err))
}
}
if errStr.Len() == 0 {
return nil
}
return errors.New(errStr.String())
}
// Reboot reboots a released multi-pool.
func (mp *MultiPoolWithFunc) Reboot() {
if atomic.CompareAndSwapInt32(&mp.state, CLOSED, OPENED) {
atomic.StoreUint32(&mp.index, 0)
for _, pool := range mp.pools {
pool.Reboot()
}
}
}
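And the equivalent sketch for MultiPoolWithFunc, where one task function is shared by all pools and Invoke only passes the argument (again with illustrative sizes):

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	var wg sync.WaitGroup
	// One shared task function; LeastTasks routes each Invoke to the
	// pool with the fewest running workers.
	mp, err := ants.NewMultiPoolWithFunc(4, 25, func(arg interface{}) {
		defer wg.Done()
		fmt.Println("processing:", arg)
	}, ants.LeastTasks)
	if err != nil {
		panic(err)
	}

	for i := 0; i < 100; i++ {
		wg.Add(1)
		_ = mp.Invoke(i)
	}
	wg.Wait()
	_ = mp.ReleaseTimeout(3 * time.Second)
}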

pool.go (13 lines changed)

@@ -31,7 +31,8 @@ import (
syncx "github.com/panjf2000/ants/v2/internal/sync"
)

-// Pool accepts the tasks from client, it limits the total of goroutines to a given number by recycling goroutines.
+// Pool accepts the tasks and processes them concurrently,
+// it limits the total of goroutines to a given number by recycling goroutines.
type Pool struct {
// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
@@ -159,7 +160,7 @@ func (p *Pool) nowTime() time.Time {
return p.now.Load().(time.Time)
}

-// NewPool generates an instance of ants pool.
+// NewPool instantiates a Pool with customized options.
func NewPool(size int, options ...Option) (*Pool, error) {
if size <= 0 {
size = -1
@@ -210,7 +211,7 @@ func NewPool(size int, options ...Option) (*Pool, error) {
// Submit submits a task to this pool.
//
// Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
-// but what calls for special attention is that you will get blocked with the latest
+// but what calls for special attention is that you will get blocked with the last
// Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a Pool with ants.WithNonblocking(true).
func (p *Pool) Submit(task func()) error {
@@ -230,7 +231,7 @@ func (p *Pool) Running() int {
return int(atomic.LoadInt32(&p.running))
}

-// Free returns the number of available goroutines to work, -1 indicates this pool is unlimited.
+// Free returns the number of available workers, -1 indicates this pool is unlimited.
func (p *Pool) Free() int {
c := p.Cap()
if c < 0 {
@@ -239,7 +240,7 @@ func (p *Pool) Free() int {
return c - p.Running()
}

-// Waiting returns the number of tasks which are waiting be executed.
+// Waiting returns the number of tasks waiting to be executed.
func (p *Pool) Waiting() int {
return int(atomic.LoadInt32(&p.waiting))
}
@@ -339,7 +340,7 @@ retry:
return
}

-// If the worker queue is empty and we don't run out of the pool capacity,
+// If the worker queue is empty, and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()

pool_func.go

@@ -31,7 +31,7 @@ import (
syncx "github.com/panjf2000/ants/v2/internal/sync"
)

-// PoolWithFunc accepts the tasks from client,
+// PoolWithFunc accepts the tasks and processes them concurrently,
// it limits the total of goroutines to a given number by recycling goroutines.
type PoolWithFunc struct {
// capacity of the pool.
@@ -160,7 +160,7 @@ func (p *PoolWithFunc) nowTime() time.Time {
return p.now.Load().(time.Time)
}

-// NewPoolWithFunc generates an instance of ants pool with a specific function.
+// NewPoolWithFunc instantiates a PoolWithFunc with customized options.
func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
if size <= 0 {
size = -1
@@ -216,7 +216,7 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
// Invoke submits a task to pool.
//
// Note that you are allowed to call Pool.Invoke() from the current Pool.Invoke(),
-// but what calls for special attention is that you will get blocked with the latest
+// but what calls for special attention is that you will get blocked with the last
// Pool.Invoke() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a PoolWithFunc with ants.WithNonblocking(true).
func (p *PoolWithFunc) Invoke(args interface{}) error {
@@ -236,7 +236,7 @@ func (p *PoolWithFunc) Running() int {
return int(atomic.LoadInt32(&p.running))
}

-// Free returns the number of available goroutines to work, -1 indicates this pool is unlimited.
+// Free returns the number of available workers, -1 indicates this pool is unlimited.
func (p *PoolWithFunc) Free() int {
c := p.Cap()
if c < 0 {
@@ -245,7 +245,7 @@ func (p *PoolWithFunc) Free() int {
return c - p.Running()
}

-// Waiting returns the number of tasks which are waiting be executed.
+// Waiting returns the number of tasks waiting to be executed.
func (p *PoolWithFunc) Waiting() int {
return int(atomic.LoadInt32(&p.waiting))
}
@@ -345,7 +345,7 @@ retry:
return
}

-// If the worker queue is empty and we don't run out of the pool capacity,
+// If the worker queue is empty, and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()

worker_loop_queue.go

@@ -19,16 +19,13 @@ func newWorkerLoopQueue(size int) *loopQueue {
}

func (wq *loopQueue) len() int {
-if wq.size == 0 {
+if wq.size == 0 || wq.isEmpty() {
return 0
}

-if wq.head == wq.tail {
-if wq.isFull {
+if wq.head == wq.tail && wq.isFull {
return wq.size
}
-return 0
-}

if wq.tail > wq.head {
return wq.tail - wq.head
@@ -50,11 +47,8 @@ func (wq *loopQueue) insert(w worker) error {
return errQueueIsFull
}
wq.items[wq.tail] = w
-wq.tail++
-if wq.tail == wq.size {
-wq.tail = 0
-}
+wq.tail = (wq.tail + 1) % wq.size

if wq.tail == wq.head {
wq.isFull = true
}
@@ -69,10 +63,8 @@ func (wq *loopQueue) detach() worker {
w := wq.items[wq.head]
wq.items[wq.head] = nil
-wq.head++
-if wq.head == wq.size {
-wq.head = 0
-}
+wq.head = (wq.head + 1) % wq.size

wq.isFull = false
return w
@@ -134,7 +126,7 @@ func (wq *loopQueue) binarySearch(expiryTime time.Time) int {
basel = wq.head
l := 0
for l <= r {
-mid = l + ((r - l) >> 1)
+mid = l + ((r - l) >> 1) // avoid overflow when computing mid
// calculate true mid position from mapped mid position
tmid = (mid + basel + nlen) % nlen
if expiryTime.Before(wq.items[tmid].lastUsedTime()) {
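The insert/detach rewrite folds the wrap-around branch into a single modulo step. A toy ring index (not the ants type) shows the arithmetic:

package main

import "fmt"

func main() {
	const size = 4
	idx := 0
	// One modulo replaces "idx++; if idx == size { idx = 0 }".
	for i := 0; i < 6; i++ {
		fmt.Print(idx, " ") // prints: 0 1 2 3 0 1
		idx = (idx + 1) % size
	}
	fmt.Println()
}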

worker_stack.go

@@ -62,7 +62,7 @@ func (wq *workerStack) refresh(duration time.Duration) []worker {
func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
for l <= r {
-mid := int(uint(l+r) >> 1) // avoid overflow when computing mid
+mid := l + ((r - l) >> 1) // avoid overflow when computing mid
if expiryTime.Before(wq.items[mid].lastUsedTime()) {
r = mid - 1
} else {
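Both queues now compute the midpoint as l + ((r - l) >> 1), which cannot overflow. A tiny demonstration, with values chosen to push the naive form past the integer maximum:

package main

import (
	"fmt"
	"math"
)

func main() {
	l, r := math.MaxInt-3, math.MaxInt-1

	naive := (l + r) / 2       // l+r wraps around, so the result is negative
	safe := l + ((r - l) >> 1) // stays in range: MaxInt-2

	fmt.Println(naive < 0)             // true
	fmt.Println(safe == math.MaxInt-2) // true
}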