feat: add MultiPool and MultiPoolWithFunc (#305)

This commit is contained in:
Andy Pan 2023-11-21 11:53:46 +08:00 committed by GitHub
parent 27685ba408
commit 19bd1ea02b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 524 additions and 4 deletions

View File

@ -66,6 +66,12 @@ var (
// ErrTimeout will be returned after the operations timed out. // ErrTimeout will be returned after the operations timed out.
ErrTimeout = errors.New("operation timed out") ErrTimeout = errors.New("operation timed out")
// ErrInvalidPoolIndex will be returned when trying to retrieve a pool with an invalid index.
ErrInvalidPoolIndex = errors.New("invalid pool index")
// ErrInvalidLoadBalancingStrategy will be returned when trying to create a MultiPool with an invalid load-balancing strategy.
ErrInvalidLoadBalancingStrategy = errors.New("invalid load-balancing strategy")
// workerChanCap determines whether the channel of a worker should be a buffered channel // workerChanCap determines whether the channel of a worker should be a buffered channel
// to get the best performance. Inspired by fasthttp at // to get the best performance. Inspired by fasthttp at
// https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139 // https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139

View File

@ -25,6 +25,7 @@ package ants
import ( import (
"runtime" "runtime"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -47,18 +48,22 @@ func demoPoolFunc(args interface{}) {
time.Sleep(time.Duration(n) * time.Millisecond) time.Sleep(time.Duration(n) * time.Millisecond)
} }
var stopLongRunningFunc int32
func longRunningFunc() { func longRunningFunc() {
for { for atomic.LoadInt32(&stopLongRunningFunc) == 0 {
runtime.Gosched() runtime.Gosched()
} }
} }
var stopLongRunningPoolFunc int32
func longRunningPoolFunc(arg interface{}) { func longRunningPoolFunc(arg interface{}) {
if ch, ok := arg.(chan struct{}); ok { if ch, ok := arg.(chan struct{}); ok {
<-ch <-ch
return return
} }
for { for atomic.LoadInt32(&stopLongRunningPoolFunc) == 0 {
runtime.Gosched() runtime.Gosched()
} }
} }

View File

@ -985,3 +985,113 @@ func TestDefaultPoolReleaseTimeout(t *testing.T) {
err := ReleaseTimeout(2 * time.Second) err := ReleaseTimeout(2 * time.Second)
assert.NoError(t, err) assert.NoError(t, err)
} }
func TestMultiPool(t *testing.T) {
_, err := NewMultiPool(10, -1, 8)
assert.ErrorIs(t, err, ErrInvalidLoadBalancingStrategy)
mp, err := NewMultiPool(10, 5, RoundRobin)
testFn := func() {
for i := 0; i < 50; i++ {
err = mp.Submit(longRunningFunc)
assert.NoError(t, err)
}
assert.EqualValues(t, mp.Waiting(), 0)
_, err = mp.WaitingByIndex(-1)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
_, err = mp.WaitingByIndex(11)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
assert.EqualValues(t, 50, mp.Running())
_, err = mp.RunningByIndex(-1)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
_, err = mp.RunningByIndex(11)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
assert.EqualValues(t, 0, mp.Free())
_, err = mp.FreeByIndex(-1)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
_, err = mp.FreeByIndex(11)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
assert.EqualValues(t, 50, mp.Cap())
assert.False(t, mp.IsClosed())
for i := 0; i < 10; i++ {
n, _ := mp.WaitingByIndex(i)
assert.EqualValues(t, 0, n)
n, _ = mp.RunningByIndex(i)
assert.EqualValues(t, 5, n)
n, _ = mp.FreeByIndex(i)
assert.EqualValues(t, 0, n)
}
atomic.StoreInt32(&stopLongRunningFunc, 1)
assert.NoError(t, mp.ReleaseTimeout(3*time.Second))
assert.Zero(t, mp.Running())
assert.True(t, mp.IsClosed())
atomic.StoreInt32(&stopLongRunningFunc, 0)
}
testFn()
mp.Reboot()
testFn()
mp, err = NewMultiPool(10, 5, LeastTasks)
testFn()
mp.Reboot()
testFn()
mp.Tune(10)
}
func TestMultiPoolWithFunc(t *testing.T) {
_, err := NewMultiPoolWithFunc(10, -1, longRunningPoolFunc, 8)
assert.ErrorIs(t, err, ErrInvalidLoadBalancingStrategy)
mp, err := NewMultiPoolWithFunc(10, 5, longRunningPoolFunc, RoundRobin)
testFn := func() {
for i := 0; i < 50; i++ {
err = mp.Invoke(i)
assert.NoError(t, err)
}
assert.EqualValues(t, mp.Waiting(), 0)
_, err = mp.WaitingByIndex(-1)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
_, err = mp.WaitingByIndex(11)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
assert.EqualValues(t, 50, mp.Running())
_, err = mp.RunningByIndex(-1)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
_, err = mp.RunningByIndex(11)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
assert.EqualValues(t, 0, mp.Free())
_, err = mp.FreeByIndex(-1)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
_, err = mp.FreeByIndex(11)
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
assert.EqualValues(t, 50, mp.Cap())
assert.False(t, mp.IsClosed())
for i := 0; i < 10; i++ {
n, _ := mp.WaitingByIndex(i)
assert.EqualValues(t, 0, n)
n, _ = mp.RunningByIndex(i)
assert.EqualValues(t, 5, n)
n, _ = mp.FreeByIndex(i)
assert.EqualValues(t, 0, n)
}
atomic.StoreInt32(&stopLongRunningPoolFunc, 1)
assert.NoError(t, mp.ReleaseTimeout(3*time.Second))
assert.Zero(t, mp.Running())
assert.True(t, mp.IsClosed())
atomic.StoreInt32(&stopLongRunningPoolFunc, 0)
}
testFn()
mp.Reboot()
testFn()
mp, err = NewMultiPoolWithFunc(10, 5, longRunningPoolFunc, LeastTasks)
testFn()
mp.Reboot()
testFn()
mp.Tune(10)
}

205
multipool.go Normal file
View File

@ -0,0 +1,205 @@
// MIT License
// Copyright (c) 2023 Andy Pan
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package ants
import (
"errors"
"fmt"
"strings"
"sync/atomic"
"time"
)
// LoadBalancingStrategy represents the type of load-balancing algorithm.
type LoadBalancingStrategy int
const (
// RoundRobin distributes task to a list of pools in rotation.
RoundRobin LoadBalancingStrategy = 1 << (iota + 1)
// LeastTasks always selects the pool with the least number of pending tasks.
LeastTasks
)
// MultiPool consists of multiple pools, from which you will benefit the
// performance improvement on basis of the fine-grained locking that reduces
// the lock contention.
// MultiPool is a good fit for the scenario that you have a large number of
// tasks to submit, and you don't want the single pool to be the bottleneck.
type MultiPool struct {
pools []*Pool
index uint32
state int32
lbs LoadBalancingStrategy
}
// NewMultiPool instantiates a MultiPool with a size of the pool list and a size
// per pool, and the load-balancing strategy.
func NewMultiPool(size, sizePerPool int, lbs LoadBalancingStrategy, options ...Option) (*MultiPool, error) {
pools := make([]*Pool, size)
for i := 0; i < size; i++ {
pool, err := NewPool(sizePerPool, options...)
if err != nil {
return nil, err
}
pools[i] = pool
}
if lbs != RoundRobin && lbs != LeastTasks {
return nil, ErrInvalidLoadBalancingStrategy
}
return &MultiPool{pools: pools, lbs: lbs}, nil
}
func (mp *MultiPool) next() (idx int) {
switch mp.lbs {
case RoundRobin:
if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 {
idx = 0
}
return
case LeastTasks:
leastTasks := 1<<31 - 1
for i, pool := range mp.pools {
if n := pool.Running(); n < leastTasks {
leastTasks = n
idx = i
}
}
return
}
return -1
}
// Submit submits a task to a pool selected by the load-balancing strategy.
func (mp *MultiPool) Submit(task func()) error {
if mp.IsClosed() {
return ErrPoolClosed
}
return mp.pools[mp.next()].Submit(task)
}
// Running returns the number of the currently running workers across all pools.
func (mp *MultiPool) Running() (n int) {
for _, pool := range mp.pools {
n += pool.Running()
}
return
}
// RunningByIndex returns the number of the currently running workers in the specific pool.
func (mp *MultiPool) RunningByIndex(idx int) (int, error) {
if idx < 0 || idx >= len(mp.pools) {
return -1, ErrInvalidPoolIndex
}
return mp.pools[idx].Running(), nil
}
// Free returns the number of available workers across all pools.
func (mp *MultiPool) Free() (n int) {
for _, pool := range mp.pools {
n += pool.Free()
}
return
}
// FreeByIndex returns the number of available workers in the specific pool.
func (mp *MultiPool) FreeByIndex(idx int) (int, error) {
if idx < 0 || idx >= len(mp.pools) {
return -1, ErrInvalidPoolIndex
}
return mp.pools[idx].Free(), nil
}
// Waiting returns the number of the currently waiting tasks across all pools.
func (mp *MultiPool) Waiting() (n int) {
for _, pool := range mp.pools {
n += pool.Waiting()
}
return
}
// WaitingByIndex returns the number of the currently waiting tasks in the specific pool.
func (mp *MultiPool) WaitingByIndex(idx int) (int, error) {
if idx < 0 || idx >= len(mp.pools) {
return -1, ErrInvalidPoolIndex
}
return mp.pools[idx].Waiting(), nil
}
// Cap returns the capacity of this multi-pool.
func (mp *MultiPool) Cap() (n int) {
for _, pool := range mp.pools {
n += pool.Cap()
}
return
}
// Tune resizes each pool in multi-pool.
//
// Note that this method doesn't resize the overall
// capacity of multi-pool.
func (mp *MultiPool) Tune(size int) {
for _, pool := range mp.pools {
pool.Tune(size)
}
}
// IsClosed indicates whether the multi-pool is closed.
func (mp *MultiPool) IsClosed() bool {
return atomic.LoadInt32(&mp.state) == CLOSED
}
// ReleaseTimeout closes the multi-pool with a timeout,
// it waits all pools to be closed before timing out.
func (mp *MultiPool) ReleaseTimeout(timeout time.Duration) error {
if !atomic.CompareAndSwapInt32(&mp.state, OPENED, CLOSED) {
return ErrPoolClosed
}
var errStr strings.Builder
for i, pool := range mp.pools {
if err := pool.ReleaseTimeout(timeout); err != nil {
errStr.WriteString(fmt.Sprintf("pool %d: %v\n", i, err))
if i < len(mp.pools)-1 {
errStr.WriteString(" | ")
}
return err
}
}
if errStr.Len() == 0 {
return nil
}
return errors.New(errStr.String())
}
// Reboot reboots a released multi-pool.
func (mp *MultiPool) Reboot() {
if atomic.CompareAndSwapInt32(&mp.state, CLOSED, OPENED) {
atomic.StoreUint32(&mp.index, 0)
for _, pool := range mp.pools {
pool.Reboot()
}
}
}

194
multipool_func.go Normal file
View File

@ -0,0 +1,194 @@
// MIT License
// Copyright (c) 2023 Andy Pan
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package ants
import (
"errors"
"fmt"
"strings"
"sync/atomic"
"time"
)
// MultiPoolWithFunc consists of multiple pools, from which you will benefit the
// performance improvement on basis of the fine-grained locking that reduces
// the lock contention.
// MultiPoolWithFunc is a good fit for the scenario that you have a large number of
// tasks to submit, and you don't want the single pool to be the bottleneck.
type MultiPoolWithFunc struct {
pools []*PoolWithFunc
index uint32
state int32
lbs LoadBalancingStrategy
}
// NewMultiPoolWithFunc instantiates a MultiPoolWithFunc with a size of the pool list and a size
// per pool, and the load-balancing strategy.
func NewMultiPoolWithFunc(size, sizePerPool int, fn func(interface{}), lbs LoadBalancingStrategy, options ...Option) (*MultiPoolWithFunc, error) {
pools := make([]*PoolWithFunc, size)
for i := 0; i < size; i++ {
pool, err := NewPoolWithFunc(sizePerPool, fn, options...)
if err != nil {
return nil, err
}
pools[i] = pool
}
if lbs != RoundRobin && lbs != LeastTasks {
return nil, ErrInvalidLoadBalancingStrategy
}
return &MultiPoolWithFunc{pools: pools, lbs: lbs}, nil
}
func (mp *MultiPoolWithFunc) next() (idx int) {
switch mp.lbs {
case RoundRobin:
if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 {
idx = 0
}
return
case LeastTasks:
leastTasks := 1<<31 - 1
for i, pool := range mp.pools {
if n := pool.Running(); n < leastTasks {
leastTasks = n
idx = i
}
}
return
}
return -1
}
// Invoke submits a task to a pool selected by the load-balancing strategy.
func (mp *MultiPoolWithFunc) Invoke(args interface{}) error {
if mp.IsClosed() {
return ErrPoolClosed
}
return mp.pools[mp.next()].Invoke(args)
}
// Running returns the number of the currently running workers across all pools.
func (mp *MultiPoolWithFunc) Running() (n int) {
for _, pool := range mp.pools {
n += pool.Running()
}
return
}
// RunningByIndex returns the number of the currently running workers in the specific pool.
func (mp *MultiPoolWithFunc) RunningByIndex(idx int) (int, error) {
if idx < 0 || idx >= len(mp.pools) {
return -1, ErrInvalidPoolIndex
}
return mp.pools[idx].Running(), nil
}
// Free returns the number of available workers across all pools.
func (mp *MultiPoolWithFunc) Free() (n int) {
for _, pool := range mp.pools {
n += pool.Free()
}
return
}
// FreeByIndex returns the number of available workers in the specific pool.
func (mp *MultiPoolWithFunc) FreeByIndex(idx int) (int, error) {
if idx < 0 || idx >= len(mp.pools) {
return -1, ErrInvalidPoolIndex
}
return mp.pools[idx].Free(), nil
}
// Waiting returns the number of the currently waiting tasks across all pools.
func (mp *MultiPoolWithFunc) Waiting() (n int) {
for _, pool := range mp.pools {
n += pool.Waiting()
}
return
}
// WaitingByIndex returns the number of the currently waiting tasks in the specific pool.
func (mp *MultiPoolWithFunc) WaitingByIndex(idx int) (int, error) {
if idx < 0 || idx >= len(mp.pools) {
return -1, ErrInvalidPoolIndex
}
return mp.pools[idx].Waiting(), nil
}
// Cap returns the capacity of this multi-pool.
func (mp *MultiPoolWithFunc) Cap() (n int) {
for _, pool := range mp.pools {
n += pool.Cap()
}
return
}
// Tune resizes each pool in multi-pool.
//
// Note that this method doesn't resize the overall
// capacity of multi-pool.
func (mp *MultiPoolWithFunc) Tune(size int) {
for _, pool := range mp.pools {
pool.Tune(size)
}
}
// IsClosed indicates whether the multi-pool is closed.
func (mp *MultiPoolWithFunc) IsClosed() bool {
return atomic.LoadInt32(&mp.state) == CLOSED
}
// ReleaseTimeout closes the multi-pool with a timeout,
// it waits all pools to be closed before timing out.
func (mp *MultiPoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
if !atomic.CompareAndSwapInt32(&mp.state, OPENED, CLOSED) {
return ErrPoolClosed
}
var errStr strings.Builder
for i, pool := range mp.pools {
if err := pool.ReleaseTimeout(timeout); err != nil {
errStr.WriteString(fmt.Sprintf("pool %d: %v\n", i, err))
if i < len(mp.pools)-1 {
errStr.WriteString(" | ")
}
return err
}
}
if errStr.Len() == 0 {
return nil
}
return errors.New(errStr.String())
}
// Reboot reboots a released multi-pool.
func (mp *MultiPoolWithFunc) Reboot() {
if atomic.CompareAndSwapInt32(&mp.state, CLOSED, OPENED) {
atomic.StoreUint32(&mp.index, 0)
for _, pool := range mp.pools {
pool.Reboot()
}
}
}

View File

@ -159,7 +159,7 @@ func (p *Pool) nowTime() time.Time {
return p.now.Load().(time.Time) return p.now.Load().(time.Time)
} }
// NewPool generates an instance of ants pool. // NewPool instantiates a Pool with customized options.
func NewPool(size int, options ...Option) (*Pool, error) { func NewPool(size int, options ...Option) (*Pool, error) {
if size <= 0 { if size <= 0 {
size = -1 size = -1

View File

@ -160,7 +160,7 @@ func (p *PoolWithFunc) nowTime() time.Time {
return p.now.Load().(time.Time) return p.now.Load().(time.Time)
} }
// NewPoolWithFunc generates an instance of ants pool with a specific function. // NewPoolWithFunc instantiates a PoolWithFunc with customized options.
func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) { func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
if size <= 0 { if size <= 0 {
size = -1 size = -1