Add functional options pattern for ants

This commit is contained in:
Andy Pan 2019-08-21 22:07:19 +08:00
parent 1c767b45a1
commit 201ac20358
10 changed files with 371 additions and 184 deletions

View File

@ -41,12 +41,6 @@ Library `ants` implements a goroutine pool with fixed capacity, managing and rec
go get -u github.com/panjf2000/ants go get -u github.com/panjf2000/ants
``` ```
Or, using glide:
``` sh
glide get github.com/panjf2000/ants
```
## How to use ## How to use
Just take a imagination that your program starts a massive number of goroutines, from which a vast amount of memory will be consumed. To mitigate that kind of situation, all you need to do is to import `ants` package and submit all your tasks to a default pool with fixed capacity, activated when package `ants` is imported: Just take a imagination that your program starts a massive number of goroutines, from which a vast amount of memory will be consumed. To mitigate that kind of situation, all you need to do is to import `ants` package and submit all your tasks to a default pool with fixed capacity, activated when package `ants` is imported:
@ -88,13 +82,13 @@ func main() {
} }
for i := 0; i < runTimes; i++ { for i := 0; i < runTimes; i++ {
wg.Add(1) wg.Add(1)
ants.Submit(syncCalculateSum) _ = ants.Submit(syncCalculateSum)
} }
wg.Wait() wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running()) fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n") fmt.Printf("finish all tasks.\n")
// Use the pool with a method, // Use the pool with a function,
// set 10 to the capacity of goroutine pool and 1 second for expired duration. // set 10 to the capacity of goroutine pool and 1 second for expired duration.
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) { p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i) myFunc(i)
@ -104,7 +98,7 @@ func main() {
// Submit tasks one by one. // Submit tasks one by one.
for i := 0; i < runTimes; i++ { for i := 0; i < runTimes; i++ {
wg.Add(1) wg.Add(1)
p.Invoke(int32(i)) _ = p.Invoke(int32(i))
} }
wg.Wait() wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running()) fmt.Printf("running goroutines: %d\n", p.Running())
@ -129,7 +123,7 @@ type Request struct {
} }
func main() { func main() {
pool, _ := ants.NewPoolWithFunc(100, func(payload interface{}) { pool, _ := ants.NewPoolWithFunc(100000, func(payload interface{}) {
request, ok := payload.(*Request) request, ok := payload.(*Request)
if !ok { if !ok {
return return
@ -167,20 +161,82 @@ func main() {
} }
``` ```
## Submit tasks ## Functional options for ants pool
Tasks can be submitted by calling `ants.Submit(func())`
```go ```go
ants.Submit(func(){}) type Options struct {
// ExpiryDuration set the expired time (second) of every worker.
ExpiryDuration time.Duration
// PreAlloc indicate whether to make memory pre-allocation when initializing Pool.
PreAlloc bool
// Max number of goroutine blocking on pool.Submit.
// 0 (default value) means no such limit.
MaxBlockingTasks int
// When Nonblocking is true, Pool.Submit will never be blocked.
// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
// When Nonblocking is true, MaxBlockingTasks is inoperative.
Nonblocking bool
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{})
}
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks
}
}
func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) {
opts.Nonblocking = nonblocking
}
}
func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
}
``` ```
`ants.Options`contains all optional configurations of ants pool, which allow you to customize the goroutine pool by invoking option functions to set up each configuration in `NewPool`/`NewPoolWithFunc`method.
## Customize limited pool ## Customize limited pool
`ants` also supports customizing the capacity of pool. You can invoke the `NewPool` method to instantiate a pool with a given capacity, as following: `ants` also supports customizing the capacity of pool. You can invoke the `NewPool` method to instantiate a pool with a given capacity, as following:
``` go ``` go
// Set 10000 the size of goroutine pool // Set 10000 the size of goroutine pool
p, _ := ants.NewPool(10000) p, _ := ants.NewPool(10000)
// Submit a task ```
p.Submit(func(){})
## Submit tasks
Tasks can be submitted by calling `ants.Submit(func())`
```go
ants.Submit(func(){})
``` ```
## Tune pool capacity in runtime ## Tune pool capacity in runtime
@ -199,7 +255,7 @@ Don't worry about the synchronous problems in this case, the method here is thre
```go ```go
// ants will pre-malloc the whole capacity of pool when you invoke this method // ants will pre-malloc the whole capacity of pool when you invoke this method
p, _ := ants.NewPoolPreMalloc(AntsSize) p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))
``` ```
## Release Pool ## Release Pool
@ -208,8 +264,6 @@ p, _ := ants.NewPoolPreMalloc(AntsSize)
pool.Release() pool.Release()
``` ```
## About sequence ## About sequence
All tasks submitted to `ants` pool will not be guaranteed to be addressed in order, because those tasks scatter among a series of concurrent workers, thus those tasks would be executed concurrently. All tasks submitted to `ants` pool will not be guaranteed to be addressed in order, because those tasks scatter among a series of concurrent workers, thus those tasks would be executed concurrently.

View File

@ -41,12 +41,6 @@ A goroutine pool for Go
go get -u github.com/panjf2000/ants go get -u github.com/panjf2000/ants
``` ```
使用包管理工具 glide 安装:
``` sh
glide get github.com/panjf2000/ants
```
## 使用 ## 使用
写 go 并发程序的时候如果程序会启动大量的 goroutine 势必会消耗大量的系统资源内存CPU通过使用 `ants`,可以实例化一个协程池,复用 goroutine ,节省资源,提升性能: 写 go 并发程序的时候如果程序会启动大量的 goroutine 势必会消耗大量的系统资源内存CPU通过使用 `ants`,可以实例化一个协程池,复用 goroutine ,节省资源,提升性能:
@ -88,7 +82,7 @@ func main() {
} }
for i := 0; i < runTimes; i++ { for i := 0; i < runTimes; i++ {
wg.Add(1) wg.Add(1)
ants.Submit(syncCalculateSum) _ = ants.Submit(syncCalculateSum)
} }
wg.Wait() wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running()) fmt.Printf("running goroutines: %d\n", ants.Running())
@ -104,7 +98,7 @@ func main() {
// Submit tasks one by one. // Submit tasks one by one.
for i := 0; i < runTimes; i++ { for i := 0; i < runTimes; i++ {
wg.Add(1) wg.Add(1)
p.Invoke(int32(i)) _ = p.Invoke(int32(i))
} }
wg.Wait() wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running()) fmt.Printf("running goroutines: %d\n", p.Running())
@ -129,7 +123,7 @@ type Request struct {
} }
func main() { func main() {
pool, _ := ants.NewPoolWithFunc(100, func(payload interface{}) { pool, _ := ants.NewPoolWithFunc(100000, func(payload interface{}) {
request, ok := payload.(*Request) request, ok := payload.(*Request)
if !ok { if !ok {
return return
@ -167,20 +161,83 @@ func main() {
} }
``` ```
## 任务提交 ## Pool 配置
提交任务通过调用 `ants.Submit(func())`方法:
```go ```go
ants.Submit(func(){}) type Options struct {
// ExpiryDuration set the expired time (second) of every worker.
ExpiryDuration time.Duration
// PreAlloc indicate whether to make memory pre-allocation when initializing Pool.
PreAlloc bool
// Max number of goroutine blocking on pool.Submit.
// 0 (default value) means no such limit.
MaxBlockingTasks int
// When Nonblocking is true, Pool.Submit will never be blocked.
// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
// When Nonblocking is true, MaxBlockingTasks is inoperative.
Nonblocking bool
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{})
}
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks
}
}
func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) {
opts.Nonblocking = nonblocking
}
}
func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
}
``` ```
通过在调用`NewPool`/`NewPoolWithFunc`之时使用各种 optional function可以设置`ants.Options`中各个配置项的值,然后用它来定制化 goroutine pool.
## 自定义池 ## 自定义池
`ants`支持实例化使用者自己的一个 Pool ,指定具体的池容量;通过调用 `NewPool` 方法可以实例化一个新的带有指定容量的 Pool ,如下: `ants`支持实例化使用者自己的一个 Pool ,指定具体的池容量;通过调用 `NewPool` 方法可以实例化一个新的带有指定容量的 Pool ,如下:
``` go ``` go
// Set 10000 the size of goroutine pool // Set 10000 the size of goroutine pool
p, _ := ants.NewPool(10000) p, _ := ants.NewPool(10000)
// Submit a task ```
p.Submit(func(){})
## 任务提交
提交任务通过调用 `ants.Submit(func())`方法:
```go
ants.Submit(func(){})
``` ```
## 动态调整协程池容量 ## 动态调整协程池容量
@ -199,7 +256,7 @@ pool.Tune(100000) // Tune its capacity to 100000
```go ```go
// ants will pre-malloc the whole capacity of pool when you invoke this function // ants will pre-malloc the whole capacity of pool when you invoke this function
p, _ := ants.NewPoolPreMalloc(AntsSize) p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))
``` ```

64
ants.go
View File

@ -26,6 +26,7 @@ import (
"errors" "errors"
"math" "math"
"runtime" "runtime"
"time"
) )
const ( const (
@ -46,6 +47,9 @@ var (
// ErrInvalidPoolSize will be returned when setting a negative number as pool capacity. // ErrInvalidPoolSize will be returned when setting a negative number as pool capacity.
ErrInvalidPoolSize = errors.New("invalid size for pool") ErrInvalidPoolSize = errors.New("invalid size for pool")
// ErrLackPoolFunc will be returned when invokers don't provide function for pool.
ErrLackPoolFunc = errors.New("must provide function for pool")
// ErrInvalidPoolExpiry will be returned when setting a negative number as the periodic duration to purge goroutines. // ErrInvalidPoolExpiry will be returned when setting a negative number as the periodic duration to purge goroutines.
ErrInvalidPoolExpiry = errors.New("invalid expiry for pool") ErrInvalidPoolExpiry = errors.New("invalid expiry for pool")
@ -72,10 +76,68 @@ var (
return 1 return 1
}() }()
// Init a instance pool when importing ants.
defaultAntsPool, _ = NewPool(DEFAULT_ANTS_POOL_SIZE) defaultAntsPool, _ = NewPool(DEFAULT_ANTS_POOL_SIZE)
) )
// Init a instance pool when importing ants. type Option func(opts *Options)
type Options struct {
// ExpiryDuration set the expired time (second) of every worker.
ExpiryDuration time.Duration
// PreAlloc indicate whether to make memory pre-allocation when initializing Pool.
PreAlloc bool
// Max number of goroutine blocking on pool.Submit.
// 0 (default value) means no such limit.
MaxBlockingTasks int
// When Nonblocking is true, Pool.Submit will never be blocked.
// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
// When Nonblocking is true, MaxBlockingTasks is inoperative.
Nonblocking bool
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{})
}
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks
}
}
func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) {
opts.Nonblocking = nonblocking
}
}
func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
}
// Submit submits a task to pool. // Submit submits a task to pool.
func Submit(task func()) error { func Submit(task func()) error {

View File

@ -76,7 +76,7 @@ func TestAntsPoolWaitToGetWorker(t *testing.T) {
func TestAntsPoolWaitToGetWorkerPreMalloc(t *testing.T) { func TestAntsPoolWaitToGetWorkerPreMalloc(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
p, _ := ants.NewPoolPreMalloc(AntsSize) p, _ := ants.NewPool(AntsSize, ants.WithPreAlloc(true))
defer p.Release() defer p.Release()
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
@ -117,10 +117,10 @@ func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) { func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
p, _ := ants.NewPoolWithFuncPreMalloc(AntsSize, func(i interface{}) { p, _ := ants.NewPoolWithFunc(AntsSize, func(i interface{}) {
demoPoolFunc(i) demoPoolFunc(i)
wg.Done() wg.Done()
}) }, ants.WithPreAlloc(true))
defer p.Release() defer p.Release()
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
@ -172,7 +172,7 @@ func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) {
func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) { func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) {
dur := 10 dur := 10
p, _ := ants.NewPoolWithFuncPreMalloc(TestSize, demoPoolFunc) p, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc, ants.WithPreAlloc(true))
defer p.Release() defer p.Release()
for i := 0; i < AntsSize; i++ { for i := 0; i < AntsSize; i++ {
@ -233,18 +233,17 @@ func TestAntsPool(t *testing.T) {
//------------------------------------------------------------------------------------------- //-------------------------------------------------------------------------------------------
func TestPanicHandler(t *testing.T) { func TestPanicHandler(t *testing.T) {
p0, err := ants.NewPool(10) var panicCounter int64
var wg sync.WaitGroup
p0, err := ants.NewPool(10, ants.WithPanicHandler(func(p interface{}) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
t.Logf("catch panic with PanicHandler: %v", p)
}))
if err != nil { if err != nil {
t.Fatalf("create new pool failed: %s", err.Error()) t.Fatalf("create new pool failed: %s", err.Error())
} }
defer p0.Release() defer p0.Release()
var panicCounter int64
var wg sync.WaitGroup
p0.PanicHandler = func(p interface{}) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
t.Logf("catch panic with PanicHandler: %v", p)
}
wg.Add(1) wg.Add(1)
_ = p0.Submit(func() { _ = p0.Submit(func() {
panic("Oops!") panic("Oops!")
@ -258,17 +257,14 @@ func TestPanicHandler(t *testing.T) {
t.Errorf("pool should be empty after panic") t.Errorf("pool should be empty after panic")
} }
p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { panic(p) }, ants.WithPanicHandler(func(p interface{}) {
panic(p) defer wg.Done()
}) atomic.AddInt64(&panicCounter, 1)
}))
if err != nil { if err != nil {
t.Fatalf("create new pool with func failed: %s", err.Error()) t.Fatalf("create new pool with func failed: %s", err.Error())
} }
defer p1.Release() defer p1.Release()
p1.PanicHandler = func(p interface{}) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
}
wg.Add(1) wg.Add(1)
_ = p1.Invoke("Oops!") _ = p1.Invoke("Oops!")
wg.Wait() wg.Wait()
@ -282,18 +278,17 @@ func TestPanicHandler(t *testing.T) {
} }
func TestPanicHandlerPreMalloc(t *testing.T) { func TestPanicHandlerPreMalloc(t *testing.T) {
p0, err := ants.NewPoolPreMalloc(10) var panicCounter int64
var wg sync.WaitGroup
p0, err := ants.NewPool(10, ants.WithPreAlloc(true), ants.WithPanicHandler(func(p interface{}) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
t.Logf("catch panic with PanicHandler: %v", p)
}))
if err != nil { if err != nil {
t.Fatalf("create new pool failed: %s", err.Error()) t.Fatalf("create new pool failed: %s", err.Error())
} }
defer p0.Release() defer p0.Release()
var panicCounter int64
var wg sync.WaitGroup
p0.PanicHandler = func(p interface{}) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
t.Logf("catch panic with PanicHandler: %v", p)
}
wg.Add(1) wg.Add(1)
_ = p0.Submit(func() { _ = p0.Submit(func() {
panic("Oops!") panic("Oops!")
@ -307,17 +302,14 @@ func TestPanicHandlerPreMalloc(t *testing.T) {
t.Errorf("pool should be empty after panic") t.Errorf("pool should be empty after panic")
} }
p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { panic(p) }, ants.WithPanicHandler(func(p interface{}) {
panic(p) defer wg.Done()
}) atomic.AddInt64(&panicCounter, 1)
}))
if err != nil { if err != nil {
t.Fatalf("create new pool with func failed: %s", err.Error()) t.Fatalf("create new pool with func failed: %s", err.Error())
} }
defer p1.Release() defer p1.Release()
p1.PanicHandler = func(p interface{}) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
}
wg.Add(1) wg.Add(1)
_ = p1.Invoke("Oops!") _ = p1.Invoke("Oops!")
wg.Wait() wg.Wait()
@ -351,7 +343,7 @@ func TestPoolPanicWithoutHandler(t *testing.T) {
} }
func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) { func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {
p0, err := ants.NewPoolPreMalloc(10) p0, err := ants.NewPool(10, ants.WithPreAlloc(true))
if err != nil { if err != nil {
t.Fatalf("create new pool failed: %s", err.Error()) t.Fatalf("create new pool failed: %s", err.Error())
} }
@ -394,7 +386,7 @@ func TestPurge(t *testing.T) {
} }
func TestPurgePreMalloc(t *testing.T) { func TestPurgePreMalloc(t *testing.T) {
p, err := ants.NewPoolPreMalloc(10) p, err := ants.NewPool(10, ants.WithPreAlloc(true))
if err != nil { if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error()) t.Fatalf("create TimingPool failed: %s", err.Error())
} }
@ -418,11 +410,10 @@ func TestPurgePreMalloc(t *testing.T) {
func TestNonblockingSubmit(t *testing.T) { func TestNonblockingSubmit(t *testing.T) {
poolSize := 10 poolSize := 10
p, err := ants.NewPool(poolSize) p, err := ants.NewPool(poolSize, ants.WithNonblocking(true))
if err != nil { if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error()) t.Fatalf("create TimingPool failed: %s", err.Error())
} }
p.Nonblocking = true
defer p.Release() defer p.Release()
for i := 0; i < poolSize-1; i++ { for i := 0; i < poolSize-1; i++ {
if err := p.Submit(longRunningFunc); err != nil { if err := p.Submit(longRunningFunc); err != nil {
@ -450,11 +441,10 @@ func TestNonblockingSubmit(t *testing.T) {
func TestMaxBlockingSubmit(t *testing.T) { func TestMaxBlockingSubmit(t *testing.T) {
poolSize := 10 poolSize := 10
p, err := ants.NewPool(poolSize) p, err := ants.NewPool(poolSize, ants.WithMaxBlockingTasks(1))
if err != nil { if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error()) t.Fatalf("create TimingPool failed: %s", err.Error())
} }
p.MaxBlockingTasks = 1
defer p.Release() defer p.Release()
for i := 0; i < poolSize-1; i++ { for i := 0; i < poolSize-1; i++ {
if err := p.Submit(longRunningFunc); err != nil { if err := p.Submit(longRunningFunc); err != nil {
@ -496,11 +486,10 @@ func TestMaxBlockingSubmit(t *testing.T) {
func TestNonblockingSubmitWithFunc(t *testing.T) { func TestNonblockingSubmitWithFunc(t *testing.T) {
poolSize := 10 poolSize := 10
p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc) p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc, ants.WithNonblocking(true))
if err != nil { if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error()) t.Fatalf("create TimingPool failed: %s", err.Error())
} }
p.Nonblocking = true
defer p.Release() defer p.Release()
for i := 0; i < poolSize-1; i++ { for i := 0; i < poolSize-1; i++ {
if err := p.Invoke(nil); err != nil { if err := p.Invoke(nil); err != nil {
@ -525,11 +514,10 @@ func TestNonblockingSubmitWithFunc(t *testing.T) {
func TestMaxBlockingSubmitWithFunc(t *testing.T) { func TestMaxBlockingSubmitWithFunc(t *testing.T) {
poolSize := 10 poolSize := 10
p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc) p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc, ants.WithMaxBlockingTasks(1))
if err != nil { if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error()) t.Fatalf("create TimingPool failed: %s", err.Error())
} }
p.MaxBlockingTasks = 1
defer p.Release() defer p.Release()
for i := 0; i < poolSize-1; i++ { for i := 0; i < poolSize-1; i++ {
if err := p.Invoke(Param); err != nil { if err := p.Invoke(Param); err != nil {
@ -566,15 +554,22 @@ func TestMaxBlockingSubmitWithFunc(t *testing.T) {
} }
} }
func TestRestCodeCoverage(t *testing.T) { func TestRestCodeCoverage(t *testing.T) {
_, err := ants.NewUltimatePool(-1, -1, false) _, err := ants.NewPool(-1, ants.WithExpiryDuration(-1))
t.Log(err) t.Log(err)
_, err = ants.NewUltimatePool(1, -1, false) _, err = ants.NewPool(1, ants.WithExpiryDuration(-1))
t.Log(err) t.Log(err)
_, err = ants.NewUltimatePoolWithFunc(-1, -1, demoPoolFunc, false) _, err = ants.NewPoolWithFunc(-1, demoPoolFunc, ants.WithExpiryDuration(-1))
t.Log(err) t.Log(err)
_, err = ants.NewUltimatePoolWithFunc(1, -1, demoPoolFunc, false) _, err = ants.NewPoolWithFunc(1, demoPoolFunc, ants.WithExpiryDuration(-1))
t.Log(err) t.Log(err)
options := ants.Options{}
options.ExpiryDuration = time.Duration(10) * time.Second
options.Nonblocking = true
options.PreAlloc = true
poolOpts, _ := ants.NewPool(1, ants.WithOptions(options))
t.Logf("Pool with options, capacity: %d", poolOpts.Cap())
p0, _ := ants.NewPool(TestSize) p0, _ := ants.NewPool(TestSize)
defer func() { defer func() {
_ = p0.Submit(demoFunc) _ = p0.Submit(demoFunc)
@ -590,7 +585,7 @@ func TestRestCodeCoverage(t *testing.T) {
p0.Tune(TestSize / 10) p0.Tune(TestSize / 10)
t.Logf("pool, after tuning capacity, capacity:%d, running:%d", p0.Cap(), p0.Running()) t.Logf("pool, after tuning capacity, capacity:%d, running:%d", p0.Cap(), p0.Running())
pprem, _ := ants.NewPoolPreMalloc(TestSize) pprem, _ := ants.NewPool(TestSize, ants.WithPreAlloc(true))
defer func() { defer func() {
_ = pprem.Submit(demoFunc) _ = pprem.Submit(demoFunc)
}() }()
@ -621,7 +616,7 @@ func TestRestCodeCoverage(t *testing.T) {
p.Tune(TestSize / 10) p.Tune(TestSize / 10)
t.Logf("pool with func, after tuning capacity, capacity:%d, running:%d", p.Cap(), p.Running()) t.Logf("pool with func, after tuning capacity, capacity:%d, running:%d", p.Cap(), p.Running())
ppremWithFunc, _ := ants.NewPoolWithFuncPreMalloc(TestSize, demoPoolFunc) ppremWithFunc, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc, ants.WithPreAlloc(true))
defer func() { defer func() {
_ = ppremWithFunc.Invoke(Param) _ = ppremWithFunc.Invoke(Param)
}() }()

View File

@ -63,7 +63,7 @@ func main() {
fmt.Printf("running goroutines: %d\n", ants.Running()) fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n") fmt.Printf("finish all tasks.\n")
// Use the pool with a function, // Use the pool with a method,
// set 10 to the capacity of goroutine pool and 1 second for expired duration. // set 10 to the capacity of goroutine pool and 1 second for expired duration.
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) { p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i) myFunc(i)

1
go.mod
View File

@ -1 +0,0 @@
module github.com/panjf2000/ants

78
pool.go
View File

@ -40,7 +40,7 @@ type Pool struct {
expiryDuration time.Duration expiryDuration time.Duration
// workers is a slice that store the available workers. // workers is a slice that store the available workers.
workers []*Worker workers []*goWorker
// release is used to notice the pool to closed itself. // release is used to notice the pool to closed itself.
release int32 release int32
@ -57,22 +57,22 @@ type Pool struct {
// workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker. // workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker.
workerCache sync.Pool workerCache sync.Pool
// PanicHandler is used to handle panics from each worker goroutine. // panicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines. // if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{}) panicHandler func(interface{})
// Max number of goroutine blocking on pool.Submit. // Max number of goroutine blocking on pool.Submit.
// 0 (default value) means no such limit. // 0 (default value) means no such limit.
MaxBlockingTasks int32 maxBlockingTasks int32
// goroutine already been blocked on pool.Submit // goroutine already been blocked on pool.Submit
// protected by pool.lock // protected by pool.lock
blockingNum int32 blockingNum int32
// When Nonblocking is true, Pool.Submit will never be blocked. // When nonblocking is true, Pool.Submit will never be blocked.
// ErrPoolOverload will be returned when Pool.Submit cannot be done at once. // ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
// When Nonblocking is true, MaxBlockingTasks is inoperative. // When nonblocking is true, MaxBlockingTasks is inoperative.
Nonblocking bool nonblocking bool
} }
// Clear expired workers periodically. // Clear expired workers periodically.
@ -80,7 +80,7 @@ func (p *Pool) periodicallyPurge() {
heartbeat := time.NewTicker(p.expiryDuration) heartbeat := time.NewTicker(p.expiryDuration)
defer heartbeat.Stop() defer heartbeat.Stop()
var expiredWorkers []*Worker var expiredWorkers []*goWorker
for range heartbeat.C { for range heartbeat.C {
if atomic.LoadInt32(&p.release) == CLOSED { if atomic.LoadInt32(&p.release) == CLOSED {
break break
@ -122,38 +122,46 @@ func (p *Pool) periodicallyPurge() {
} }
// NewPool generates an instance of ants pool. // NewPool generates an instance of ants pool.
func NewPool(size int) (*Pool, error) { func NewPool(size int, options ...Option) (*Pool, error) {
return NewUltimatePool(size, DEFAULT_CLEAN_INTERVAL_TIME, false)
}
// NewPoolPreMalloc generates an instance of ants pool with the memory pre-allocation of pool size.
func NewPoolPreMalloc(size int) (*Pool, error) {
return NewUltimatePool(size, DEFAULT_CLEAN_INTERVAL_TIME, true)
}
// NewUltimatePool generates an instance of ants pool with a custom timed task.
func NewUltimatePool(size, expiry int, preAlloc bool) (*Pool, error) {
if size <= 0 { if size <= 0 {
return nil, ErrInvalidPoolSize return nil, ErrInvalidPoolSize
} }
if expiry <= 0 {
return nil, ErrInvalidPoolExpiry opts := new(Options)
for _, option := range options {
option(opts)
} }
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = time.Duration(DEFAULT_CLEAN_INTERVAL_TIME) * time.Second
}
var p *Pool var p *Pool
if preAlloc { if opts.PreAlloc {
p = &Pool{ p = &Pool{
capacity: int32(size), capacity: int32(size),
expiryDuration: time.Duration(expiry) * time.Second, expiryDuration: opts.ExpiryDuration,
workers: make([]*Worker, 0, size), workers: make([]*goWorker, 0, size),
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
} }
} else { } else {
p = &Pool{ p = &Pool{
capacity: int32(size), capacity: int32(size),
expiryDuration: time.Duration(expiry) * time.Second, expiryDuration: opts.ExpiryDuration,
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
} }
} }
p.cond = sync.NewCond(&p.lock) p.cond = sync.NewCond(&p.lock)
// Start a goroutine to clean up expired workers periodically.
go p.periodicallyPurge() go p.periodicallyPurge()
return p, nil return p, nil
} }
@ -188,12 +196,12 @@ func (p *Pool) Cap() int {
} }
// Tune changes the capacity of this pool. // Tune changes the capacity of this pool.
func (p *Pool) Tune(size int) { func (p *Pool) Tune(size uint) {
if p.Cap() == size { if p.Cap() == int(size) {
return return
} }
atomic.StoreInt32(&p.capacity, int32(size)) atomic.StoreInt32(&p.capacity, int32(size))
diff := p.Running() - size diff := p.Running() - int(size)
for i := 0; i < diff; i++ { for i := 0; i < diff; i++ {
p.retrieveWorker().task <- nil p.retrieveWorker().task <- nil
} }
@ -227,13 +235,13 @@ func (p *Pool) decRunning() {
} }
// retrieveWorker returns a available worker to run the tasks. // retrieveWorker returns a available worker to run the tasks.
func (p *Pool) retrieveWorker() *Worker { func (p *Pool) retrieveWorker() *goWorker {
var w *Worker var w *goWorker
spawnWorker := func() { spawnWorker := func() {
if cacheWorker := p.workerCache.Get(); cacheWorker != nil { if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
w = cacheWorker.(*Worker) w = cacheWorker.(*goWorker)
} else { } else {
w = &Worker{ w = &goWorker{
pool: p, pool: p,
task: make(chan func(), workerChanCap), task: make(chan func(), workerChanCap),
} }
@ -253,12 +261,12 @@ func (p *Pool) retrieveWorker() *Worker {
p.lock.Unlock() p.lock.Unlock()
spawnWorker() spawnWorker()
} else { } else {
if p.Nonblocking { if p.nonblocking {
p.lock.Unlock() p.lock.Unlock()
return nil return nil
} }
Reentry: Reentry:
if p.MaxBlockingTasks != 0 && p.blockingNum >= p.MaxBlockingTasks { if p.maxBlockingTasks != 0 && p.blockingNum >= p.maxBlockingTasks {
p.lock.Unlock() p.lock.Unlock()
return nil return nil
} }
@ -283,7 +291,7 @@ func (p *Pool) retrieveWorker() *Worker {
} }
// revertWorker puts a worker back into free pool, recycling the goroutines. // revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) revertWorker(worker *Worker) bool { func (p *Pool) revertWorker(worker *goWorker) bool {
if atomic.LoadInt32(&p.release) == CLOSED { if atomic.LoadInt32(&p.release) == CLOSED {
return false return false
} }

View File

@ -40,7 +40,7 @@ type PoolWithFunc struct {
expiryDuration time.Duration expiryDuration time.Duration
// workers is a slice that store the available workers. // workers is a slice that store the available workers.
workers []*WorkerWithFunc workers []*goWorkerWithFunc
// release is used to notice the pool to closed itself. // release is used to notice the pool to closed itself.
release int32 release int32
@ -60,22 +60,22 @@ type PoolWithFunc struct {
// workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker. // workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker.
workerCache sync.Pool workerCache sync.Pool
// PanicHandler is used to handle panics from each worker goroutine. // panicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines. // if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{}) panicHandler func(interface{})
// Max number of goroutine blocking on pool.Submit. // Max number of goroutine blocking on pool.Submit.
// 0 (default value) means no such limit. // 0 (default value) means no such limit.
MaxBlockingTasks int32 maxBlockingTasks int32
// goroutine already been blocked on pool.Submit // goroutine already been blocked on pool.Submit
// protected by pool.lock // protected by pool.lock
blockingNum int32 blockingNum int32
// When Nonblocking is true, Pool.Submit will never be blocked. // When nonblocking is true, Pool.Submit will never be blocked.
// ErrPoolOverload will be returned when Pool.Submit cannot be done at once. // ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
// When Nonblocking is true, MaxBlockingTasks is inoperative. // When nonblocking is true, MaxBlockingTasks is inoperative.
Nonblocking bool nonblocking bool
} }
// Clear expired workers periodically. // Clear expired workers periodically.
@ -83,7 +83,7 @@ func (p *PoolWithFunc) periodicallyPurge() {
heartbeat := time.NewTicker(p.expiryDuration) heartbeat := time.NewTicker(p.expiryDuration)
defer heartbeat.Stop() defer heartbeat.Stop()
var expiredWorkers []*WorkerWithFunc var expiredWorkers []*goWorkerWithFunc
for range heartbeat.C { for range heartbeat.C {
if atomic.LoadInt32(&p.release) == CLOSED { if atomic.LoadInt32(&p.release) == CLOSED {
break break
@ -125,40 +125,52 @@ func (p *PoolWithFunc) periodicallyPurge() {
} }
// NewPoolWithFunc generates an instance of ants pool with a specific function. // NewPoolWithFunc generates an instance of ants pool with a specific function.
func NewPoolWithFunc(size int, pf func(interface{})) (*PoolWithFunc, error) { func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
return NewUltimatePoolWithFunc(size, DEFAULT_CLEAN_INTERVAL_TIME, pf, false)
}
// NewPoolWithFuncPreMalloc generates an instance of ants pool with a specific function and the memory pre-allocation of pool size.
func NewPoolWithFuncPreMalloc(size int, pf func(interface{})) (*PoolWithFunc, error) {
return NewUltimatePoolWithFunc(size, DEFAULT_CLEAN_INTERVAL_TIME, pf, true)
}
// NewUltimatePoolWithFunc generates an instance of ants pool with a specific function and a custom timed task.
func NewUltimatePoolWithFunc(size, expiry int, pf func(interface{}), preAlloc bool) (*PoolWithFunc, error) {
if size <= 0 { if size <= 0 {
return nil, ErrInvalidPoolSize return nil, ErrInvalidPoolSize
} }
if expiry <= 0 {
return nil, ErrInvalidPoolExpiry if pf == nil {
return nil, ErrLackPoolFunc
} }
opts := new(Options)
for _, option := range options {
option(opts)
}
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = time.Duration(DEFAULT_CLEAN_INTERVAL_TIME) * time.Second
}
var p *PoolWithFunc var p *PoolWithFunc
if preAlloc { if opts.PreAlloc {
p = &PoolWithFunc{ p = &PoolWithFunc{
capacity: int32(size), capacity: int32(size),
expiryDuration: time.Duration(expiry) * time.Second, expiryDuration: opts.ExpiryDuration,
poolFunc: pf, poolFunc: pf,
workers: make([]*WorkerWithFunc, 0, size), workers: make([]*goWorkerWithFunc, 0, size),
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
} }
} else { } else {
p = &PoolWithFunc{ p = &PoolWithFunc{
capacity: int32(size), capacity: int32(size),
expiryDuration: time.Duration(expiry) * time.Second, expiryDuration: opts.ExpiryDuration,
poolFunc: pf, poolFunc: pf,
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
} }
} }
p.cond = sync.NewCond(&p.lock) p.cond = sync.NewCond(&p.lock)
// Start a goroutine to clean up expired workers periodically.
go p.periodicallyPurge() go p.periodicallyPurge()
return p, nil return p, nil
} }
@ -232,13 +244,13 @@ func (p *PoolWithFunc) decRunning() {
} }
// retrieveWorker returns a available worker to run the tasks. // retrieveWorker returns a available worker to run the tasks.
func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc { func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc {
var w *WorkerWithFunc var w *goWorkerWithFunc
spawnWorker := func() { spawnWorker := func() {
if cacheWorker := p.workerCache.Get(); cacheWorker != nil { if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
w = cacheWorker.(*WorkerWithFunc) w = cacheWorker.(*goWorkerWithFunc)
} else { } else {
w = &WorkerWithFunc{ w = &goWorkerWithFunc{
pool: p, pool: p,
args: make(chan interface{}, workerChanCap), args: make(chan interface{}, workerChanCap),
} }
@ -258,12 +270,12 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc {
p.lock.Unlock() p.lock.Unlock()
spawnWorker() spawnWorker()
} else { } else {
if p.Nonblocking { if p.nonblocking {
p.lock.Unlock() p.lock.Unlock()
return nil return nil
} }
Reentry: Reentry:
if p.MaxBlockingTasks != 0 && p.blockingNum >= p.MaxBlockingTasks { if p.maxBlockingTasks != 0 && p.blockingNum >= p.maxBlockingTasks {
p.lock.Unlock() p.lock.Unlock()
return nil return nil
} }
@ -288,7 +300,7 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc {
} }
// revertWorker puts a worker back into free pool, recycling the goroutines. // revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *PoolWithFunc) revertWorker(worker *WorkerWithFunc) bool { func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool {
if atomic.LoadInt32(&p.release) == CLOSED { if atomic.LoadInt32(&p.release) == CLOSED {
return false return false
} }

View File

@ -28,10 +28,10 @@ import (
"time" "time"
) )
// Worker is the actual executor who runs the tasks, // goWorker is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and // it starts a goroutine that accepts tasks and
// performs function calls. // performs function calls.
type Worker struct { type goWorker struct {
// pool who owns this worker. // pool who owns this worker.
pool *Pool pool *Pool
@ -44,15 +44,15 @@ type Worker struct {
// run starts a goroutine to repeat the process // run starts a goroutine to repeat the process
// that performs the function calls. // that performs the function calls.
func (w *Worker) run() { func (w *goWorker) run() {
w.pool.incRunning() w.pool.incRunning()
go func() { go func() {
defer func() { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
w.pool.decRunning() w.pool.decRunning()
w.pool.workerCache.Put(w) w.pool.workerCache.Put(w)
if w.pool.PanicHandler != nil { if w.pool.panicHandler != nil {
w.pool.PanicHandler(p) w.pool.panicHandler(p)
} else { } else {
log.Printf("worker exits from a panic: %v\n", p) log.Printf("worker exits from a panic: %v\n", p)
var buf [4096]byte var buf [4096]byte

View File

@ -28,10 +28,10 @@ import (
"time" "time"
) )
// WorkerWithFunc is the actual executor who runs the tasks, // goWorkerWithFunc is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and // it starts a goroutine that accepts tasks and
// performs function calls. // performs function calls.
type WorkerWithFunc struct { type goWorkerWithFunc struct {
// pool who owns this worker. // pool who owns this worker.
pool *PoolWithFunc pool *PoolWithFunc
@ -44,15 +44,15 @@ type WorkerWithFunc struct {
// run starts a goroutine to repeat the process // run starts a goroutine to repeat the process
// that performs the function calls. // that performs the function calls.
func (w *WorkerWithFunc) run() { func (w *goWorkerWithFunc) run() {
w.pool.incRunning() w.pool.incRunning()
go func() { go func() {
defer func() { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
w.pool.decRunning() w.pool.decRunning()
w.pool.workerCache.Put(w) w.pool.workerCache.Put(w)
if w.pool.PanicHandler != nil { if w.pool.panicHandler != nil {
w.pool.PanicHandler(p) w.pool.panicHandler(p)
} else { } else {
log.Printf("worker with func exits from a panic: %v\n", p) log.Printf("worker with func exits from a panic: %v\n", p)
var buf [4096]byte var buf [4096]byte