From 9df33f340cea2bb5c3817e3536395b5a2f660f2d Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Mon, 15 Apr 2024 17:50:28 +0800 Subject: [PATCH 01/13] chore: retire the benchmark data --- README.md | 41 ----------------------------------------- README_ZH.md | 41 ----------------------------------------- 2 files changed, 82 deletions(-) diff --git a/README.md b/README.md index b06a61e..6370b3f 100644 --- a/README.md +++ b/README.md @@ -293,47 +293,6 @@ pool.Reboot() All tasks submitted to `ants` pool will not be guaranteed to be addressed in order, because those tasks scatter among a series of concurrent workers, thus those tasks would be executed concurrently. -## 🧲 Benchmarks - -
- In this benchmark result, the first and second benchmarks performed test cases with 1M tasks, and the rest of the benchmarks performed test cases with 10M tasks, both in unlimited goroutines and `ants` pool, and the capacity of this `ants` goroutine pool was limited to 50K. - -- BenchmarkGoroutine-4 represents the benchmarks with unlimited goroutines in Golang. - -- BenchmarkPoolGroutine-4 represents the benchmarks with an `ants` pool. - -### Benchmarks with Pool - -![](https://user-images.githubusercontent.com/7496278/51515499-f187c500-1e4e-11e9-80e5-3df8f94fa70f.png) - -In the above benchmark result, the first and second benchmarks performed test cases with 1M tasks, and the rest of the benchmarks performed test cases with 10M tasks, both in unlimited goroutines and `ants` pool and the capacity of this `ants` goroutine-pool was limited to 50K. - -**As you can see, `ants` performs 2 times faster than goroutines without a pool (10M tasks) and it only consumes half the memory compared with goroutines without a pool. (both in 1M and 10M tasks)** - -### Benchmarks with PoolWithFunc - -![](https://user-images.githubusercontent.com/7496278/51515565-1e3bdc80-1e4f-11e9-8a08-452ab91d117e.png) - -### Throughput (it is suitable for scenarios where tasks are submitted asynchronously without waiting for the final results) - -#### 100K tasks - -![](https://user-images.githubusercontent.com/7496278/51515590-36abf700-1e4f-11e9-91e4-7bd3dcb5f4a5.png) - -#### 1M tasks - -![](https://user-images.githubusercontent.com/7496278/51515596-44617c80-1e4f-11e9-89e3-01e19d2979a1.png) - -#### 10M tasks - -![](https://user-images.githubusercontent.com/7496278/52987732-537c2000-3437-11e9-86a6-177f00d7a1d6.png) - -## 📊 Performance Summary - -![](https://user-images.githubusercontent.com/7496278/63449727-3ae6d400-c473-11e9-81e3-8b3280d8288a.gif) - -**In conclusion, `ants` performs 2~6 times faster than goroutines without a pool and the memory consumption is reduced by 10 to 20 times.** - ## 👏 Contributors Please read our [Contributing Guidelines](CONTRIBUTING.md) before opening a PR and thank you to all the developers who already made contributions to `ants`! diff --git a/README_ZH.md b/README_ZH.md index cefcd87..419630c 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -294,47 +294,6 @@ pool.Reboot() `ants` 并不保证提交的任务被执行的顺序,执行的顺序也不是和提交的顺序保持一致,因为在 `ants` 是并发地处理所有提交的任务,提交的任务会被分派到正在并发运行的 workers 上去,因此那些任务将会被并发且无序地被执行。 -## 🧲 Benchmarks - -
-上图中的前两个 benchmark 测试结果是基于100w 任务量的条件,剩下的几个是基于 1000w 任务量的测试结果,`ants` 的默认池容量是 5w。 - -- BenchmarkGoroutine-4 代表原生 goroutine - -- BenchmarkPoolGroutine-4 代表使用 goroutine 池 `ants` - -### Benchmarks with Pool - -![](https://user-images.githubusercontent.com/7496278/51515499-f187c500-1e4e-11e9-80e5-3df8f94fa70f.png) - -**这里为了模拟大规模 goroutine 的场景,两次测试的并发次数分别是 100w 和 1000w,前两个测试分别是执行 100w 个并发任务不使用 Pool 和使用了`ants`的 Goroutine Pool 的性能,后两个则是 1000w 个任务下的表现,可以直观的看出在执行速度和内存使用上,`ants`的 Pool 都占有明显的优势。100w 的任务量,使用`ants`,执行速度与原生 goroutine 相当甚至略快,但只实际使用了不到 5w 个 goroutine 完成了全部任务,且内存消耗仅为原生并发的 40%;而当任务量达到 1000w,优势则更加明显了:用了 70w 左右的 goroutine 完成全部任务,执行速度比原生 goroutine 提高了 100%,且内存消耗依旧保持在不使用 Pool 的 40% 左右。** - -### Benchmarks with PoolWithFunc - -![](https://user-images.githubusercontent.com/7496278/51515565-1e3bdc80-1e4f-11e9-8a08-452ab91d117e.png) - -**因为`PoolWithFunc`这个 Pool 只绑定一个任务函数,也即所有任务都是运行同一个函数,所以相较于`Pool`对原生 goroutine 在执行速度和内存消耗的优势更大,上面的结果可以看出,执行速度可以达到原生 goroutine 的 300%,而内存消耗的优势已经达到了两位数的差距,原生 goroutine 的内存消耗达到了`ants`的35倍且原生 goroutine 的每次执行的内存分配次数也达到了`ants`45倍,1000w 的任务量,`ants`的初始分配容量是 5w,因此它完成了所有的任务依旧只使用了 5w 个 goroutine!事实上,`ants`的 Goroutine Pool 的容量是可以自定义的,也就是说使用者可以根据不同场景对这个参数进行调优直至达到最高性能。** - -### 吞吐量测试(适用于那种只管提交异步任务而无须关心结果的场景) - -#### 10w 任务量 - -![](https://user-images.githubusercontent.com/7496278/51515590-36abf700-1e4f-11e9-91e4-7bd3dcb5f4a5.png) - -#### 100w 任务量 - -![](https://user-images.githubusercontent.com/7496278/51515596-44617c80-1e4f-11e9-89e3-01e19d2979a1.png) - -#### 1000w 任务量 - -![](https://user-images.githubusercontent.com/7496278/52987732-537c2000-3437-11e9-86a6-177f00d7a1d6.png) - -## 📊 性能小结 - -![](https://user-images.githubusercontent.com/7496278/63449727-3ae6d400-c473-11e9-81e3-8b3280d8288a.gif) - -**从该 demo 测试吞吐性能对比可以看出,使用`ants`的吞吐性能相较于原生 goroutine 可以保持在 2-6 倍的性能压制,而内存消耗则可以达到 10-20 倍的节省优势。** - ## 👏 贡献者 请在提 PR 之前仔细阅读 [Contributing Guidelines](CONTRIBUTING.md),感谢那些为 `ants` 贡献过代码的开发者! From 83817c11bbfcca5a28f8e6a320030cafb2f92670 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Mon, 15 Apr 2024 18:07:18 +0800 Subject: [PATCH 02/13] chore: fix some warnings for GitHub Actions --- .github/workflows/release-drafter.yml | 2 +- .github/workflows/test.yml | 21 ++++++--------------- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/.github/workflows/release-drafter.yml b/.github/workflows/release-drafter.yml index 36e3b5e..5f14746 100644 --- a/.github/workflows/release-drafter.yml +++ b/.github/workflows/release-drafter.yml @@ -32,7 +32,7 @@ jobs: # echo "GHE_HOST=${GITHUB_SERVER_URL##https:\/\/}" >> $GITHUB_ENV # Drafts your next Release notes as Pull Requests are merged into "master" - - uses: release-drafter/release-drafter@v5 + - uses: release-drafter/release-drafter@v6 # (Optional) specify config name to use, relative to .github/. 
Default: release-drafter.yml # with: # config-name: my-config.yml diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d304abe..99501a1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -36,18 +36,19 @@ jobs: name: Run golangci-lint runs-on: ${{ matrix.os }} steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Setup Go uses: actions/setup-go@v5 with: go-version: '^1.16' - - - name: Checkout repository - uses: actions/checkout@v4 + cache: false - name: Setup and run golangci-lint - uses: golangci/golangci-lint-action@v3 + uses: golangci/golangci-lint-action@v4 with: - version: v1.55.2 + version: v1.57.2 args: --timeout 5m -v -E gofumpt -E gocritic -E misspell -E revive -E godot test: needs: lint @@ -82,16 +83,6 @@ jobs: echo "SHORT_SHA=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT echo "GO_CACHE=$(go env GOCACHE)" >> $GITHUB_OUTPUT - - name: Cache go modules - uses: actions/cache@v4 - with: - path: | - ${{ steps.go-env.outputs.GO_CACHE }} - ~/go/pkg/mod - key: ${{ runner.os }}-${{ matrix.go }}-go-ci-${{ hashFiles('**/go.sum') }} - restore-keys: | - ${{ runner.os }}-${{ matrix.go }}-go-ci - - name: Run unit tests and integrated tests run: go test -v -race -coverprofile="codecov.report" -covermode=atomic From 34ff2c228286420cfdadf2a6f8b7b7a108952d02 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Mon, 15 Apr 2024 18:13:28 +0800 Subject: [PATCH 03/13] chore: fix a few lint issues in code --- ants_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ants_test.go b/ants_test.go index a78ff2f..230972f 100644 --- a/ants_test.go +++ b/ants_test.go @@ -242,7 +242,7 @@ func TestPanicHandler(t *testing.T) { c := atomic.LoadInt64(&panicCounter) assert.EqualValuesf(t, 1, c, "panic handler didn't work, panicCounter: %d", c) assert.EqualValues(t, 0, p0.Running(), "pool should be empty after panic") - p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(p interface{}) { + p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(_ interface{}) { defer wg.Done() atomic.AddInt64(&panicCounter, 1) })) @@ -274,7 +274,7 @@ func TestPanicHandlerPreMalloc(t *testing.T) { c := atomic.LoadInt64(&panicCounter) assert.EqualValuesf(t, 1, c, "panic handler didn't work, panicCounter: %d", c) assert.EqualValues(t, 0, p0.Running(), "pool should be empty after panic") - p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(p interface{}) { + p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(_ interface{}) { defer wg.Done() atomic.AddInt64(&panicCounter, 1) })) @@ -667,7 +667,7 @@ func TestWithDisablePurgePoolFunc(t *testing.T) { var wg1, wg2 sync.WaitGroup wg1.Add(numWorker) wg2.Add(numWorker) - p, _ := NewPoolWithFunc(numWorker, func(i interface{}) { + p, _ := NewPoolWithFunc(numWorker, func(_ interface{}) { wg1.Done() <-sig wg2.Done() @@ -682,7 +682,7 @@ func TestWithDisablePurgeAndWithExpirationPoolFunc(t *testing.T) { wg1.Add(numWorker) wg2.Add(numWorker) expiredDuration := time.Millisecond * 100 - p, _ := NewPoolWithFunc(numWorker, func(i interface{}) { + p, _ := NewPoolWithFunc(numWorker, func(_ interface{}) { wg1.Done() <-sig wg2.Done() @@ -914,7 +914,7 @@ func TestPoolTuneScaleUp(t *testing.T) { p.Release() // test PoolWithFunc - pf, _ := NewPoolWithFunc(2, func(i interface{}) { + pf, _ := NewPoolWithFunc(2, func(_ interface{}) { <-c }) for i := 0; i < 2; i++ { From 
0729518fc6cc841cfa109451684b68d9d683a012 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sun, 16 Jun 2024 15:16:18 +0800 Subject: [PATCH 04/13] chore: update READMEs --- README.md | 137 +++++++++++++++++++++++++++++++++++++++++++++++++-- README_ZH.md | 135 +++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 267 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 6370b3f..a38d767 100644 --- a/README.md +++ b/README.md @@ -298,7 +298,7 @@ All tasks submitted to `ants` pool will not be guaranteed to be addressed in ord Please read our [Contributing Guidelines](CONTRIBUTING.md) before opening a PR and thank you to all the developers who already made contributions to `ants`! - + ## 📄 License @@ -318,7 +318,98 @@ The source code in `ants` is available under the [MIT License](/LICENSE). The following companies/organizations use `ants` in production. -                           + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+<!-- company-logo table rows: the HTML markup and logo images were lost in extraction -->
### open-source software @@ -381,7 +472,47 @@ Become a bronze sponsor with a monthly donation of $10 and get your logo on our ## 💵 Patrons -Patrick Othmer Jimmy ChenZhen Mai Yang 王开帅 Unger Alejandro Weng Wei + + + + + + + + + + + + +
+<!-- patrons table: HTML markup and avatars were lost in extraction; the patrons listed are Patrick Othmer, Jimmy, ChenZhen, Mai Yang, 王开帅, Unger Alejandro, Weng Wei -->
## 🔋 Sponsorship diff --git a/README_ZH.md b/README_ZH.md index 419630c..9fc787e 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -319,7 +319,98 @@ pool.Reboot() 以下公司/组织在生产环境上使用了 `ants`。 -                           + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+<!-- company-logo table rows: the HTML markup and logo images were lost in extraction -->
### 开源软件 @@ -382,7 +473,47 @@ pool.Reboot() ## 资助者 -Patrick Othmer Jimmy ChenZhen Mai Yang 王开帅 Unger Alejandro Weng Wei + + + + + + + + + + + + +
+<!-- patrons table: HTML markup and avatars were lost in extraction; the patrons listed are Patrick Othmer, Jimmy, ChenZhen, Mai Yang, 王开帅, Unger Alejandro, Weng Wei -->
## 🔋 赞助商 From ee5a7183d954de7bb9831846c2407886fe8f9d42 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Mon, 17 Jun 2024 17:56:12 +0800 Subject: [PATCH 05/13] chore: add new use case --- README.md | 7 +++++++ README_ZH.md | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/README.md b/README.md index a38d767..bdf7487 100644 --- a/README.md +++ b/README.md @@ -408,6 +408,13 @@ The following companies/organizations use `ants` in production. + + + + + + + diff --git a/README_ZH.md b/README_ZH.md index 9fc787e..fa041cb 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -409,6 +409,13 @@ pool.Reboot() + + + + + + + From 0d650f5c1e6ecde449db0f401064e36ad06d012c Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Mon, 17 Jun 2024 18:21:01 +0800 Subject: [PATCH 06/13] opt: increase the interval of waiting in ReleaseTimeout() (#325) --- ants.go | 5 ++++- pool.go | 2 +- pool_func.go | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/ants.go b/ants.go index 4b61ba2..2da0304 100644 --- a/ants.go +++ b/ants.go @@ -96,7 +96,10 @@ var ( defaultAntsPool, _ = NewPool(DefaultAntsPoolSize) ) -const nowTimeUpdateInterval = 500 * time.Millisecond +const ( + nowTimeUpdateInterval = 500 * time.Millisecond + releaseTimeoutInterval = 100 * time.Millisecond +) // Logger is used for logging formatted messages. type Logger interface { diff --git a/pool.go b/pool.go index c69dde5..5ef1874 100644 --- a/pool.go +++ b/pool.go @@ -306,7 +306,7 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error { atomic.LoadInt32(&p.ticktockDone) == 1 { return nil } - time.Sleep(10 * time.Millisecond) + time.Sleep(releaseTimeoutInterval) } return ErrTimeout } diff --git a/pool_func.go b/pool_func.go index 6840384..ef3a664 100644 --- a/pool_func.go +++ b/pool_func.go @@ -311,7 +311,7 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { atomic.LoadInt32(&p.ticktockDone) == 1 { return nil } - time.Sleep(10 * time.Millisecond) + time.Sleep(releaseTimeoutInterval) } return ErrTimeout } From b2374d5ae4b905b0f25344e379dd2bfeb58048a5 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Mon, 17 Jun 2024 20:03:43 +0800 Subject: [PATCH 07/13] ci: replace macos-latest with macos-12 for go1.13 (#326) --- .github/workflows/test.yml | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 99501a1..0434350 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -55,8 +55,22 @@ jobs: strategy: fail-fast: false matrix: - go: [1.13, 1.21] + go: [1.13, 1.22] os: [ubuntu-latest, macos-latest, windows-latest] + include: + # TODO(panjf2000): There is an uncanny issue arising when downloading + # go modules on macOS 13 for Go1.13. So we use macOS 12 for now, + # but try to figure it out and use macOS once it's resolved. + # https://github.com/panjf2000/ants/actions/runs/9546726268/job/26310385582 + - go: 1.13 + os: macos-12 + exclude: + # Starting macOS 14 GitHub Actions runners are arm-based, + # but Go didn't support arm64 until 1.16. Thus, we must + # replace the macOS 14 runner with macOS 12 runner for Go 1.13. 
+ # Ref: https://github.com/actions/runner-images/issues/9741 + - go: 1.13 + os: macos-latest name: Go ${{ matrix.go }} @ ${{ matrix.os }} runs-on: ${{ matrix.os}} steps: From 3ffd3daa372c14cdc187acdd5c13d446416e487a Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Mon, 17 Jun 2024 20:13:15 +0800 Subject: [PATCH 08/13] opt: calculate the interval for ReleaseTimeout() based on a default count (#327) This PR reverts #325 to some extent. --- ants.go | 4 ++-- pool.go | 3 ++- pool_func.go | 3 ++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/ants.go b/ants.go index 2da0304..246d91c 100644 --- a/ants.go +++ b/ants.go @@ -97,8 +97,8 @@ var ( ) const ( - nowTimeUpdateInterval = 500 * time.Millisecond - releaseTimeoutInterval = 100 * time.Millisecond + nowTimeUpdateInterval = 500 * time.Millisecond + releaseTimeoutCount = 10 ) // Logger is used for logging formatted messages. diff --git a/pool.go b/pool.go index 5ef1874..8406da4 100644 --- a/pool.go +++ b/pool.go @@ -299,6 +299,7 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error { } p.Release() + interval := timeout / releaseTimeoutCount endTime := time.Now().Add(timeout) for time.Now().Before(endTime) { if p.Running() == 0 && @@ -306,7 +307,7 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error { atomic.LoadInt32(&p.ticktockDone) == 1 { return nil } - time.Sleep(releaseTimeoutInterval) + time.Sleep(interval) } return ErrTimeout } diff --git a/pool_func.go b/pool_func.go index ef3a664..290ae36 100644 --- a/pool_func.go +++ b/pool_func.go @@ -304,6 +304,7 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { } p.Release() + interval := timeout / releaseTimeoutCount endTime := time.Now().Add(timeout) for time.Now().Before(endTime) { if p.Running() == 0 && @@ -311,7 +312,7 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { atomic.LoadInt32(&p.ticktockDone) == 1 { return nil } - time.Sleep(releaseTimeoutInterval) + time.Sleep(interval) } return ErrTimeout } From 15e896153d22d6f7bb5028b2dc5e6919105d4e1f Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 18 Jun 2024 01:06:48 +0800 Subject: [PATCH 09/13] opt: make ReleaseTimeout() more efficient in waiting workers to exit (#329) --- pool.go | 92 +++++++++++++++++++++++++++------------- pool_func.go | 113 +++++++++++++++++++++++-------------------------- worker.go | 6 ++- worker_func.go | 6 ++- 4 files changed, 126 insertions(+), 91 deletions(-) diff --git a/pool.go b/pool.go index 8406da4..aeab50b 100644 --- a/pool.go +++ b/pool.go @@ -31,9 +31,7 @@ import ( syncx "github.com/panjf2000/ants/v2/internal/sync" ) -// Pool accepts the tasks and process them concurrently, -// it limits the total of goroutines to a given number by recycling goroutines. -type Pool struct { +type poolCommon struct { // capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to // avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool // which submits a new task to the same pool. @@ -54,6 +52,11 @@ type Pool struct { // cond for waiting to get an idle worker. cond *sync.Cond + // done is used to indicate that all workers are done. + allDone chan struct{} + // once is used to make sure the pool is closed just once. + once *sync.Once + // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker. 
workerCache sync.Pool @@ -61,9 +64,11 @@ type Pool struct { waiting int32 purgeDone int32 + purgeCtx context.Context stopPurge context.CancelFunc ticktockDone int32 + ticktockCtx context.Context stopTicktock context.CancelFunc now atomic.Value @@ -71,8 +76,14 @@ type Pool struct { options *Options } +// Pool accepts the tasks and process them concurrently, +// it limits the total of goroutines to a given number by recycling goroutines. +type Pool struct { + poolCommon +} + // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. -func (p *Pool) purgeStaleWorkers(ctx context.Context) { +func (p *Pool) purgeStaleWorkers() { ticker := time.NewTicker(p.options.ExpiryDuration) defer func() { @@ -82,7 +93,7 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) { for { select { - case <-ctx.Done(): + case <-p.purgeCtx.Done(): return case <-ticker.C: } @@ -116,7 +127,7 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) { } // ticktock is a goroutine that updates the current time in the pool regularly. -func (p *Pool) ticktock(ctx context.Context) { +func (p *Pool) ticktock() { ticker := time.NewTicker(nowTimeUpdateInterval) defer func() { ticker.Stop() @@ -125,7 +136,7 @@ func (p *Pool) ticktock(ctx context.Context) { for { select { - case <-ctx.Done(): + case <-p.ticktockCtx.Done(): return case <-ticker.C: } @@ -144,16 +155,14 @@ func (p *Pool) goPurge() { } // Start a goroutine to clean up expired workers periodically. - var ctx context.Context - ctx, p.stopPurge = context.WithCancel(context.Background()) - go p.purgeStaleWorkers(ctx) + p.purgeCtx, p.stopPurge = context.WithCancel(context.Background()) + go p.purgeStaleWorkers() } func (p *Pool) goTicktock() { p.now.Store(time.Now()) - var ctx context.Context - ctx, p.stopTicktock = context.WithCancel(context.Background()) - go p.ticktock(ctx) + p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background()) + go p.ticktock() } func (p *Pool) nowTime() time.Time { @@ -180,11 +189,13 @@ func NewPool(size int, options ...Option) (*Pool, error) { opts.Logger = defaultLogger } - p := &Pool{ + p := &Pool{poolCommon: poolCommon{ capacity: int32(size), + allDone: make(chan struct{}), lock: syncx.NewSpinLock(), + once: &sync.Once{}, options: opts, - } + }} p.workerCache.New = func() interface{} { return &goWorker{ pool: p, @@ -281,8 +292,10 @@ func (p *Pool) Release() { p.stopPurge() p.stopPurge = nil } - p.stopTicktock() - p.stopTicktock = nil + if p.stopTicktock != nil { + p.stopTicktock() + p.stopTicktock = nil + } p.lock.Lock() p.workers.reset() @@ -297,19 +310,38 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error { if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil { return ErrPoolClosed } + p.Release() - interval := timeout / releaseTimeoutCount - endTime := time.Now().Add(timeout) - for time.Now().Before(endTime) { - if p.Running() == 0 && - (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && - atomic.LoadInt32(&p.ticktockDone) == 1 { - return nil - } - time.Sleep(interval) + var purgeCh <-chan struct{} + if !p.options.DisablePurge { + purgeCh = p.purgeCtx.Done() + } else { + purgeCh = p.allDone + } + + if p.Running() == 0 { + p.once.Do(func() { + close(p.allDone) + }) + } + + timer := time.NewTimer(timeout) + defer timer.Stop() + for { + select { + case <-timer.C: + return ErrTimeout + case <-p.allDone: + <-purgeCh + <-p.ticktockCtx.Done() + if p.Running() == 0 && + (p.options.DisablePurge || 
atomic.LoadInt32(&p.purgeDone) == 1) && + atomic.LoadInt32(&p.ticktockDone) == 1 { + return nil + } + } } - return ErrTimeout } // Reboot reboots a closed pool. @@ -319,11 +351,13 @@ func (p *Pool) Reboot() { p.goPurge() atomic.StoreInt32(&p.ticktockDone, 0) p.goTicktock() + p.allDone = make(chan struct{}) + p.once = &sync.Once{} } } -func (p *Pool) addRunning(delta int) { - atomic.AddInt32(&p.running, int32(delta)) +func (p *Pool) addRunning(delta int) int { + return int(atomic.AddInt32(&p.running, int32(delta))) } func (p *Pool) addWaiting(delta int) { diff --git a/pool_func.go b/pool_func.go index 290ae36..c9ab90e 100644 --- a/pool_func.go +++ b/pool_func.go @@ -34,46 +34,14 @@ import ( // PoolWithFunc accepts the tasks and process them concurrently, // it limits the total of goroutines to a given number by recycling goroutines. type PoolWithFunc struct { - // capacity of the pool. - capacity int32 - - // running is the number of the currently running goroutines. - running int32 - - // lock for protecting the worker queue. - lock sync.Locker - - // workers is a slice that store the available workers. - workers workerQueue - - // state is used to notice the pool to closed itself. - state int32 - - // cond for waiting to get an idle worker. - cond *sync.Cond + poolCommon // poolFunc is the function for processing tasks. poolFunc func(interface{}) - - // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker. - workerCache sync.Pool - - // waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock - waiting int32 - - purgeDone int32 - stopPurge context.CancelFunc - - ticktockDone int32 - stopTicktock context.CancelFunc - - now atomic.Value - - options *Options } // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. -func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { +func (p *PoolWithFunc) purgeStaleWorkers() { ticker := time.NewTicker(p.options.ExpiryDuration) defer func() { ticker.Stop() @@ -82,7 +50,7 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { for { select { - case <-ctx.Done(): + case <-p.purgeCtx.Done(): return case <-ticker.C: } @@ -116,7 +84,7 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { } // ticktock is a goroutine that updates the current time in the pool regularly. -func (p *PoolWithFunc) ticktock(ctx context.Context) { +func (p *PoolWithFunc) ticktock() { ticker := time.NewTicker(nowTimeUpdateInterval) defer func() { ticker.Stop() @@ -125,7 +93,7 @@ func (p *PoolWithFunc) ticktock(ctx context.Context) { for { select { - case <-ctx.Done(): + case <-p.ticktockCtx.Done(): return case <-ticker.C: } @@ -144,16 +112,14 @@ func (p *PoolWithFunc) goPurge() { } // Start a goroutine to clean up expired workers periodically. 
- var ctx context.Context - ctx, p.stopPurge = context.WithCancel(context.Background()) - go p.purgeStaleWorkers(ctx) + p.purgeCtx, p.stopPurge = context.WithCancel(context.Background()) + go p.purgeStaleWorkers() } func (p *PoolWithFunc) goTicktock() { p.now.Store(time.Now()) - var ctx context.Context - ctx, p.stopTicktock = context.WithCancel(context.Background()) - go p.ticktock(ctx) + p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background()) + go p.ticktock() } func (p *PoolWithFunc) nowTime() time.Time { @@ -185,10 +151,14 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi } p := &PoolWithFunc{ - capacity: int32(size), + poolCommon: poolCommon{ + capacity: int32(size), + allDone: make(chan struct{}), + lock: syncx.NewSpinLock(), + once: &sync.Once{}, + options: opts, + }, poolFunc: pf, - lock: syncx.NewSpinLock(), - options: opts, } p.workerCache.New = func() interface{} { return &goWorkerWithFunc{ @@ -286,8 +256,10 @@ func (p *PoolWithFunc) Release() { p.stopPurge() p.stopPurge = nil } - p.stopTicktock() - p.stopTicktock = nil + if p.stopTicktock != nil { + p.stopTicktock() + p.stopTicktock = nil + } p.lock.Lock() p.workers.reset() @@ -302,19 +274,38 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil { return ErrPoolClosed } + p.Release() - interval := timeout / releaseTimeoutCount - endTime := time.Now().Add(timeout) - for time.Now().Before(endTime) { - if p.Running() == 0 && - (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && - atomic.LoadInt32(&p.ticktockDone) == 1 { - return nil - } - time.Sleep(interval) + var purgeCh <-chan struct{} + if !p.options.DisablePurge { + purgeCh = p.purgeCtx.Done() + } else { + purgeCh = p.allDone + } + + if p.Running() == 0 { + p.once.Do(func() { + close(p.allDone) + }) + } + + timer := time.NewTimer(timeout) + defer timer.Stop() + for { + select { + case <-timer.C: + return ErrTimeout + case <-p.allDone: + <-purgeCh + <-p.ticktockCtx.Done() + if p.Running() == 0 && + (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && + atomic.LoadInt32(&p.ticktockDone) == 1 { + return nil + } + } } - return ErrTimeout } // Reboot reboots a closed pool. 
@@ -324,11 +315,13 @@ func (p *PoolWithFunc) Reboot() { p.goPurge() atomic.StoreInt32(&p.ticktockDone, 0) p.goTicktock() + p.allDone = make(chan struct{}) + p.once = &sync.Once{} } } -func (p *PoolWithFunc) addRunning(delta int) { - atomic.AddInt32(&p.running, int32(delta)) +func (p *PoolWithFunc) addRunning(delta int) int { + return int(atomic.AddInt32(&p.running, int32(delta))) } func (p *PoolWithFunc) addWaiting(delta int) { diff --git a/worker.go b/worker.go index 887bb98..73166f8 100644 --- a/worker.go +++ b/worker.go @@ -47,7 +47,11 @@ func (w *goWorker) run() { w.pool.addRunning(1) go func() { defer func() { - w.pool.addRunning(-1) + if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() { + w.pool.once.Do(func() { + close(w.pool.allDone) + }) + } w.pool.workerCache.Put(w) if p := recover(); p != nil { if ph := w.pool.options.PanicHandler; ph != nil { diff --git a/worker_func.go b/worker_func.go index 701a076..a25f4f9 100644 --- a/worker_func.go +++ b/worker_func.go @@ -47,7 +47,11 @@ func (w *goWorkerWithFunc) run() { w.pool.addRunning(1) go func() { defer func() { - w.pool.addRunning(-1) + if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() { + w.pool.once.Do(func() { + close(w.pool.allDone) + }) + } w.pool.workerCache.Put(w) if p := recover(); p != nil { if ph := w.pool.options.PanicHandler; ph != nil { From 1933478e2e2b753f923f45093f39ac008648ff21 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 18 Jun 2024 01:09:42 +0800 Subject: [PATCH 10/13] chore: remove the unused constant of releaseTimeoutCount --- ants.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/ants.go b/ants.go index 246d91c..4b61ba2 100644 --- a/ants.go +++ b/ants.go @@ -96,10 +96,7 @@ var ( defaultAntsPool, _ = NewPool(DefaultAntsPoolSize) ) -const ( - nowTimeUpdateInterval = 500 * time.Millisecond - releaseTimeoutCount = 10 -) +const nowTimeUpdateInterval = 500 * time.Millisecond // Logger is used for logging formatted messages. 
type Logger interface { From 95dad45c7d353b1cd30e46414d5bfd73ce145ba1 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 18 Jun 2024 02:00:36 +0800 Subject: [PATCH 11/13] bug: alleviate the data race between Release() and Reboot() (#330) --- ants_test.go | 10 +++++----- pool.go | 8 ++++++-- pool_func.go | 8 ++++++-- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/ants_test.go b/ants_test.go index 230972f..5297243 100644 --- a/ants_test.go +++ b/ants_test.go @@ -503,7 +503,7 @@ func TestMaxBlockingSubmitWithFunc(t *testing.T) { func TestRebootDefaultPool(t *testing.T) { defer Release() - Reboot() + Reboot() // should do nothing inside var wg sync.WaitGroup wg.Add(1) _ = Submit(func() { @@ -511,7 +511,7 @@ func TestRebootDefaultPool(t *testing.T) { wg.Done() }) wg.Wait() - Release() + assert.NoError(t, ReleaseTimeout(time.Second)) assert.EqualError(t, Submit(nil), ErrPoolClosed.Error(), "pool should be closed") Reboot() wg.Add(1) @@ -530,7 +530,7 @@ func TestRebootNewPool(t *testing.T) { wg.Done() }) wg.Wait() - p.Release() + assert.NoError(t, p.ReleaseTimeout(time.Second)) assert.EqualError(t, p.Submit(nil), ErrPoolClosed.Error(), "pool should be closed") p.Reboot() wg.Add(1) @@ -546,7 +546,7 @@ func TestRebootNewPool(t *testing.T) { wg.Add(1) _ = p1.Invoke(1) wg.Wait() - p1.Release() + assert.NoError(t, p1.ReleaseTimeout(time.Second)) assert.EqualError(t, p1.Invoke(nil), ErrPoolClosed.Error(), "pool should be closed") p1.Reboot() wg.Add(1) @@ -975,7 +975,7 @@ func TestReleaseTimeout(t *testing.T) { } func TestDefaultPoolReleaseTimeout(t *testing.T) { - Reboot() + Reboot() // should do nothing inside for i := 0; i < 5; i++ { _ = Submit(func() { time.Sleep(time.Second) diff --git a/pool.go b/pool.go index aeab50b..9c28874 100644 --- a/pool.go +++ b/pool.go @@ -91,9 +91,10 @@ func (p *Pool) purgeStaleWorkers() { atomic.StoreInt32(&p.purgeDone, 1) }() + purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot() for { select { - case <-p.purgeCtx.Done(): + case <-purgeCtx.Done(): return case <-ticker.C: } @@ -344,7 +345,10 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error { } } -// Reboot reboots a closed pool. +// Reboot reboots a closed pool, it does nothing if the pool is not closed. +// If you intend to reboot a closed pool, use ReleaseTimeout() instead of +// Release() to ensure that all workers are stopped and resource are released +// before rebooting, otherwise you may run into data race. func (p *Pool) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { atomic.StoreInt32(&p.purgeDone, 0) diff --git a/pool_func.go b/pool_func.go index c9ab90e..a73d979 100644 --- a/pool_func.go +++ b/pool_func.go @@ -48,9 +48,10 @@ func (p *PoolWithFunc) purgeStaleWorkers() { atomic.StoreInt32(&p.purgeDone, 1) }() + purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot() for { select { - case <-p.purgeCtx.Done(): + case <-purgeCtx.Done(): return case <-ticker.C: } @@ -308,7 +309,10 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { } } -// Reboot reboots a closed pool. +// Reboot reboots a closed pool, it does nothing if the pool is not closed. +// If you intend to reboot a closed pool, use ReleaseTimeout() instead of +// Release() to ensure that all workers are stopped and resource are released +// before rebooting, otherwise you may run into data race. 
func (p *PoolWithFunc) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { atomic.StoreInt32(&p.purgeDone, 0) From da22980e2cb200e137ffb530882fd822d0e1b28e Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 18 Jun 2024 02:42:55 +0800 Subject: [PATCH 12/13] opt: speed up ReleaseTimeout() for multi-pool (#332) --- multipool.go | 39 +++++++++++++++++++++++++++++++-------- multipool_func.go | 39 +++++++++++++++++++++++++++++++-------- 2 files changed, 62 insertions(+), 16 deletions(-) diff --git a/multipool.go b/multipool.go index 1de75ae..ad5db10 100644 --- a/multipool.go +++ b/multipool.go @@ -28,6 +28,8 @@ import ( "strings" "sync/atomic" "time" + + "golang.org/x/sync/errgroup" ) // LoadBalancingStrategy represents the type of load-balancing algorithm. @@ -182,14 +184,35 @@ func (mp *MultiPool) ReleaseTimeout(timeout time.Duration) error { return ErrPoolClosed } - var errStr strings.Builder + errCh := make(chan error, len(mp.pools)) + var wg errgroup.Group for i, pool := range mp.pools { - if err := pool.ReleaseTimeout(timeout); err != nil { - errStr.WriteString(fmt.Sprintf("pool %d: %v\n", i, err)) - if i < len(mp.pools)-1 { - errStr.WriteString(" | ") - } - return err + func(p *Pool, idx int) { + wg.Go(func() error { + err := p.ReleaseTimeout(timeout) + if err != nil { + err = fmt.Errorf("pool %d: %v", idx, err) + } + errCh <- err + return err + }) + }(pool, i) + } + + _ = wg.Wait() + + var ( + i int + errStr strings.Builder + ) + for err := range errCh { + i++ + if i == len(mp.pools) { + break + } + if err != nil { + errStr.WriteString(err.Error()) + errStr.WriteString(" | ") } } @@ -197,7 +220,7 @@ func (mp *MultiPool) ReleaseTimeout(timeout time.Duration) error { return nil } - return errors.New(errStr.String()) + return errors.New(strings.TrimSuffix(errStr.String(), " | ")) } // Reboot reboots a released multi-pool. diff --git a/multipool_func.go b/multipool_func.go index c7d31ff..257b16e 100644 --- a/multipool_func.go +++ b/multipool_func.go @@ -28,6 +28,8 @@ import ( "strings" "sync/atomic" "time" + + "golang.org/x/sync/errgroup" ) // MultiPoolWithFunc consists of multiple pools, from which you will benefit the @@ -172,14 +174,35 @@ func (mp *MultiPoolWithFunc) ReleaseTimeout(timeout time.Duration) error { return ErrPoolClosed } - var errStr strings.Builder + errCh := make(chan error, len(mp.pools)) + var wg errgroup.Group for i, pool := range mp.pools { - if err := pool.ReleaseTimeout(timeout); err != nil { - errStr.WriteString(fmt.Sprintf("pool %d: %v\n", i, err)) - if i < len(mp.pools)-1 { - errStr.WriteString(" | ") - } - return err + func(p *PoolWithFunc, idx int) { + wg.Go(func() error { + err := p.ReleaseTimeout(timeout) + if err != nil { + err = fmt.Errorf("pool %d: %v", idx, err) + } + errCh <- err + return err + }) + }(pool, i) + } + + _ = wg.Wait() + + var ( + i int + errStr strings.Builder + ) + for err := range errCh { + i++ + if i == len(mp.pools) { + break + } + if err != nil { + errStr.WriteString(err.Error()) + errStr.WriteString(" | ") } } @@ -187,7 +210,7 @@ func (mp *MultiPoolWithFunc) ReleaseTimeout(timeout time.Duration) error { return nil } - return errors.New(errStr.String()) + return errors.New(strings.TrimSuffix(errStr.String(), " | ")) } // Reboot reboots a released multi-pool. 
From b40e489286d17a26c88461e70e018fabefa23c95 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 18 Jun 2024 03:05:09 +0800 Subject: [PATCH 13/13] bug: alleviate the data race between Release() and Reboot() (#333) --- pool.go | 3 ++- pool_func.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pool.go b/pool.go index 9c28874..33e46ed 100644 --- a/pool.go +++ b/pool.go @@ -135,9 +135,10 @@ func (p *Pool) ticktock() { atomic.StoreInt32(&p.ticktockDone, 1) }() + ticktockCtx := p.ticktockCtx // copy to the local variable to avoid race from Reboot() for { select { - case <-p.ticktockCtx.Done(): + case <-ticktockCtx.Done(): return case <-ticker.C: } diff --git a/pool_func.go b/pool_func.go index a73d979..140d5fe 100644 --- a/pool_func.go +++ b/pool_func.go @@ -92,9 +92,10 @@ func (p *PoolWithFunc) ticktock() { atomic.StoreInt32(&p.ticktockDone, 1) }() + ticktockCtx := p.ticktockCtx // copy to the local variable to avoid race from Reboot() for { select { - case <-p.ticktockCtx.Done(): + case <-ticktockCtx.Done(): return case <-ticker.C: }
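The final two race fixes (patches 11 and 13) share one idiom: a long-lived goroutine copies `p.purgeCtx` or `p.ticktockCtx` into a local variable before entering its loop, because `Reboot()` may replace those struct fields while the goroutine is still selecting on them. Below is a minimal, self-contained sketch of that idiom; the `watcher` type and its `loop` method are illustrative stand-ins for this sketch, not part of the `ants` API.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watcher models the purge/ticktock goroutines in pool.go.
type watcher struct {
	ctx context.Context
}

// loop copies w.ctx into a local variable up front, mirroring the fix in
// patches 11 and 13: a concurrent Reboot() that swaps the struct field
// can no longer race with the reads in the select statement below.
func (w *watcher) loop(done chan<- struct{}) {
	ctx := w.ctx // local copy; the field may be replaced by a reboot
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			done <- struct{}{}
			return
		case <-ticker.C:
			// periodic work (e.g. purging stale workers) goes here
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	w := &watcher{ctx: ctx}
	done := make(chan struct{})
	go w.loop(done)
	cancel() // plays the role of stopPurge()/stopTicktock() in Release()
	<-done
	fmt.Println("loop exited cleanly")
}
```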
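Patch 09's redesign of `ReleaseTimeout()` rests on one signalling pattern: every worker decrements the running counter on exit, and whichever worker drops it to zero closes an `allDone` channel through a `sync.Once`, so the releasing goroutine can block on a channel instead of polling in a sleep loop. The following is a minimal sketch of that pattern; `demoPool`, `spawn`, and `releaseTimeout` are names invented for this sketch, and the real pool additionally gates the close on `IsClosed()` and waits for the purge/ticktock goroutines to finish.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// demoPool keeps just enough state to demonstrate the pattern: a running
// counter, the allDone channel, and a sync.Once guarding the close.
type demoPool struct {
	running int32
	allDone chan struct{}
	once    sync.Once
}

// spawn runs task on a new goroutine, mirroring goWorker.run(): the
// worker that drops the running count to zero closes allDone exactly once.
func (p *demoPool) spawn(task func()) {
	atomic.AddInt32(&p.running, 1)
	go func() {
		defer func() {
			if atomic.AddInt32(&p.running, -1) == 0 {
				p.once.Do(func() { close(p.allDone) })
			}
		}()
		task()
	}()
}

// releaseTimeout blocks on allDone instead of sleeping and re-checking,
// which is the essence of the patch-09 optimization.
func (p *demoPool) releaseTimeout(d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-p.allDone:
		return nil
	case <-timer.C:
		return fmt.Errorf("timed out waiting for workers to exit")
	}
}

func main() {
	p := &demoPool{allDone: make(chan struct{})}
	for i := 0; i < 3; i++ {
		p.spawn(func() { time.Sleep(50 * time.Millisecond) })
	}
	fmt.Println(p.releaseTimeout(time.Second)) // <nil>
}
```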
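Patch 12 parallelizes `MultiPool.ReleaseTimeout()` with `golang.org/x/sync/errgroup`: every sub-pool is released concurrently and the per-pool errors are joined with " | " rather than bailing out on the first failure as the old sequential loop did. The sketch below shows the same shape in a simplified, runnable form; `fakePool` and `releaseAll` are stand-ins for `*ants.Pool` and the multi-pool method, and the error collection is streamlined by closing the channel after `Wait()`.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"

	"golang.org/x/sync/errgroup"
)

// fakePool stands in for *ants.Pool; only ReleaseTimeout matters here.
type fakePool struct{ id int }

func (p *fakePool) ReleaseTimeout(time.Duration) error {
	if p.id == 1 {
		return errors.New("timed out") // simulate one slow pool
	}
	return nil
}

// releaseAll releases all sub-pools concurrently via errgroup and joins
// any per-pool failures into a single error string.
func releaseAll(pools []*fakePool, timeout time.Duration) error {
	errCh := make(chan error, len(pools))
	var g errgroup.Group
	for i, pool := range pools {
		i, pool := i, pool // per-iteration copies (pre-Go 1.22 semantics)
		g.Go(func() error {
			err := pool.ReleaseTimeout(timeout)
			if err != nil {
				err = fmt.Errorf("pool %d: %v", i, err)
			}
			errCh <- err
			return err
		})
	}
	_ = g.Wait()
	close(errCh) // all sends have completed; safe to drain with range

	var sb strings.Builder
	for err := range errCh {
		if err != nil {
			sb.WriteString(err.Error())
			sb.WriteString(" | ")
		}
	}
	if sb.Len() == 0 {
		return nil
	}
	return errors.New(strings.TrimSuffix(sb.String(), " | "))
}

func main() {
	pools := []*fakePool{{0}, {1}, {2}}
	fmt.Println(releaseAll(pools, time.Second)) // pool 1: timed out
}
```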