diff --git a/.github/workflows/release-drafter.yml b/.github/workflows/release-drafter.yml index 36e3b5e..5f14746 100644 --- a/.github/workflows/release-drafter.yml +++ b/.github/workflows/release-drafter.yml @@ -32,7 +32,7 @@ jobs: # echo "GHE_HOST=${GITHUB_SERVER_URL##https:\/\/}" >> $GITHUB_ENV # Drafts your next Release notes as Pull Requests are merged into "master" - - uses: release-drafter/release-drafter@v5 + - uses: release-drafter/release-drafter@v6 # (Optional) specify config name to use, relative to .github/. Default: release-drafter.yml # with: # config-name: my-config.yml diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d304abe..0434350 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -36,26 +36,41 @@ jobs: name: Run golangci-lint runs-on: ${{ matrix.os }} steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Setup Go uses: actions/setup-go@v5 with: go-version: '^1.16' - - - name: Checkout repository - uses: actions/checkout@v4 + cache: false - name: Setup and run golangci-lint - uses: golangci/golangci-lint-action@v3 + uses: golangci/golangci-lint-action@v4 with: - version: v1.55.2 + version: v1.57.2 args: --timeout 5m -v -E gofumpt -E gocritic -E misspell -E revive -E godot test: needs: lint strategy: fail-fast: false matrix: - go: [1.13, 1.21] + go: [1.13, 1.22] os: [ubuntu-latest, macos-latest, windows-latest] + include: + # TODO(panjf2000): There is an uncanny issue arising when downloading + # go modules on macOS 13 for Go1.13. So we use macOS 12 for now, + # but try to figure it out and use macOS once it's resolved. + # https://github.com/panjf2000/ants/actions/runs/9546726268/job/26310385582 + - go: 1.13 + os: macos-12 + exclude: + # Starting macOS 14 GitHub Actions runners are arm-based, + # but Go didn't support arm64 until 1.16. Thus, we must + # replace the macOS 14 runner with macOS 12 runner for Go 1.13. + # Ref: https://github.com/actions/runner-images/issues/9741 + - go: 1.13 + os: macos-latest name: Go ${{ matrix.go }} @ ${{ matrix.os }} runs-on: ${{ matrix.os}} steps: @@ -82,16 +97,6 @@ jobs: echo "SHORT_SHA=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT echo "GO_CACHE=$(go env GOCACHE)" >> $GITHUB_OUTPUT - - name: Cache go modules - uses: actions/cache@v4 - with: - path: | - ${{ steps.go-env.outputs.GO_CACHE }} - ~/go/pkg/mod - key: ${{ runner.os }}-${{ matrix.go }}-go-ci-${{ hashFiles('**/go.sum') }} - restore-keys: | - ${{ runner.os }}-${{ matrix.go }}-go-ci - - name: Run unit tests and integrated tests run: go test -v -race -coverprofile="codecov.report" -covermode=atomic diff --git a/README.md b/README.md index b06a61e..bdf7487 100644 --- a/README.md +++ b/README.md @@ -293,53 +293,12 @@ pool.Reboot() All tasks submitted to `ants` pool will not be guaranteed to be addressed in order, because those tasks scatter among a series of concurrent workers, thus those tasks would be executed concurrently. -## 🧲 Benchmarks - -
- In this benchmark result, the first and second benchmarks performed test cases with 1M tasks, and the rest of the benchmarks performed test cases with 10M tasks, both in unlimited goroutines and `ants` pool, and the capacity of this `ants` goroutine pool was limited to 50K. - -- BenchmarkGoroutine-4 represents the benchmarks with unlimited goroutines in Golang. - -- BenchmarkPoolGroutine-4 represents the benchmarks with an `ants` pool. - -### Benchmarks with Pool - -![](https://user-images.githubusercontent.com/7496278/51515499-f187c500-1e4e-11e9-80e5-3df8f94fa70f.png) - -In the above benchmark result, the first and second benchmarks performed test cases with 1M tasks, and the rest of the benchmarks performed test cases with 10M tasks, both in unlimited goroutines and `ants` pool and the capacity of this `ants` goroutine-pool was limited to 50K. - -**As you can see, `ants` performs 2 times faster than goroutines without a pool (10M tasks) and it only consumes half the memory compared with goroutines without a pool. (both in 1M and 10M tasks)** - -### Benchmarks with PoolWithFunc - -![](https://user-images.githubusercontent.com/7496278/51515565-1e3bdc80-1e4f-11e9-8a08-452ab91d117e.png) - -### Throughput (it is suitable for scenarios where tasks are submitted asynchronously without waiting for the final results) - -#### 100K tasks - -![](https://user-images.githubusercontent.com/7496278/51515590-36abf700-1e4f-11e9-91e4-7bd3dcb5f4a5.png) - -#### 1M tasks - -![](https://user-images.githubusercontent.com/7496278/51515596-44617c80-1e4f-11e9-89e3-01e19d2979a1.png) - -#### 10M tasks - -![](https://user-images.githubusercontent.com/7496278/52987732-537c2000-3437-11e9-86a6-177f00d7a1d6.png) - -## 📊 Performance Summary - -![](https://user-images.githubusercontent.com/7496278/63449727-3ae6d400-c473-11e9-81e3-8b3280d8288a.gif) - -**In conclusion, `ants` performs 2~6 times faster than goroutines without a pool and the memory consumption is reduced by 10 to 20 times.** - ## 👏 Contributors Please read our [Contributing Guidelines](CONTRIBUTING.md) before opening a PR and thank you to all the developers who already made contributions to `ants`! - + ## 📄 License @@ -359,7 +318,105 @@ The source code in `ants` is available under the [MIT License](/LICENSE). The following companies/organizations use `ants` in production. -                           + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
### open-source software @@ -422,7 +479,47 @@ Become a bronze sponsor with a monthly donation of $10 and get your logo on our ## 💵 Patrons -Patrick Othmer Jimmy ChenZhen Mai Yang 王开帅 Unger Alejandro Weng Wei + + + + + + + + + + + + +
+ Patrick Othmer · Jimmy · ChenZhen · Mai Yang · 王开帅 · Unger Alejandro · Weng Wei
## 🔋 Sponsorship diff --git a/README_ZH.md b/README_ZH.md index cefcd87..fa041cb 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -294,47 +294,6 @@ pool.Reboot() `ants` 并不保证提交的任务被执行的顺序,执行的顺序也不是和提交的顺序保持一致,因为在 `ants` 是并发地处理所有提交的任务,提交的任务会被分派到正在并发运行的 workers 上去,因此那些任务将会被并发且无序地被执行。 -## 🧲 Benchmarks - -
-上图中的前两个 benchmark 测试结果是基于100w 任务量的条件,剩下的几个是基于 1000w 任务量的测试结果,`ants` 的默认池容量是 5w。 - -- BenchmarkGoroutine-4 代表原生 goroutine - -- BenchmarkPoolGroutine-4 代表使用 goroutine 池 `ants` - -### Benchmarks with Pool - -![](https://user-images.githubusercontent.com/7496278/51515499-f187c500-1e4e-11e9-80e5-3df8f94fa70f.png) - -**这里为了模拟大规模 goroutine 的场景,两次测试的并发次数分别是 100w 和 1000w,前两个测试分别是执行 100w 个并发任务不使用 Pool 和使用了`ants`的 Goroutine Pool 的性能,后两个则是 1000w 个任务下的表现,可以直观的看出在执行速度和内存使用上,`ants`的 Pool 都占有明显的优势。100w 的任务量,使用`ants`,执行速度与原生 goroutine 相当甚至略快,但只实际使用了不到 5w 个 goroutine 完成了全部任务,且内存消耗仅为原生并发的 40%;而当任务量达到 1000w,优势则更加明显了:用了 70w 左右的 goroutine 完成全部任务,执行速度比原生 goroutine 提高了 100%,且内存消耗依旧保持在不使用 Pool 的 40% 左右。** - -### Benchmarks with PoolWithFunc - -![](https://user-images.githubusercontent.com/7496278/51515565-1e3bdc80-1e4f-11e9-8a08-452ab91d117e.png) - -**因为`PoolWithFunc`这个 Pool 只绑定一个任务函数,也即所有任务都是运行同一个函数,所以相较于`Pool`对原生 goroutine 在执行速度和内存消耗的优势更大,上面的结果可以看出,执行速度可以达到原生 goroutine 的 300%,而内存消耗的优势已经达到了两位数的差距,原生 goroutine 的内存消耗达到了`ants`的35倍且原生 goroutine 的每次执行的内存分配次数也达到了`ants`45倍,1000w 的任务量,`ants`的初始分配容量是 5w,因此它完成了所有的任务依旧只使用了 5w 个 goroutine!事实上,`ants`的 Goroutine Pool 的容量是可以自定义的,也就是说使用者可以根据不同场景对这个参数进行调优直至达到最高性能。** - -### 吞吐量测试(适用于那种只管提交异步任务而无须关心结果的场景) - -#### 10w 任务量 - -![](https://user-images.githubusercontent.com/7496278/51515590-36abf700-1e4f-11e9-91e4-7bd3dcb5f4a5.png) - -#### 100w 任务量 - -![](https://user-images.githubusercontent.com/7496278/51515596-44617c80-1e4f-11e9-89e3-01e19d2979a1.png) - -#### 1000w 任务量 - -![](https://user-images.githubusercontent.com/7496278/52987732-537c2000-3437-11e9-86a6-177f00d7a1d6.png) - -## 📊 性能小结 - -![](https://user-images.githubusercontent.com/7496278/63449727-3ae6d400-c473-11e9-81e3-8b3280d8288a.gif) - -**从该 demo 测试吞吐性能对比可以看出,使用`ants`的吞吐性能相较于原生 goroutine 可以保持在 2-6 倍的性能压制,而内存消耗则可以达到 10-20 倍的节省优势。** - ## 👏 贡献者 请在提 PR 之前仔细阅读 [Contributing Guidelines](CONTRIBUTING.md),感谢那些为 `ants` 贡献过代码的开发者! @@ -360,7 +319,105 @@ pool.Reboot() 以下公司/组织在生产环境上使用了 `ants`。 -                           + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
### 开源软件 @@ -423,7 +480,47 @@ pool.Reboot() ## 资助者 -Patrick Othmer Jimmy ChenZhen Mai Yang 王开帅 Unger Alejandro Weng Wei + + + + + + + + + + + + +
+ Patrick Othmer · Jimmy · ChenZhen · Mai Yang · 王开帅 · Unger Alejandro · Weng Wei
## 🔋 赞助商 diff --git a/ants_test.go b/ants_test.go index a78ff2f..5297243 100644 --- a/ants_test.go +++ b/ants_test.go @@ -242,7 +242,7 @@ func TestPanicHandler(t *testing.T) { c := atomic.LoadInt64(&panicCounter) assert.EqualValuesf(t, 1, c, "panic handler didn't work, panicCounter: %d", c) assert.EqualValues(t, 0, p0.Running(), "pool should be empty after panic") - p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(p interface{}) { + p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(_ interface{}) { defer wg.Done() atomic.AddInt64(&panicCounter, 1) })) @@ -274,7 +274,7 @@ func TestPanicHandlerPreMalloc(t *testing.T) { c := atomic.LoadInt64(&panicCounter) assert.EqualValuesf(t, 1, c, "panic handler didn't work, panicCounter: %d", c) assert.EqualValues(t, 0, p0.Running(), "pool should be empty after panic") - p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(p interface{}) { + p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(_ interface{}) { defer wg.Done() atomic.AddInt64(&panicCounter, 1) })) @@ -503,7 +503,7 @@ func TestMaxBlockingSubmitWithFunc(t *testing.T) { func TestRebootDefaultPool(t *testing.T) { defer Release() - Reboot() + Reboot() // should do nothing inside var wg sync.WaitGroup wg.Add(1) _ = Submit(func() { @@ -511,7 +511,7 @@ func TestRebootDefaultPool(t *testing.T) { wg.Done() }) wg.Wait() - Release() + assert.NoError(t, ReleaseTimeout(time.Second)) assert.EqualError(t, Submit(nil), ErrPoolClosed.Error(), "pool should be closed") Reboot() wg.Add(1) @@ -530,7 +530,7 @@ func TestRebootNewPool(t *testing.T) { wg.Done() }) wg.Wait() - p.Release() + assert.NoError(t, p.ReleaseTimeout(time.Second)) assert.EqualError(t, p.Submit(nil), ErrPoolClosed.Error(), "pool should be closed") p.Reboot() wg.Add(1) @@ -546,7 +546,7 @@ func TestRebootNewPool(t *testing.T) { wg.Add(1) _ = p1.Invoke(1) wg.Wait() - p1.Release() + assert.NoError(t, p1.ReleaseTimeout(time.Second)) assert.EqualError(t, p1.Invoke(nil), ErrPoolClosed.Error(), "pool should be closed") p1.Reboot() wg.Add(1) @@ -667,7 +667,7 @@ func TestWithDisablePurgePoolFunc(t *testing.T) { var wg1, wg2 sync.WaitGroup wg1.Add(numWorker) wg2.Add(numWorker) - p, _ := NewPoolWithFunc(numWorker, func(i interface{}) { + p, _ := NewPoolWithFunc(numWorker, func(_ interface{}) { wg1.Done() <-sig wg2.Done() @@ -682,7 +682,7 @@ func TestWithDisablePurgeAndWithExpirationPoolFunc(t *testing.T) { wg1.Add(numWorker) wg2.Add(numWorker) expiredDuration := time.Millisecond * 100 - p, _ := NewPoolWithFunc(numWorker, func(i interface{}) { + p, _ := NewPoolWithFunc(numWorker, func(_ interface{}) { wg1.Done() <-sig wg2.Done() @@ -914,7 +914,7 @@ func TestPoolTuneScaleUp(t *testing.T) { p.Release() // test PoolWithFunc - pf, _ := NewPoolWithFunc(2, func(i interface{}) { + pf, _ := NewPoolWithFunc(2, func(_ interface{}) { <-c }) for i := 0; i < 2; i++ { @@ -975,7 +975,7 @@ func TestReleaseTimeout(t *testing.T) { } func TestDefaultPoolReleaseTimeout(t *testing.T) { - Reboot() + Reboot() // should do nothing inside for i := 0; i < 5; i++ { _ = Submit(func() { time.Sleep(time.Second) diff --git a/multipool.go b/multipool.go index 1de75ae..ad5db10 100644 --- a/multipool.go +++ b/multipool.go @@ -28,6 +28,8 @@ import ( "strings" "sync/atomic" "time" + + "golang.org/x/sync/errgroup" ) // LoadBalancingStrategy represents the type of load-balancing algorithm. 
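The hunks below rewrite `MultiPool.ReleaseTimeout` and `MultiPoolWithFunc.ReleaseTimeout` so that the sub-pools are released concurrently through `errgroup` instead of sequentially: each pool's failure is wrapped as `pool %d: ...`, sent on a buffered channel, and the non-nil errors are joined with ` | `. Below is a minimal, self-contained sketch of that fan-out/aggregate shape, not the library code itself — `fakePool` is a stand-in for `*ants.Pool`, and the sketch drains a fixed number of results from the channel rather than ranging over it, since the channel is never closed.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"

	"golang.org/x/sync/errgroup"
)

// fakePool is a hypothetical stand-in for *ants.Pool; only the release
// behavior matters for this sketch.
type fakePool struct {
	fail bool
}

func (p *fakePool) ReleaseTimeout(timeout time.Duration) error {
	if p.fail {
		return errors.New("operation timed out")
	}
	return nil
}

// releaseAll releases every pool concurrently and aggregates the failures,
// mirroring the shape of the rewritten MultiPool.ReleaseTimeout.
func releaseAll(pools []*fakePool, timeout time.Duration) error {
	errCh := make(chan error, len(pools)) // buffered: no goroutine blocks on send
	var wg errgroup.Group
	for i, pool := range pools {
		i, pool := i, pool // capture loop variables (pre-Go 1.22 semantics)
		wg.Go(func() error {
			err := pool.ReleaseTimeout(timeout)
			if err != nil {
				err = fmt.Errorf("pool %d: %v", i, err)
			}
			errCh <- err
			return err
		})
	}
	_ = wg.Wait() // every goroutine has sent exactly one value by now

	var errStr strings.Builder
	for i := 0; i < len(pools); i++ {
		if err := <-errCh; err != nil {
			errStr.WriteString(err.Error())
			errStr.WriteString(" | ")
		}
	}
	if errStr.Len() == 0 {
		return nil
	}
	return errors.New(strings.TrimSuffix(errStr.String(), " | "))
}

func main() {
	pools := []*fakePool{{}, {fail: true}, {fail: true}}
	// Prints the two wrapped failures joined by " | "; their order may vary.
	fmt.Println(releaseAll(pools, time.Second))
}
```

Buffering the channel to `len(pools)` guarantees that no releasing goroutine blocks on send, so once `wg.Wait()` returns every result is already available to drain.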
@@ -182,14 +184,35 @@ func (mp *MultiPool) ReleaseTimeout(timeout time.Duration) error { return ErrPoolClosed } - var errStr strings.Builder + errCh := make(chan error, len(mp.pools)) + var wg errgroup.Group for i, pool := range mp.pools { - if err := pool.ReleaseTimeout(timeout); err != nil { - errStr.WriteString(fmt.Sprintf("pool %d: %v\n", i, err)) - if i < len(mp.pools)-1 { - errStr.WriteString(" | ") - } - return err + func(p *Pool, idx int) { + wg.Go(func() error { + err := p.ReleaseTimeout(timeout) + if err != nil { + err = fmt.Errorf("pool %d: %v", idx, err) + } + errCh <- err + return err + }) + }(pool, i) + } + + _ = wg.Wait() + + var ( + i int + errStr strings.Builder + ) + for err := range errCh { + i++ + if i == len(mp.pools) { + break + } + if err != nil { + errStr.WriteString(err.Error()) + errStr.WriteString(" | ") } } @@ -197,7 +220,7 @@ func (mp *MultiPool) ReleaseTimeout(timeout time.Duration) error { return nil } - return errors.New(errStr.String()) + return errors.New(strings.TrimSuffix(errStr.String(), " | ")) } // Reboot reboots a released multi-pool. diff --git a/multipool_func.go b/multipool_func.go index c7d31ff..257b16e 100644 --- a/multipool_func.go +++ b/multipool_func.go @@ -28,6 +28,8 @@ import ( "strings" "sync/atomic" "time" + + "golang.org/x/sync/errgroup" ) // MultiPoolWithFunc consists of multiple pools, from which you will benefit the @@ -172,14 +174,35 @@ func (mp *MultiPoolWithFunc) ReleaseTimeout(timeout time.Duration) error { return ErrPoolClosed } - var errStr strings.Builder + errCh := make(chan error, len(mp.pools)) + var wg errgroup.Group for i, pool := range mp.pools { - if err := pool.ReleaseTimeout(timeout); err != nil { - errStr.WriteString(fmt.Sprintf("pool %d: %v\n", i, err)) - if i < len(mp.pools)-1 { - errStr.WriteString(" | ") - } - return err + func(p *PoolWithFunc, idx int) { + wg.Go(func() error { + err := p.ReleaseTimeout(timeout) + if err != nil { + err = fmt.Errorf("pool %d: %v", idx, err) + } + errCh <- err + return err + }) + }(pool, i) + } + + _ = wg.Wait() + + var ( + i int + errStr strings.Builder + ) + for err := range errCh { + i++ + if i == len(mp.pools) { + break + } + if err != nil { + errStr.WriteString(err.Error()) + errStr.WriteString(" | ") } } @@ -187,7 +210,7 @@ func (mp *MultiPoolWithFunc) ReleaseTimeout(timeout time.Duration) error { return nil } - return errors.New(errStr.String()) + return errors.New(strings.TrimSuffix(errStr.String(), " | ")) } // Reboot reboots a released multi-pool. diff --git a/pool.go b/pool.go index c69dde5..33e46ed 100644 --- a/pool.go +++ b/pool.go @@ -31,9 +31,7 @@ import ( syncx "github.com/panjf2000/ants/v2/internal/sync" ) -// Pool accepts the tasks and process them concurrently, -// it limits the total of goroutines to a given number by recycling goroutines. -type Pool struct { +type poolCommon struct { // capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to // avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool // which submits a new task to the same pool. @@ -54,6 +52,11 @@ type Pool struct { // cond for waiting to get an idle worker. cond *sync.Cond + // done is used to indicate that all workers are done. + allDone chan struct{} + // once is used to make sure the pool is closed just once. + once *sync.Once + // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker. 
workerCache sync.Pool @@ -61,9 +64,11 @@ type Pool struct { waiting int32 purgeDone int32 + purgeCtx context.Context stopPurge context.CancelFunc ticktockDone int32 + ticktockCtx context.Context stopTicktock context.CancelFunc now atomic.Value @@ -71,8 +76,14 @@ type Pool struct { options *Options } +// Pool accepts the tasks and process them concurrently, +// it limits the total of goroutines to a given number by recycling goroutines. +type Pool struct { + poolCommon +} + // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. -func (p *Pool) purgeStaleWorkers(ctx context.Context) { +func (p *Pool) purgeStaleWorkers() { ticker := time.NewTicker(p.options.ExpiryDuration) defer func() { @@ -80,9 +91,10 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) { atomic.StoreInt32(&p.purgeDone, 1) }() + purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot() for { select { - case <-ctx.Done(): + case <-purgeCtx.Done(): return case <-ticker.C: } @@ -116,16 +128,17 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) { } // ticktock is a goroutine that updates the current time in the pool regularly. -func (p *Pool) ticktock(ctx context.Context) { +func (p *Pool) ticktock() { ticker := time.NewTicker(nowTimeUpdateInterval) defer func() { ticker.Stop() atomic.StoreInt32(&p.ticktockDone, 1) }() + ticktockCtx := p.ticktockCtx // copy to the local variable to avoid race from Reboot() for { select { - case <-ctx.Done(): + case <-ticktockCtx.Done(): return case <-ticker.C: } @@ -144,16 +157,14 @@ func (p *Pool) goPurge() { } // Start a goroutine to clean up expired workers periodically. - var ctx context.Context - ctx, p.stopPurge = context.WithCancel(context.Background()) - go p.purgeStaleWorkers(ctx) + p.purgeCtx, p.stopPurge = context.WithCancel(context.Background()) + go p.purgeStaleWorkers() } func (p *Pool) goTicktock() { p.now.Store(time.Now()) - var ctx context.Context - ctx, p.stopTicktock = context.WithCancel(context.Background()) - go p.ticktock(ctx) + p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background()) + go p.ticktock() } func (p *Pool) nowTime() time.Time { @@ -180,11 +191,13 @@ func NewPool(size int, options ...Option) (*Pool, error) { opts.Logger = defaultLogger } - p := &Pool{ + p := &Pool{poolCommon: poolCommon{ capacity: int32(size), + allDone: make(chan struct{}), lock: syncx.NewSpinLock(), + once: &sync.Once{}, options: opts, - } + }} p.workerCache.New = func() interface{} { return &goWorker{ pool: p, @@ -281,8 +294,10 @@ func (p *Pool) Release() { p.stopPurge() p.stopPurge = nil } - p.stopTicktock() - p.stopTicktock = nil + if p.stopTicktock != nil { + p.stopTicktock() + p.stopTicktock = nil + } p.lock.Lock() p.workers.reset() @@ -297,32 +312,57 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error { if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil { return ErrPoolClosed } + p.Release() - endTime := time.Now().Add(timeout) - for time.Now().Before(endTime) { - if p.Running() == 0 && - (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && - atomic.LoadInt32(&p.ticktockDone) == 1 { - return nil - } - time.Sleep(10 * time.Millisecond) + var purgeCh <-chan struct{} + if !p.options.DisablePurge { + purgeCh = p.purgeCtx.Done() + } else { + purgeCh = p.allDone + } + + if p.Running() == 0 { + p.once.Do(func() { + close(p.allDone) + }) + } + + timer := time.NewTimer(timeout) + defer timer.Stop() + for { + select { + case 
<-timer.C: + return ErrTimeout + case <-p.allDone: + <-purgeCh + <-p.ticktockCtx.Done() + if p.Running() == 0 && + (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && + atomic.LoadInt32(&p.ticktockDone) == 1 { + return nil + } + } } - return ErrTimeout } -// Reboot reboots a closed pool. +// Reboot reboots a closed pool, it does nothing if the pool is not closed. +// If you intend to reboot a closed pool, use ReleaseTimeout() instead of +// Release() to ensure that all workers are stopped and resource are released +// before rebooting, otherwise you may run into data race. func (p *Pool) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { atomic.StoreInt32(&p.purgeDone, 0) p.goPurge() atomic.StoreInt32(&p.ticktockDone, 0) p.goTicktock() + p.allDone = make(chan struct{}) + p.once = &sync.Once{} } } -func (p *Pool) addRunning(delta int) { - atomic.AddInt32(&p.running, int32(delta)) +func (p *Pool) addRunning(delta int) int { + return int(atomic.AddInt32(&p.running, int32(delta))) } func (p *Pool) addWaiting(delta int) { diff --git a/pool_func.go b/pool_func.go index 6840384..140d5fe 100644 --- a/pool_func.go +++ b/pool_func.go @@ -34,55 +34,24 @@ import ( // PoolWithFunc accepts the tasks and process them concurrently, // it limits the total of goroutines to a given number by recycling goroutines. type PoolWithFunc struct { - // capacity of the pool. - capacity int32 - - // running is the number of the currently running goroutines. - running int32 - - // lock for protecting the worker queue. - lock sync.Locker - - // workers is a slice that store the available workers. - workers workerQueue - - // state is used to notice the pool to closed itself. - state int32 - - // cond for waiting to get an idle worker. - cond *sync.Cond + poolCommon // poolFunc is the function for processing tasks. poolFunc func(interface{}) - - // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker. - workerCache sync.Pool - - // waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock - waiting int32 - - purgeDone int32 - stopPurge context.CancelFunc - - ticktockDone int32 - stopTicktock context.CancelFunc - - now atomic.Value - - options *Options } // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. -func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { +func (p *PoolWithFunc) purgeStaleWorkers() { ticker := time.NewTicker(p.options.ExpiryDuration) defer func() { ticker.Stop() atomic.StoreInt32(&p.purgeDone, 1) }() + purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot() for { select { - case <-ctx.Done(): + case <-purgeCtx.Done(): return case <-ticker.C: } @@ -116,16 +85,17 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { } // ticktock is a goroutine that updates the current time in the pool regularly. -func (p *PoolWithFunc) ticktock(ctx context.Context) { +func (p *PoolWithFunc) ticktock() { ticker := time.NewTicker(nowTimeUpdateInterval) defer func() { ticker.Stop() atomic.StoreInt32(&p.ticktockDone, 1) }() + ticktockCtx := p.ticktockCtx // copy to the local variable to avoid race from Reboot() for { select { - case <-ctx.Done(): + case <-ticktockCtx.Done(): return case <-ticker.C: } @@ -144,16 +114,14 @@ func (p *PoolWithFunc) goPurge() { } // Start a goroutine to clean up expired workers periodically. 
- var ctx context.Context - ctx, p.stopPurge = context.WithCancel(context.Background()) - go p.purgeStaleWorkers(ctx) + p.purgeCtx, p.stopPurge = context.WithCancel(context.Background()) + go p.purgeStaleWorkers() } func (p *PoolWithFunc) goTicktock() { p.now.Store(time.Now()) - var ctx context.Context - ctx, p.stopTicktock = context.WithCancel(context.Background()) - go p.ticktock(ctx) + p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background()) + go p.ticktock() } func (p *PoolWithFunc) nowTime() time.Time { @@ -185,10 +153,14 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi } p := &PoolWithFunc{ - capacity: int32(size), + poolCommon: poolCommon{ + capacity: int32(size), + allDone: make(chan struct{}), + lock: syncx.NewSpinLock(), + once: &sync.Once{}, + options: opts, + }, poolFunc: pf, - lock: syncx.NewSpinLock(), - options: opts, } p.workerCache.New = func() interface{} { return &goWorkerWithFunc{ @@ -286,8 +258,10 @@ func (p *PoolWithFunc) Release() { p.stopPurge() p.stopPurge = nil } - p.stopTicktock() - p.stopTicktock = nil + if p.stopTicktock != nil { + p.stopTicktock() + p.stopTicktock = nil + } p.lock.Lock() p.workers.reset() @@ -302,32 +276,57 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil { return ErrPoolClosed } + p.Release() - endTime := time.Now().Add(timeout) - for time.Now().Before(endTime) { - if p.Running() == 0 && - (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && - atomic.LoadInt32(&p.ticktockDone) == 1 { - return nil - } - time.Sleep(10 * time.Millisecond) + var purgeCh <-chan struct{} + if !p.options.DisablePurge { + purgeCh = p.purgeCtx.Done() + } else { + purgeCh = p.allDone + } + + if p.Running() == 0 { + p.once.Do(func() { + close(p.allDone) + }) + } + + timer := time.NewTimer(timeout) + defer timer.Stop() + for { + select { + case <-timer.C: + return ErrTimeout + case <-p.allDone: + <-purgeCh + <-p.ticktockCtx.Done() + if p.Running() == 0 && + (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && + atomic.LoadInt32(&p.ticktockDone) == 1 { + return nil + } + } } - return ErrTimeout } -// Reboot reboots a closed pool. +// Reboot reboots a closed pool, it does nothing if the pool is not closed. +// If you intend to reboot a closed pool, use ReleaseTimeout() instead of +// Release() to ensure that all workers are stopped and resource are released +// before rebooting, otherwise you may run into data race. 
func (p *PoolWithFunc) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { atomic.StoreInt32(&p.purgeDone, 0) p.goPurge() atomic.StoreInt32(&p.ticktockDone, 0) p.goTicktock() + p.allDone = make(chan struct{}) + p.once = &sync.Once{} } } -func (p *PoolWithFunc) addRunning(delta int) { - atomic.AddInt32(&p.running, int32(delta)) +func (p *PoolWithFunc) addRunning(delta int) int { + return int(atomic.AddInt32(&p.running, int32(delta))) } func (p *PoolWithFunc) addWaiting(delta int) { diff --git a/worker.go b/worker.go index 887bb98..73166f8 100644 --- a/worker.go +++ b/worker.go @@ -47,7 +47,11 @@ func (w *goWorker) run() { w.pool.addRunning(1) go func() { defer func() { - w.pool.addRunning(-1) + if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() { + w.pool.once.Do(func() { + close(w.pool.allDone) + }) + } w.pool.workerCache.Put(w) if p := recover(); p != nil { if ph := w.pool.options.PanicHandler; ph != nil { diff --git a/worker_func.go b/worker_func.go index 701a076..a25f4f9 100644 --- a/worker_func.go +++ b/worker_func.go @@ -47,7 +47,11 @@ func (w *goWorkerWithFunc) run() { w.pool.addRunning(1) go func() { defer func() { - w.pool.addRunning(-1) + if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() { + w.pool.once.Do(func() { + close(w.pool.allDone) + }) + } w.pool.workerCache.Put(w) if p := recover(); p != nil { if ph := w.pool.options.PanicHandler; ph != nil {
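The `worker.go`/`worker_func.go` hunks above are what make the new event-driven `ReleaseTimeout` work: when the last running worker exits on a closed pool, it closes `allDone` through the pool's `sync.Once`, so `ReleaseTimeout` can block on channels instead of polling every 10 ms. Combined with the clarified `Reboot` contract (a no-op on an open pool; prefer `ReleaseTimeout` over `Release` before rebooting to avoid a data race), the intended caller-side sequence looks roughly like the sketch below. This is a hedged usage example in the spirit of the updated `TestRebootNewPool`/`TestReleaseTimeout` tests, with a made-up workload rather than code from this patch.

```go
package main

import (
	"fmt"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	p, err := ants.NewPool(10)
	if err != nil {
		panic(err)
	}

	// Submit some work; each task just sleeps briefly (hypothetical workload).
	for i := 0; i < 10; i++ {
		_ = p.Submit(func() {
			time.Sleep(10 * time.Millisecond)
		})
	}

	// ReleaseTimeout now waits on allDone/purge/ticktock signals instead of
	// polling; it returns nil once every worker has exited, or ErrTimeout.
	if err := p.ReleaseTimeout(time.Second); err != nil {
		fmt.Println("release failed:", err)
		return
	}

	// After a full release, Submit fails with ErrPoolClosed...
	fmt.Println(p.Submit(func() {}) == ants.ErrPoolClosed) // true

	// ...and Reboot restores the pool for reuse.
	p.Reboot()
	fmt.Println(p.Submit(func() {})) // <nil>

	_ = p.ReleaseTimeout(time.Second)
}
```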