From 201ac20358578a34c3131049e3825c14bfb061a2 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Wed, 21 Aug 2019 22:07:19 +0800 Subject: [PATCH] Add functional options pattern for ants --- README.md | 92 +++++++++++++++++++++++++++++-------- README_ZH.md | 115 +++++++++++++++++++++++++++++++++++------------ ants.go | 64 +++++++++++++++++++++++++- ants_test.go | 95 +++++++++++++++++++-------------------- examples/main.go | 2 +- go.mod | 1 - pool.go | 82 ++++++++++++++++++--------------- pool_func.go | 84 +++++++++++++++++++--------------- worker.go | 10 ++--- worker_func.go | 10 ++--- 10 files changed, 371 insertions(+), 184 deletions(-) delete mode 100644 go.mod diff --git a/README.md b/README.md index c379b77..b7e6580 100644 --- a/README.md +++ b/README.md @@ -41,12 +41,6 @@ Library `ants` implements a goroutine pool with fixed capacity, managing and rec go get -u github.com/panjf2000/ants ``` -Or, using glide: - -``` sh -glide get github.com/panjf2000/ants -``` - ## How to use Just take a imagination that your program starts a massive number of goroutines, from which a vast amount of memory will be consumed. To mitigate that kind of situation, all you need to do is to import `ants` package and submit all your tasks to a default pool with fixed capacity, activated when package `ants` is imported: @@ -88,13 +82,13 @@ func main() { } for i := 0; i < runTimes; i++ { wg.Add(1) - ants.Submit(syncCalculateSum) + _ = ants.Submit(syncCalculateSum) } wg.Wait() fmt.Printf("running goroutines: %d\n", ants.Running()) fmt.Printf("finish all tasks.\n") - // Use the pool with a method, + // Use the pool with a function, // set 10 to the capacity of goroutine pool and 1 second for expired duration. p, _ := ants.NewPoolWithFunc(10, func(i interface{}) { myFunc(i) @@ -104,7 +98,7 @@ func main() { // Submit tasks one by one. for i := 0; i < runTimes; i++ { wg.Add(1) - p.Invoke(int32(i)) + _ = p.Invoke(int32(i)) } wg.Wait() fmt.Printf("running goroutines: %d\n", p.Running()) @@ -129,7 +123,7 @@ type Request struct { } func main() { - pool, _ := ants.NewPoolWithFunc(100, func(payload interface{}) { + pool, _ := ants.NewPoolWithFunc(100000, func(payload interface{}) { request, ok := payload.(*Request) if !ok { return @@ -142,7 +136,7 @@ func main() { }(request.Param) request.Result <- reverseParam - }) + }) defer pool.Release() http.HandleFunc("/reverse", func(w http.ResponseWriter, r *http.Request) { @@ -167,20 +161,82 @@ func main() { } ``` -## Submit tasks -Tasks can be submitted by calling `ants.Submit(func())` +## Functional options for ants pool + ```go -ants.Submit(func(){}) +type Options struct { + // ExpiryDuration set the expired time (second) of every worker. + ExpiryDuration time.Duration + + // PreAlloc indicate whether to make memory pre-allocation when initializing Pool. + PreAlloc bool + + // Max number of goroutine blocking on pool.Submit. + // 0 (default value) means no such limit. + MaxBlockingTasks int + + // When Nonblocking is true, Pool.Submit will never be blocked. + // ErrPoolOverload will be returned when Pool.Submit cannot be done at once. + // When Nonblocking is true, MaxBlockingTasks is inoperative. + Nonblocking bool + + // PanicHandler is used to handle panics from each worker goroutine. + // if nil, panics will be thrown out again from worker goroutines. 
+ PanicHandler func(interface{}) +} + +func WithOptions(options Options) Option { + return func(opts *Options) { + *opts = options + } +} + +func WithExpiryDuration(expiryDuration time.Duration) Option { + return func(opts *Options) { + opts.ExpiryDuration = expiryDuration + } +} + +func WithPreAlloc(preAlloc bool) Option { + return func(opts *Options) { + opts.PreAlloc = preAlloc + } +} + +func WithMaxBlockingTasks(maxBlockingTasks int) Option { + return func(opts *Options) { + opts.MaxBlockingTasks = maxBlockingTasks + } +} + +func WithNonblocking(nonblocking bool) Option { + return func(opts *Options) { + opts.Nonblocking = nonblocking + } +} + +func WithPanicHandler(panicHandler func(interface{})) Option { + return func(opts *Options) { + opts.PanicHandler = panicHandler + } +} ``` +`ants.Options`contains all optional configurations of ants pool, which allow you to customize the goroutine pool by invoking option functions to set up each configuration in `NewPool`/`NewPoolWithFunc`method. + ## Customize limited pool + `ants` also supports customizing the capacity of pool. You can invoke the `NewPool` method to instantiate a pool with a given capacity, as following: ``` go // Set 10000 the size of goroutine pool p, _ := ants.NewPool(10000) -// Submit a task -p.Submit(func(){}) +``` + +## Submit tasks +Tasks can be submitted by calling `ants.Submit(func())` +```go +ants.Submit(func(){}) ``` ## Tune pool capacity in runtime @@ -199,7 +255,7 @@ Don't worry about the synchronous problems in this case, the method here is thre ```go // ants will pre-malloc the whole capacity of pool when you invoke this method -p, _ := ants.NewPoolPreMalloc(AntsSize) +p, _ := ants.NewPool(100000, ants.WithPreAlloc(true)) ``` ## Release Pool @@ -208,8 +264,6 @@ p, _ := ants.NewPoolPreMalloc(AntsSize) pool.Release() ``` - - ## About sequence All tasks submitted to `ants` pool will not be guaranteed to be addressed in order, because those tasks scatter among a series of concurrent workers, thus those tasks would be executed concurrently. diff --git a/README_ZH.md b/README_ZH.md index 1c9b9fb..ac1911e 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -16,15 +16,15 @@ A goroutine pool for Go [英文](README.md) | [项目博客](https://taohuawu.club/high-performance-implementation-of-goroutine-pool) -`ants`是一个高性能的协程池,实现了对大规模goroutine的调度管理、goroutine复用,允许使用者在开发并发程序的时候限制协程数量,复用资源,达到更高效执行任务的效果。 +`ants`是一个高性能的协程池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制协程数量,复用资源,达到更高效执行任务的效果。 ## 功能: -- 实现了自动调度并发的goroutine,复用goroutine -- 定时清理过期的goroutine,进一步节省资源 +- 实现了自动调度并发的 goroutine,复用 goroutine +- 定时清理过期的 goroutine,进一步节省资源 - 提供了友好的接口:任务提交、获取运行中的协程数量、动态调整协程池大小 -- 优雅处理panic,防止程序崩溃 -- 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生goroutine并发具有更高的性能 +- 优雅处理 panic,防止程序崩溃 +- 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能 ## 目前测试通过的Golang版本: @@ -41,12 +41,6 @@ A goroutine pool for Go go get -u github.com/panjf2000/ants ``` -使用包管理工具 glide 安装: - -``` sh -glide get github.com/panjf2000/ants -``` - ## 使用 写 go 并发程序的时候如果程序会启动大量的 goroutine ,势必会消耗大量的系统资源(内存,CPU),通过使用 `ants`,可以实例化一个协程池,复用 goroutine ,节省资源,提升性能: @@ -88,7 +82,7 @@ func main() { } for i := 0; i < runTimes; i++ { wg.Add(1) - ants.Submit(syncCalculateSum) + _ = ants.Submit(syncCalculateSum) } wg.Wait() fmt.Printf("running goroutines: %d\n", ants.Running()) @@ -104,7 +98,7 @@ func main() { // Submit tasks one by one. 
for i := 0; i < runTimes; i++ { wg.Add(1) - p.Invoke(int32(i)) + _ = p.Invoke(int32(i)) } wg.Wait() fmt.Printf("running goroutines: %d\n", p.Running()) @@ -112,7 +106,7 @@ func main() { } ``` -## 与http server集成 +## 与 http server 集成 ```go package main @@ -129,7 +123,7 @@ type Request struct { } func main() { - pool, _ := ants.NewPoolWithFunc(100, func(payload interface{}) { + pool, _ := ants.NewPoolWithFunc(100000, func(payload interface{}) { request, ok := payload.(*Request) if !ok { return @@ -142,7 +136,7 @@ func main() { }(request.Param) request.Result <- reverseParam - }) + }) defer pool.Release() http.HandleFunc("/reverse", func(w http.ResponseWriter, r *http.Request) { @@ -167,20 +161,83 @@ func main() { } ``` -## 任务提交 -提交任务通过调用 `ants.Submit(func())`方法: +## Pool 配置 + ```go -ants.Submit(func(){}) +type Options struct { + // ExpiryDuration set the expired time (second) of every worker. + ExpiryDuration time.Duration + + // PreAlloc indicate whether to make memory pre-allocation when initializing Pool. + PreAlloc bool + + // Max number of goroutine blocking on pool.Submit. + // 0 (default value) means no such limit. + MaxBlockingTasks int + + // When Nonblocking is true, Pool.Submit will never be blocked. + // ErrPoolOverload will be returned when Pool.Submit cannot be done at once. + // When Nonblocking is true, MaxBlockingTasks is inoperative. + Nonblocking bool + + // PanicHandler is used to handle panics from each worker goroutine. + // if nil, panics will be thrown out again from worker goroutines. + PanicHandler func(interface{}) +} + +func WithOptions(options Options) Option { + return func(opts *Options) { + *opts = options + } +} + +func WithExpiryDuration(expiryDuration time.Duration) Option { + return func(opts *Options) { + opts.ExpiryDuration = expiryDuration + } +} + +func WithPreAlloc(preAlloc bool) Option { + return func(opts *Options) { + opts.PreAlloc = preAlloc + } +} + +func WithMaxBlockingTasks(maxBlockingTasks int) Option { + return func(opts *Options) { + opts.MaxBlockingTasks = maxBlockingTasks + } +} + +func WithNonblocking(nonblocking bool) Option { + return func(opts *Options) { + opts.Nonblocking = nonblocking + } +} + +func WithPanicHandler(panicHandler func(interface{})) Option { + return func(opts *Options) { + opts.PanicHandler = panicHandler + } +} ``` +通过在调用`NewPool`/`NewPoolWithFunc`之时使用各种 optional function,可以设置`ants.Options`中各个配置项的值,然后用它来定制化 goroutine pool. + + ## 自定义池 `ants`支持实例化使用者自己的一个 Pool ,指定具体的池容量;通过调用 `NewPool` 方法可以实例化一个新的带有指定容量的 Pool ,如下: ``` go // Set 10000 the size of goroutine pool p, _ := ants.NewPool(10000) -// Submit a task -p.Submit(func(){}) +``` + +## 任务提交 + +提交任务通过调用 `ants.Submit(func())`方法: +```go +ants.Submit(func(){}) ``` ## 动态调整协程池容量 @@ -193,13 +250,13 @@ pool.Tune(100000) // Tune its capacity to 100000 该方法是线程安全的。 -## 预先分配goroutine队列内存 +## 预先分配 goroutine 队列内存 -`ants`允许你预先把整个池的容量分配内存, 这个功能可以在某些特定的场景下提高协程池的性能。比如, 有一个场景需要一个超大容量的池,而且每个goroutine里面的任务都是耗时任务,这种情况下,预先分配goroutine队列内存将会减少re-slice时的复制内存损耗。 +`ants`允许你预先把整个池的容量分配内存, 这个功能可以在某些特定的场景下提高协程池的性能。比如, 有一个场景需要一个超大容量的池,而且每个 goroutine 里面的任务都是耗时任务,这种情况下,预先分配 goroutine 队列内存将会减少 re-slice 时的复制内存损耗。 ```go // ants will pre-malloc the whole capacity of pool when you invoke this function -p, _ := ants.NewPoolPreMalloc(AntsSize) +p, _ := ants.NewPool(100000, ants.WithPreAlloc(true)) ``` @@ -225,9 +282,9 @@ Go Version: 1.9
-上图中的前两个 benchmark 测试结果是基于100w任务量的条件,剩下的几个是基于1000w任务量的测试结果,`ants`的默认池容量是5w。 +上图中的前两个 benchmark 测试结果是基于100w 任务量的条件,剩下的几个是基于 1000w 任务量的测试结果,`ants`的默认池容量是 5w。 -- BenchmarkGoroutine-4 代表原生goroutine +- BenchmarkGoroutine-4 代表原生 goroutine - BenchmarkPoolGroutine-4 代表使用协程池`ants` @@ -235,13 +292,13 @@ Go Version: 1.9 ![](https://user-images.githubusercontent.com/7496278/51515499-f187c500-1e4e-11e9-80e5-3df8f94fa70f.png) -**这里为了模拟大规模goroutine的场景,两次测试的并发次数分别是100w和1000w,前两个测试分别是执行100w个并发任务不使用Pool和使用了`ants`的Goroutine Pool的性能,后两个则是1000w个任务下的表现,可以直观的看出在执行速度和内存使用上,`ants`的Pool都占有明显的优势。100w的任务量,使用`ants`,执行速度与原生goroutine相当甚至略快,但只实际使用了不到5w个goroutine完成了全部任务,且内存消耗仅为原生并发的40%;而当任务量达到1000w,优势则更加明显了:用了70w左右的goroutine完成全部任务,执行速度比原生goroutine提高了100%,且内存消耗依旧保持在不使用Pool的40%左右。** +**这里为了模拟大规模 goroutine 的场景,两次测试的并发次数分别是 100w 和 1000w,前两个测试分别是执行 100w 个并发任务不使用 Pool 和使用了`ants`的 Goroutine Pool 的性能,后两个则是 1000w 个任务下的表现,可以直观的看出在执行速度和内存使用上,`ants`的 Pool 都占有明显的优势。100w 的任务量,使用`ants`,执行速度与原生 goroutine 相当甚至略快,但只实际使用了不到 5w 个 goroutine 完成了全部任务,且内存消耗仅为原生并发的 40%;而当任务量达到 1000w,优势则更加明显了:用了 70w 左右的 goroutine 完成全部任务,执行速度比原生 goroutine 提高了 100%,且内存消耗依旧保持在不使用 Pool 的 40% 左右。** ### Benchmarks with PoolWithFunc ![](https://user-images.githubusercontent.com/7496278/51515565-1e3bdc80-1e4f-11e9-8a08-452ab91d117e.png) -**因为`PoolWithFunc`这个Pool只绑定一个任务函数,也即所有任务都是运行同一个函数,所以相较于`Pool`对原生goroutine在执行速度和内存消耗的优势更大,上面的结果可以看出,执行速度可以达到原生goroutine的300%,而内存消耗的优势已经达到了两位数的差距,原生goroutine的内存消耗达到了`ants`的35倍且原生goroutine的每次执行的内存分配次数也达到了`ants`45倍,1000w的任务量,`ants`的初始分配容量是5w,因此它完成了所有的任务依旧只使用了5w个goroutine!事实上,`ants`的Goroutine Pool的容量是可以自定义的,也就是说使用者可以根据不同场景对这个参数进行调优直至达到最高性能。** +**因为`PoolWithFunc`这个 Pool 只绑定一个任务函数,也即所有任务都是运行同一个函数,所以相较于`Pool`对原生 goroutine 在执行速度和内存消耗的优势更大,上面的结果可以看出,执行速度可以达到原生 goroutine 的 300%,而内存消耗的优势已经达到了两位数的差距,原生 goroutine 的内存消耗达到了`ants`的35倍且原生 goroutine 的每次执行的内存分配次数也达到了`ants`45倍,1000w 的任务量,`ants`的初始分配容量是 5w,因此它完成了所有的任务依旧只使用了 5w 个 goroutine!事实上,`ants`的 Goroutine Pool 的容量是可以自定义的,也就是说使用者可以根据不同场景对这个参数进行调优直至达到最高性能。** ### 吞吐量测试(适用于那种只管提交异步任务而无须关心结果的场景) @@ -261,4 +318,4 @@ Go Version: 1.9 ![](https://user-images.githubusercontent.com/7496278/52989641-51b65a80-343f-11e9-86c0-e855d97343ea.gif) -**从该demo测试吞吐性能对比可以看出,使用`ants`的吞吐性能相较于原生goroutine可以保持在2-6倍的性能压制,而内存消耗则可以达到10-20倍的节省优势。** +**从该 demo 测试吞吐性能对比可以看出,使用`ants`的吞吐性能相较于原生 goroutine 可以保持在 2-6 倍的性能压制,而内存消耗则可以达到 10-20 倍的节省优势。** \ No newline at end of file diff --git a/ants.go b/ants.go index b47b906..8c4c8f6 100644 --- a/ants.go +++ b/ants.go @@ -26,6 +26,7 @@ import ( "errors" "math" "runtime" + "time" ) const ( @@ -46,6 +47,9 @@ var ( // ErrInvalidPoolSize will be returned when setting a negative number as pool capacity. ErrInvalidPoolSize = errors.New("invalid size for pool") + // ErrLackPoolFunc will be returned when invokers don't provide function for pool. + ErrLackPoolFunc = errors.New("must provide function for pool") + // ErrInvalidPoolExpiry will be returned when setting a negative number as the periodic duration to purge goroutines. ErrInvalidPoolExpiry = errors.New("invalid expiry for pool") @@ -72,10 +76,68 @@ var ( return 1 }() + // Init a instance pool when importing ants. defaultAntsPool, _ = NewPool(DEFAULT_ANTS_POOL_SIZE) ) -// Init a instance pool when importing ants. +type Option func(opts *Options) + +type Options struct { + // ExpiryDuration set the expired time (second) of every worker. + ExpiryDuration time.Duration + + // PreAlloc indicate whether to make memory pre-allocation when initializing Pool. + PreAlloc bool + + // Max number of goroutine blocking on pool.Submit. 
+ // 0 (default value) means no such limit. + MaxBlockingTasks int + + // When Nonblocking is true, Pool.Submit will never be blocked. + // ErrPoolOverload will be returned when Pool.Submit cannot be done at once. + // When Nonblocking is true, MaxBlockingTasks is inoperative. + Nonblocking bool + + // PanicHandler is used to handle panics from each worker goroutine. + // if nil, panics will be thrown out again from worker goroutines. + PanicHandler func(interface{}) +} + +func WithOptions(options Options) Option { + return func(opts *Options) { + *opts = options + } +} + +func WithExpiryDuration(expiryDuration time.Duration) Option { + return func(opts *Options) { + opts.ExpiryDuration = expiryDuration + } +} + +func WithPreAlloc(preAlloc bool) Option { + return func(opts *Options) { + opts.PreAlloc = preAlloc + } +} + +func WithMaxBlockingTasks(maxBlockingTasks int) Option { + return func(opts *Options) { + opts.MaxBlockingTasks = maxBlockingTasks + } +} + +func WithNonblocking(nonblocking bool) Option { + return func(opts *Options) { + opts.Nonblocking = nonblocking + } +} + +func WithPanicHandler(panicHandler func(interface{})) Option { + return func(opts *Options) { + opts.PanicHandler = panicHandler + } +} // Submit submits a task to pool. func Submit(task func()) error { diff --git a/ants_test.go b/ants_test.go index 9394492..b51df3c 100644 --- a/ants_test.go +++ b/ants_test.go @@ -76,7 +76,7 @@ func TestAntsPoolWaitToGetWorker(t *testing.T) { func TestAntsPoolWaitToGetWorkerPreMalloc(t *testing.T) { var wg sync.WaitGroup - p, _ := ants.NewPoolPreMalloc(AntsSize) + p, _ := ants.NewPool(AntsSize, ants.WithPreAlloc(true)) defer p.Release() for i := 0; i < n; i++ { @@ -117,10 +117,10 @@ func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) { func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) { var wg sync.WaitGroup - p, _ := ants.NewPoolWithFuncPreMalloc(AntsSize, func(i interface{}) { + p, _ := ants.NewPoolWithFunc(AntsSize, func(i interface{}) { demoPoolFunc(i) wg.Done() - }) + }, ants.WithPreAlloc(true)) defer p.Release() for i := 0; i < n; i++ { @@ -172,7 +172,7 @@ func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) { func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) { dur := 10 - p, _ := ants.NewPoolWithFuncPreMalloc(TestSize, demoPoolFunc) + p, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc, ants.WithPreAlloc(true)) defer p.Release() for i := 0; i < AntsSize; i++ { @@ -233,18 +233,17 @@ func TestAntsPool(t *testing.T) { //------------------------------------------------------------------------------------------- func TestPanicHandler(t *testing.T) { - p0, err := ants.NewPool(10) + var panicCounter int64 + var wg sync.WaitGroup + p0, err := ants.NewPool(10, ants.WithPanicHandler(func(p interface{}) { + defer wg.Done() + atomic.AddInt64(&panicCounter, 1) + t.Logf("catch panic with PanicHandler: %v", p) + })) if err != nil { t.Fatalf("create new pool failed: %s", err.Error()) } defer p0.Release() - var panicCounter int64 - var wg sync.WaitGroup - p0.PanicHandler = func(p interface{}) { - defer wg.Done() - atomic.AddInt64(&panicCounter, 1) - t.Logf("catch panic with PanicHandler: %v", p) - } wg.Add(1) _ = p0.Submit(func() { panic("Oops!") @@ -258,17 +257,14 @@ func TestPanicHandler(t *testing.T) { t.Errorf("pool should be empty after panic") } - p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { - panic(p) - }) + p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { panic(p) }, ants.WithPanicHandler(func(p interface{}) { + defer 
wg.Done() + atomic.AddInt64(&panicCounter, 1) + })) if err != nil { t.Fatalf("create new pool with func failed: %s", err.Error()) } defer p1.Release() - p1.PanicHandler = func(p interface{}) { - defer wg.Done() - atomic.AddInt64(&panicCounter, 1) - } wg.Add(1) _ = p1.Invoke("Oops!") wg.Wait() @@ -282,18 +278,17 @@ func TestPanicHandler(t *testing.T) { } func TestPanicHandlerPreMalloc(t *testing.T) { - p0, err := ants.NewPoolPreMalloc(10) + var panicCounter int64 + var wg sync.WaitGroup + p0, err := ants.NewPool(10, ants.WithPreAlloc(true), ants.WithPanicHandler(func(p interface{}) { + defer wg.Done() + atomic.AddInt64(&panicCounter, 1) + t.Logf("catch panic with PanicHandler: %v", p) + })) if err != nil { t.Fatalf("create new pool failed: %s", err.Error()) } defer p0.Release() - var panicCounter int64 - var wg sync.WaitGroup - p0.PanicHandler = func(p interface{}) { - defer wg.Done() - atomic.AddInt64(&panicCounter, 1) - t.Logf("catch panic with PanicHandler: %v", p) - } wg.Add(1) _ = p0.Submit(func() { panic("Oops!") @@ -307,17 +302,14 @@ func TestPanicHandlerPreMalloc(t *testing.T) { t.Errorf("pool should be empty after panic") } - p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { - panic(p) - }) + p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { panic(p) }, ants.WithPanicHandler(func(p interface{}) { + defer wg.Done() + atomic.AddInt64(&panicCounter, 1) + })) if err != nil { t.Fatalf("create new pool with func failed: %s", err.Error()) } defer p1.Release() - p1.PanicHandler = func(p interface{}) { - defer wg.Done() - atomic.AddInt64(&panicCounter, 1) - } wg.Add(1) _ = p1.Invoke("Oops!") wg.Wait() @@ -351,7 +343,7 @@ func TestPoolPanicWithoutHandler(t *testing.T) { } func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) { - p0, err := ants.NewPoolPreMalloc(10) + p0, err := ants.NewPool(10, ants.WithPreAlloc(true)) if err != nil { t.Fatalf("create new pool failed: %s", err.Error()) } @@ -394,7 +386,7 @@ func TestPurge(t *testing.T) { } func TestPurgePreMalloc(t *testing.T) { - p, err := ants.NewPoolPreMalloc(10) + p, err := ants.NewPool(10, ants.WithPreAlloc(true)) if err != nil { t.Fatalf("create TimingPool failed: %s", err.Error()) } @@ -418,11 +410,10 @@ func TestPurgePreMalloc(t *testing.T) { func TestNonblockingSubmit(t *testing.T) { poolSize := 10 - p, err := ants.NewPool(poolSize) + p, err := ants.NewPool(poolSize, ants.WithNonblocking(true)) if err != nil { t.Fatalf("create TimingPool failed: %s", err.Error()) } - p.Nonblocking = true defer p.Release() for i := 0; i < poolSize-1; i++ { if err := p.Submit(longRunningFunc); err != nil { @@ -450,11 +441,10 @@ func TestNonblockingSubmit(t *testing.T) { func TestMaxBlockingSubmit(t *testing.T) { poolSize := 10 - p, err := ants.NewPool(poolSize) + p, err := ants.NewPool(poolSize, ants.WithMaxBlockingTasks(1)) if err != nil { t.Fatalf("create TimingPool failed: %s", err.Error()) } - p.MaxBlockingTasks = 1 defer p.Release() for i := 0; i < poolSize-1; i++ { if err := p.Submit(longRunningFunc); err != nil { @@ -496,11 +486,10 @@ func TestMaxBlockingSubmit(t *testing.T) { func TestNonblockingSubmitWithFunc(t *testing.T) { poolSize := 10 - p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc) + p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc, ants.WithNonblocking(true)) if err != nil { t.Fatalf("create TimingPool failed: %s", err.Error()) } - p.Nonblocking = true defer p.Release() for i := 0; i < poolSize-1; i++ { if err := p.Invoke(nil); err != nil { @@ -525,11 +514,10 @@ func 
TestNonblockingSubmitWithFunc(t *testing.T) { func TestMaxBlockingSubmitWithFunc(t *testing.T) { poolSize := 10 - p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc) + p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc, ants.WithMaxBlockingTasks(1)) if err != nil { t.Fatalf("create TimingPool failed: %s", err.Error()) } - p.MaxBlockingTasks = 1 defer p.Release() for i := 0; i < poolSize-1; i++ { if err := p.Invoke(Param); err != nil { @@ -566,15 +554,22 @@ func TestMaxBlockingSubmitWithFunc(t *testing.T) { } } func TestRestCodeCoverage(t *testing.T) { - _, err := ants.NewUltimatePool(-1, -1, false) + _, err := ants.NewPool(-1, ants.WithExpiryDuration(-1)) t.Log(err) - _, err = ants.NewUltimatePool(1, -1, false) + _, err = ants.NewPool(1, ants.WithExpiryDuration(-1)) t.Log(err) - _, err = ants.NewUltimatePoolWithFunc(-1, -1, demoPoolFunc, false) + _, err = ants.NewPoolWithFunc(-1, demoPoolFunc, ants.WithExpiryDuration(-1)) t.Log(err) - _, err = ants.NewUltimatePoolWithFunc(1, -1, demoPoolFunc, false) + _, err = ants.NewPoolWithFunc(1, demoPoolFunc, ants.WithExpiryDuration(-1)) t.Log(err) + options := ants.Options{} + options.ExpiryDuration = time.Duration(10) * time.Second + options.Nonblocking = true + options.PreAlloc = true + poolOpts, _ := ants.NewPool(1, ants.WithOptions(options)) + t.Logf("Pool with options, capacity: %d", poolOpts.Cap()) + p0, _ := ants.NewPool(TestSize) defer func() { _ = p0.Submit(demoFunc) @@ -590,7 +585,7 @@ func TestRestCodeCoverage(t *testing.T) { p0.Tune(TestSize / 10) t.Logf("pool, after tuning capacity, capacity:%d, running:%d", p0.Cap(), p0.Running()) - pprem, _ := ants.NewPoolPreMalloc(TestSize) + pprem, _ := ants.NewPool(TestSize, ants.WithPreAlloc(true)) defer func() { _ = pprem.Submit(demoFunc) }() @@ -621,7 +616,7 @@ func TestRestCodeCoverage(t *testing.T) { p.Tune(TestSize / 10) t.Logf("pool with func, after tuning capacity, capacity:%d, running:%d", p.Cap(), p.Running()) - ppremWithFunc, _ := ants.NewPoolWithFuncPreMalloc(TestSize, demoPoolFunc) + ppremWithFunc, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc, ants.WithPreAlloc(true)) defer func() { _ = ppremWithFunc.Invoke(Param) }() diff --git a/examples/main.go b/examples/main.go index 773abd6..f9c73c8 100644 --- a/examples/main.go +++ b/examples/main.go @@ -63,7 +63,7 @@ func main() { fmt.Printf("running goroutines: %d\n", ants.Running()) fmt.Printf("finish all tasks.\n") - // Use the pool with a function, + // Use the pool with a method, // set 10 to the capacity of goroutine pool and 1 second for expired duration. p, _ := ants.NewPoolWithFunc(10, func(i interface{}) { myFunc(i) diff --git a/go.mod b/go.mod deleted file mode 100644 index 53277c0..0000000 --- a/go.mod +++ /dev/null @@ -1 +0,0 @@ -module github.com/panjf2000/ants diff --git a/pool.go b/pool.go index b51456f..a6fbe11 100644 --- a/pool.go +++ b/pool.go @@ -40,7 +40,7 @@ type Pool struct { expiryDuration time.Duration // workers is a slice that store the available workers. - workers []*Worker + workers []*goWorker // release is used to notice the pool to closed itself. release int32 @@ -57,22 +57,22 @@ type Pool struct { // workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker. workerCache sync.Pool - // PanicHandler is used to handle panics from each worker goroutine. + // panicHandler is used to handle panics from each worker goroutine. // if nil, panics will be thrown out again from worker goroutines. 
- PanicHandler func(interface{}) + panicHandler func(interface{}) // Max number of goroutine blocking on pool.Submit. // 0 (default value) means no such limit. - MaxBlockingTasks int32 + maxBlockingTasks int32 // goroutine already been blocked on pool.Submit // protected by pool.lock blockingNum int32 - // When Nonblocking is true, Pool.Submit will never be blocked. + // When nonblocking is true, Pool.Submit will never be blocked. // ErrPoolOverload will be returned when Pool.Submit cannot be done at once. - // When Nonblocking is true, MaxBlockingTasks is inoperative. - Nonblocking bool + // When nonblocking is true, MaxBlockingTasks is inoperative. + nonblocking bool } // Clear expired workers periodically. @@ -80,7 +80,7 @@ func (p *Pool) periodicallyPurge() { heartbeat := time.NewTicker(p.expiryDuration) defer heartbeat.Stop() - var expiredWorkers []*Worker + var expiredWorkers []*goWorker for range heartbeat.C { if atomic.LoadInt32(&p.release) == CLOSED { break @@ -122,38 +122,46 @@ func (p *Pool) periodicallyPurge() { } // NewPool generates an instance of ants pool. -func NewPool(size int) (*Pool, error) { - return NewUltimatePool(size, DEFAULT_CLEAN_INTERVAL_TIME, false) -} - -// NewPoolPreMalloc generates an instance of ants pool with the memory pre-allocation of pool size. -func NewPoolPreMalloc(size int) (*Pool, error) { - return NewUltimatePool(size, DEFAULT_CLEAN_INTERVAL_TIME, true) -} - -// NewUltimatePool generates an instance of ants pool with a custom timed task. -func NewUltimatePool(size, expiry int, preAlloc bool) (*Pool, error) { +func NewPool(size int, options ...Option) (*Pool, error) { if size <= 0 { return nil, ErrInvalidPoolSize } - if expiry <= 0 { - return nil, ErrInvalidPoolExpiry + + opts := new(Options) + for _, option := range options { + option(opts) } + + if expiry := opts.ExpiryDuration; expiry < 0 { + return nil, ErrInvalidPoolExpiry + } else if expiry == 0 { + opts.ExpiryDuration = time.Duration(DEFAULT_CLEAN_INTERVAL_TIME) * time.Second + } + var p *Pool - if preAlloc { + if opts.PreAlloc { p = &Pool{ - capacity: int32(size), - expiryDuration: time.Duration(expiry) * time.Second, - workers: make([]*Worker, 0, size), + capacity: int32(size), + expiryDuration: opts.ExpiryDuration, + workers: make([]*goWorker, 0, size), + nonblocking: opts.Nonblocking, + maxBlockingTasks: int32(opts.MaxBlockingTasks), + panicHandler: opts.PanicHandler, } } else { p = &Pool{ - capacity: int32(size), - expiryDuration: time.Duration(expiry) * time.Second, + capacity: int32(size), + expiryDuration: opts.ExpiryDuration, + nonblocking: opts.Nonblocking, + maxBlockingTasks: int32(opts.MaxBlockingTasks), + panicHandler: opts.PanicHandler, } } p.cond = sync.NewCond(&p.lock) + + // Start a goroutine to clean up expired workers periodically. go p.periodicallyPurge() + return p, nil } @@ -188,12 +196,12 @@ func (p *Pool) Cap() int { } // Tune changes the capacity of this pool. -func (p *Pool) Tune(size int) { - if p.Cap() == size { +func (p *Pool) Tune(size uint) { + if p.Cap() == int(size) { return } atomic.StoreInt32(&p.capacity, int32(size)) - diff := p.Running() - size + diff := p.Running() - int(size) for i := 0; i < diff; i++ { p.retrieveWorker().task <- nil } @@ -227,13 +235,13 @@ func (p *Pool) decRunning() { } // retrieveWorker returns a available worker to run the tasks. 
-func (p *Pool) retrieveWorker() *Worker { - var w *Worker +func (p *Pool) retrieveWorker() *goWorker { + var w *goWorker spawnWorker := func() { if cacheWorker := p.workerCache.Get(); cacheWorker != nil { - w = cacheWorker.(*Worker) + w = cacheWorker.(*goWorker) } else { - w = &Worker{ + w = &goWorker{ pool: p, task: make(chan func(), workerChanCap), } @@ -253,12 +261,12 @@ func (p *Pool) retrieveWorker() *Worker { p.lock.Unlock() spawnWorker() } else { - if p.Nonblocking { + if p.nonblocking { p.lock.Unlock() return nil } Reentry: - if p.MaxBlockingTasks != 0 && p.blockingNum >= p.MaxBlockingTasks { + if p.maxBlockingTasks != 0 && p.blockingNum >= p.maxBlockingTasks { p.lock.Unlock() return nil } @@ -283,7 +291,7 @@ func (p *Pool) retrieveWorker() *Worker { } // revertWorker puts a worker back into free pool, recycling the goroutines. -func (p *Pool) revertWorker(worker *Worker) bool { +func (p *Pool) revertWorker(worker *goWorker) bool { if atomic.LoadInt32(&p.release) == CLOSED { return false } diff --git a/pool_func.go b/pool_func.go index c772de6..95fce62 100644 --- a/pool_func.go +++ b/pool_func.go @@ -40,7 +40,7 @@ type PoolWithFunc struct { expiryDuration time.Duration // workers is a slice that store the available workers. - workers []*WorkerWithFunc + workers []*goWorkerWithFunc // release is used to notice the pool to closed itself. release int32 @@ -60,22 +60,22 @@ type PoolWithFunc struct { // workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker. workerCache sync.Pool - // PanicHandler is used to handle panics from each worker goroutine. + // panicHandler is used to handle panics from each worker goroutine. // if nil, panics will be thrown out again from worker goroutines. - PanicHandler func(interface{}) + panicHandler func(interface{}) // Max number of goroutine blocking on pool.Submit. // 0 (default value) means no such limit. - MaxBlockingTasks int32 + maxBlockingTasks int32 // goroutine already been blocked on pool.Submit // protected by pool.lock blockingNum int32 - // When Nonblocking is true, Pool.Submit will never be blocked. + // When nonblocking is true, Pool.Submit will never be blocked. // ErrPoolOverload will be returned when Pool.Submit cannot be done at once. - // When Nonblocking is true, MaxBlockingTasks is inoperative. - Nonblocking bool + // When nonblocking is true, MaxBlockingTasks is inoperative. + nonblocking bool } // Clear expired workers periodically. @@ -83,7 +83,7 @@ func (p *PoolWithFunc) periodicallyPurge() { heartbeat := time.NewTicker(p.expiryDuration) defer heartbeat.Stop() - var expiredWorkers []*WorkerWithFunc + var expiredWorkers []*goWorkerWithFunc for range heartbeat.C { if atomic.LoadInt32(&p.release) == CLOSED { break @@ -125,40 +125,52 @@ func (p *PoolWithFunc) periodicallyPurge() { } // NewPoolWithFunc generates an instance of ants pool with a specific function. -func NewPoolWithFunc(size int, pf func(interface{})) (*PoolWithFunc, error) { - return NewUltimatePoolWithFunc(size, DEFAULT_CLEAN_INTERVAL_TIME, pf, false) -} - -// NewPoolWithFuncPreMalloc generates an instance of ants pool with a specific function and the memory pre-allocation of pool size. -func NewPoolWithFuncPreMalloc(size int, pf func(interface{})) (*PoolWithFunc, error) { - return NewUltimatePoolWithFunc(size, DEFAULT_CLEAN_INTERVAL_TIME, pf, true) -} - -// NewUltimatePoolWithFunc generates an instance of ants pool with a specific function and a custom timed task. 
-func NewUltimatePoolWithFunc(size, expiry int, pf func(interface{}), preAlloc bool) (*PoolWithFunc, error) { +func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) { if size <= 0 { return nil, ErrInvalidPoolSize } - if expiry <= 0 { - return nil, ErrInvalidPoolExpiry + + if pf == nil { + return nil, ErrLackPoolFunc } + + opts := new(Options) + for _, option := range options { + option(opts) + } + + if expiry := opts.ExpiryDuration; expiry < 0 { + return nil, ErrInvalidPoolExpiry + } else if expiry == 0 { + opts.ExpiryDuration = time.Duration(DEFAULT_CLEAN_INTERVAL_TIME) * time.Second + } + var p *PoolWithFunc - if preAlloc { + if opts.PreAlloc { p = &PoolWithFunc{ - capacity: int32(size), - expiryDuration: time.Duration(expiry) * time.Second, - poolFunc: pf, - workers: make([]*WorkerWithFunc, 0, size), + capacity: int32(size), + expiryDuration: opts.ExpiryDuration, + poolFunc: pf, + workers: make([]*goWorkerWithFunc, 0, size), + nonblocking: opts.Nonblocking, + maxBlockingTasks: int32(opts.MaxBlockingTasks), + panicHandler: opts.PanicHandler, } } else { p = &PoolWithFunc{ - capacity: int32(size), - expiryDuration: time.Duration(expiry) * time.Second, - poolFunc: pf, + capacity: int32(size), + expiryDuration: opts.ExpiryDuration, + poolFunc: pf, + nonblocking: opts.Nonblocking, + maxBlockingTasks: int32(opts.MaxBlockingTasks), + panicHandler: opts.PanicHandler, } } p.cond = sync.NewCond(&p.lock) + + // Start a goroutine to clean up expired workers periodically. go p.periodicallyPurge() + return p, nil } @@ -232,13 +244,13 @@ func (p *PoolWithFunc) decRunning() { } // retrieveWorker returns a available worker to run the tasks. -func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc { - var w *WorkerWithFunc +func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc { + var w *goWorkerWithFunc spawnWorker := func() { if cacheWorker := p.workerCache.Get(); cacheWorker != nil { - w = cacheWorker.(*WorkerWithFunc) + w = cacheWorker.(*goWorkerWithFunc) } else { - w = &WorkerWithFunc{ + w = &goWorkerWithFunc{ pool: p, args: make(chan interface{}, workerChanCap), } @@ -258,12 +270,12 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc { p.lock.Unlock() spawnWorker() } else { - if p.Nonblocking { + if p.nonblocking { p.lock.Unlock() return nil } Reentry: - if p.MaxBlockingTasks != 0 && p.blockingNum >= p.MaxBlockingTasks { + if p.maxBlockingTasks != 0 && p.blockingNum >= p.maxBlockingTasks { p.lock.Unlock() return nil } @@ -288,7 +300,7 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc { } // revertWorker puts a worker back into free pool, recycling the goroutines. -func (p *PoolWithFunc) revertWorker(worker *WorkerWithFunc) bool { +func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool { if atomic.LoadInt32(&p.release) == CLOSED { return false } diff --git a/worker.go b/worker.go index 5904e39..b79d9b8 100644 --- a/worker.go +++ b/worker.go @@ -28,10 +28,10 @@ import ( "time" ) -// Worker is the actual executor who runs the tasks, +// goWorker is the actual executor who runs the tasks, // it starts a goroutine that accepts tasks and // performs function calls. -type Worker struct { +type goWorker struct { // pool who owns this worker. pool *Pool @@ -44,15 +44,15 @@ type Worker struct { // run starts a goroutine to repeat the process // that performs the function calls. 
-func (w *Worker) run() { +func (w *goWorker) run() { w.pool.incRunning() go func() { defer func() { if p := recover(); p != nil { w.pool.decRunning() w.pool.workerCache.Put(w) - if w.pool.PanicHandler != nil { - w.pool.PanicHandler(p) + if w.pool.panicHandler != nil { + w.pool.panicHandler(p) } else { log.Printf("worker exits from a panic: %v\n", p) var buf [4096]byte diff --git a/worker_func.go b/worker_func.go index 76e96a3..befd966 100644 --- a/worker_func.go +++ b/worker_func.go @@ -28,10 +28,10 @@ import ( "time" ) -// WorkerWithFunc is the actual executor who runs the tasks, +// goWorkerWithFunc is the actual executor who runs the tasks, // it starts a goroutine that accepts tasks and // performs function calls. -type WorkerWithFunc struct { +type goWorkerWithFunc struct { // pool who owns this worker. pool *PoolWithFunc @@ -44,15 +44,15 @@ type WorkerWithFunc struct { // run starts a goroutine to repeat the process // that performs the function calls. -func (w *WorkerWithFunc) run() { +func (w *goWorkerWithFunc) run() { w.pool.incRunning() go func() { defer func() { if p := recover(); p != nil { w.pool.decRunning() w.pool.workerCache.Put(w) - if w.pool.PanicHandler != nil { - w.pool.PanicHandler(p) + if w.pool.panicHandler != nil { + w.pool.panicHandler(p) } else { log.Printf("worker with func exits from a panic: %v\n", p) var buf [4096]byte
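
---

Usage note for reviewers: this patch un-exports `PanicHandler`, `MaxBlockingTasks` and `Nonblocking` on both pool types, so after it lands these settings can only be supplied through the new `Option` functions at construction time. Below is a minimal, hypothetical sketch of how a caller would configure a `Pool` with the API added in this diff; the pool size, expiry duration and panic-handler body are illustrative, not taken from the repository.

```go
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/panjf2000/ants"
)

func main() {
	// Hypothetical configuration: pre-allocate the worker queue, recycle
	// idle workers after 10 seconds, and log panics from tasks instead of
	// letting the worker goroutines re-throw them.
	p, err := ants.NewPool(1000,
		ants.WithPreAlloc(true),
		ants.WithExpiryDuration(10*time.Second),
		ants.WithPanicHandler(func(v interface{}) {
			fmt.Printf("task panicked: %v\n", v)
		}),
	)
	if err != nil {
		panic(err)
	}
	defer p.Release()

	var wg sync.WaitGroup
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		_ = p.Submit(func() {
			defer wg.Done()
			time.Sleep(time.Millisecond) // stand-in for real work
		})
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d, capacity: %d\n", p.Running(), p.Cap())
}
```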
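
A second hedged sketch covers `NewPoolWithFunc` in non-blocking mode, assuming only what this diff adds or documents (`WithNonblocking`, and the `ErrPoolOverload` error referenced in the new option comments when a task cannot be served at once); the payload type and timings are made up for illustration.

```go
package main

import (
	"fmt"
	"time"

	"github.com/panjf2000/ants"
)

func main() {
	// A function-bound pool in non-blocking mode: when all 10 workers are
	// busy, Invoke is expected to fail fast instead of blocking the caller.
	p, err := ants.NewPoolWithFunc(10, func(payload interface{}) {
		time.Sleep(50 * time.Millisecond) // stand-in for real work
		fmt.Println("processed:", payload)
	}, ants.WithNonblocking(true))
	if err != nil {
		panic(err)
	}
	defer p.Release()

	for i := 0; i < 100; i++ {
		if err := p.Invoke(i); err == ants.ErrPoolOverload {
			fmt.Println("pool overloaded, dropping task", i)
		}
	}
	time.Sleep(time.Second) // crude wait so accepted tasks can finish
}
```

Per the comments on `Options`, using `WithMaxBlockingTasks(n)` instead of `WithNonblocking(true)` would allow up to n callers to block in `Submit`/`Invoke`, with further callers rejected; when `Nonblocking` is true, `MaxBlockingTasks` is ignored.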