mirror of https://github.com/panjf2000/ants.git
commit
bd6ee4bf45
|
@ -44,6 +44,7 @@ autolabeler:
|
|||
title:
|
||||
- /fix/i
|
||||
- /bug/i
|
||||
- /patch/i
|
||||
- label: docs
|
||||
files:
|
||||
- '*.md'
|
||||
|
@ -65,11 +66,27 @@ autolabeler:
|
|||
- /feature/i
|
||||
- /implement/i
|
||||
- /add/i
|
||||
- /minor/i
|
||||
- label: dependencies
|
||||
title:
|
||||
- /dependencies/i
|
||||
- /upgrade/i
|
||||
- /bump up/i
|
||||
- label: chores
|
||||
title:
|
||||
- /chore/i
|
||||
- /\bmisc\b/i
|
||||
- /cleanup/i
|
||||
- /clean up/i
|
||||
- label: major
|
||||
title:
|
||||
- /major:/i
|
||||
- label: minor
|
||||
title:
|
||||
- /minor:/i
|
||||
- label: patch
|
||||
title:
|
||||
- /patch:/i
|
||||
template: |
|
||||
## Changelogs
|
||||
|
||||
|
|
|
@ -54,7 +54,7 @@ jobs:
|
|||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
go: [1.13, 1.19]
|
||||
go: [1.13, 1.21]
|
||||
os: [ubuntu-latest, macos-latest, windows-latest]
|
||||
name: Go ${{ matrix.go }} @ ${{ matrix.os }}
|
||||
runs-on: ${{ matrix.os}}
|
||||
|
@ -92,7 +92,7 @@ jobs:
|
|||
restore-keys: |
|
||||
${{ runner.os }}-${{ matrix.go }}-go-ci
|
||||
|
||||
- name: Run unit tests
|
||||
- name: Run unit tests and integrated tests
|
||||
run: go test -v -race -coverprofile="codecov.report" -covermode=atomic
|
||||
|
||||
- name: Upload code coverage report to Codecov
|
||||
|
|
15
README.md
15
README.md
|
@ -327,21 +327,30 @@ The source code in `ants` is available under the [MIT License](/LICENSE).
|
|||
|
||||
The following companies/organizations use `ants` in production.
|
||||
|
||||
<a href="https://www.tencent.com"><img src="http://img.taohuawu.club/gallery/tencent_logo.png" width="250" align="middle"/></a> <a href="https://www.bytedance.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/ByteDance_Logo.png" width="250" align="middle"/></a> <a href="https://tieba.baidu.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/baidu-tieba-logo.png" width="300" align="middle"/></a> <a href="https://www.sina.com.cn/" target="_blank"><img src="http://img.taohuawu.club/gallery/sina-logo.png" width="200" align="middle"/></a> <a href="https://www.163.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/netease-logo.png" width="150" align="middle"/></a> <a href="https://www.tencentmusic.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/tencent-music-logo.png" width="250" align="middle"/></a> <a href="https://www.futuhk.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/futu-logo.png" width="250" align="middle"/></a> <a href="https://www.shopify.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/shopify-logo.png" width="250" align="middle"/></a> <a href="https://www.wechat.com/en/" target="_blank"><img src="http://img.taohuawu.club/gallery/wechat-logo.png" width="250" align="middle"/></a><a href="https://www.baidu.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/baidu-mobile.png" width="250" align="middle"/></a>
|
||||
<a href="https://www.tencent.com"><img src="http://img.taohuawu.club/gallery/tencent_logo.png" width="250" align="middle"/></a> <a href="https://www.bytedance.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/ByteDance_Logo.png" width="250" align="middle"/></a> <a href="https://tieba.baidu.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/baidu-tieba-logo.png" width="300" align="middle"/></a> <a href="https://www.sina.com.cn/" target="_blank"><img src="http://img.taohuawu.club/gallery/sina-logo.png" width="200" align="middle"/></a> <a href="https://www.163.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/netease-logo.png" width="150" align="middle"/></a> <a href="https://www.tencentmusic.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/tencent-music-logo.png" width="250" align="middle"/></a> <a href="https://www.futuhk.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/futu-logo.png" width="250" align="middle"/></a> <a href="https://www.shopify.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/shopify-logo.png" width="250" align="middle"/></a> <a href="https://www.wechat.com/en/" target="_blank"><img src="http://img.taohuawu.club/gallery/wechat-logo.png" width="250" align="middle"/></a><a href="https://www.baidu.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/baidu-mobile.png" width="250" align="middle"/></a> <a href="https://www.360.com" target="_blank"><img src="http://img.taohuawu.club/gallery/360-logo.png" width="250" align="middle"/></a><a href="https://www.huaweicloud.com/intl/en-us/" target="_blank"><img src="https://res-static.hc-cdn.cn/cloudbu-site/china/zh-cn/%E7%BB%84%E4%BB%B6%E9%AA%8C%E8%AF%81/pep-common-header/logo-en.png" width="250" align="middle"/></a> <a href="https://www.matrixorigin.io" target="_blank"><img src="https://www.matrixorigin.io/_next/static/media/logo-light-en.42553c69.svg" width="250" align="middle"/></a> <a href="https://adguard-dns.io" target="_blank"><img src="https://cdn.adtidy.org/website/images/AdGuardDNS_black.svg" width="250" align="middle"/></a> <a href="https://bk.tencent.com" target="_blank"><img src="https://static.apiseven.com/2022/11/14/6371adab14119.png" width="250" align="middle"/></a>
|
||||
|
||||
### open-source software
|
||||
|
||||
The open-source projects below do concurrent programming with the help of `ants`.
|
||||
|
||||
- [gnet](https://github.com/panjf2000/gnet): A high-performance, lightweight, non-blocking, event-driven networking framework written in pure Go.
|
||||
- [nps](https://github.com/ehang-io/nps): A lightweight, high-performance, powerful intranet penetration proxy server, with a powerful web management terminal.
|
||||
- [milvus](https://github.com/milvus-io/milvus): An open-source vector database for scalable similarity search and AI applications.
|
||||
- [nps](https://github.com/ehang-io/nps): A lightweight, high-performance, powerful intranet penetration proxy server, with a powerful web management terminal.
|
||||
- [siyuan](https://github.com/siyuan-note/siyuan): SiYuan is a local-first personal knowledge management system that supports complete offline use, as well as end-to-end encrypted synchronization.
|
||||
- [osmedeus](https://github.com/j3ssie/osmedeus): A Workflow Engine for Offensive Security.
|
||||
- [jitsu](https://github.com/jitsucom/jitsu): An open-source Segment alternative. Fully-scriptable data ingestion engine for modern data teams. Set-up a real-time data pipeline in minutes, not days.
|
||||
- [jitsu](https://github.com/jitsucom/jitsu/tree/master): An open-source Segment alternative. Fully-scriptable data ingestion engine for modern data teams. Set-up a real-time data pipeline in minutes, not days.
|
||||
- [triangula](https://github.com/RH12503/triangula): Generate high-quality triangulated and polygonal art from images.
|
||||
- [teler](https://github.com/kitabisa/teler): Real-time HTTP Intrusion Detection.
|
||||
- [bsc](https://github.com/binance-chain/bsc): A Binance Smart Chain client based on the go-ethereum fork.
|
||||
- [jaeles](https://github.com/jaeles-project/jaeles): The Swiss Army knife for automated Web Application Testing.
|
||||
- [devlake](https://github.com/apache/incubator-devlake): The open-source dev data platform & dashboard for your DevOps tools.
|
||||
- [matrixone](https://github.com/matrixorigin/matrixone): MatrixOne is a future-oriented hyper-converged cloud and edge native DBMS that supports transactional, analytical, and streaming workloads with a simplified and distributed database engine, across multiple data centers, clouds, edges and other heterogeneous infrastructures.
|
||||
- [bk-bcs](https://github.com/TencentBlueKing/bk-bcs): BlueKing Container Service (BCS, same below) is a container management and orchestration platform for the micro-services under the BlueKing ecosystem.
|
||||
- [trueblocks-core](https://github.com/TrueBlocks/trueblocks-core): TrueBlocks improves access to blockchain data for any EVM-compatible chain (particularly Ethereum mainnet) while remaining entirely local.
|
||||
- [openGemini](https://github.com/openGemini/openGemini): openGemini is an open-source,cloud-native time-series database(TSDB) that can be widely used in IoT, Internet of Vehicles(IoV), O&M monitoring, and industrial Internet scenarios.
|
||||
- [AdGuardDNS](https://github.com/AdguardTeam/AdGuardDNS): AdGuard DNS is an alternative solution for tracker blocking, privacy protection, and parental control.
|
||||
- [WatchAD2.0](https://github.com/Qihoo360/WatchAD2.0): WatchAD2.0 是 360 信息安全中心开发的一款针对域安全的日志分析与监控系统,它可以收集所有域控上的事件日志、网络流量,通过特征匹配、协议分析、历史行为、敏感操作和蜜罐账户等方式来检测各种已知与未知威胁,功能覆盖了大部分目前的常见内网域渗透手法。
|
||||
- [vanus](https://github.com/vanus-labs/vanus): Vanus is a Serverless, event streaming system with processing capabilities. It easily connects SaaS, Cloud Services, and Databases to help users build next-gen Event-driven Applications.
|
||||
|
||||
#### All use cases:
|
||||
|
||||
|
|
17
README_ZH.md
17
README_ZH.md
|
@ -328,21 +328,30 @@ pool.Reboot()
|
|||
|
||||
以下公司/组织在生产环境上使用了 `ants`。
|
||||
|
||||
<a href="https://www.tencent.com"><img src="http://img.taohuawu.club/gallery/tencent_logo.png" width="250" align="middle"/></a> <a href="https://www.bytedance.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/ByteDance_Logo.png" width="250" align="middle"/></a> <a href="https://tieba.baidu.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/baidu-tieba-logo.png" width="300" align="middle"/></a> <a href="https://www.sina.com.cn/" target="_blank"><img src="http://img.taohuawu.club/gallery/sina-logo.png" width="200" align="middle"/></a> <a href="https://www.163.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/netease-logo.png" width="150" align="middle"/></a> <a href="https://www.tencentmusic.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/tencent-music-logo.png" width="250" align="middle"/></a> <a href="https://www.futuhk.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/futu-logo.png" width="250" align="middle"/></a> <a href="https://www.shopify.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/shopify-logo.png" width="250" align="middle"/></a> <a href="https://weixin.qq.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/wechat-logo.png" width="250" align="middle"/></a><a href="https://www.baidu.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/baidu-mobile.png" width="250" align="middle"/></a>
|
||||
<a href="https://www.tencent.com"><img src="http://img.taohuawu.club/gallery/tencent_logo.png" width="250" align="middle"/></a> <a href="https://www.bytedance.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/ByteDance_Logo.png" width="250" align="middle"/></a> <a href="https://tieba.baidu.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/baidu-tieba-logo.png" width="300" align="middle"/></a> <a href="https://www.sina.com.cn/" target="_blank"><img src="http://img.taohuawu.club/gallery/sina-logo.png" width="200" align="middle"/></a> <a href="https://www.163.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/netease-logo.png" width="150" align="middle"/></a> <a href="https://www.tencentmusic.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/tencent-music-logo.png" width="250" align="middle"/></a> <a href="https://www.futuhk.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/futu-logo.png" width="250" align="middle"/></a> <a href="https://www.shopify.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/shopify-logo.png" width="250" align="middle"/></a> <a href="https://weixin.qq.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/wechat-logo.png" width="250" align="middle"/></a><a href="https://www.baidu.com/" target="_blank"><img src="http://img.taohuawu.club/gallery/baidu-mobile.png" width="250" align="middle"/></a> <a href="https://www.360.com" target="_blank"><img src="http://img.taohuawu.club/gallery/360-logo.png" width="250" align="middle"/></a><a href="https://www.huaweicloud.com" target="_blank"><img src="https://res-static.hc-cdn.cn/cloudbu-site/china/zh-cn/wangxue/header/logo.svg" width="250" align="middle"/></a> <a href="https://matrixorigin.cn" target="_blank"><img src="https://matrixorigin.cn/_next/static/media/logo-light-zh.a2a8f3c0.svg" width="250" align="middle"/></a> <a href="https://adguard-dns.io" target="_blank"><img src="https://cdn.adtidy.org/website/images/AdGuardDNS_black.svg" width="250" align="middle"/></a> <a href="https://bk.tencent.com" target="_blank"><img src="https://static.apiseven.com/2022/11/14/6371adab14119.png" width="250" align="middle"/></a>
|
||||
|
||||
### 开源软件
|
||||
|
||||
这些开源项目借助 `ants` 进行并发编程。
|
||||
|
||||
- [gnet](https://github.com/panjf2000/gnet): gnet 是一个高性能、轻量级、非阻塞的事件驱动 Go 网络框架。
|
||||
- [nps](https://github.com/ehang-io/nps): A lightweight, high-performance, powerful intranet penetration proxy server, with a powerful web management terminal.
|
||||
- [milvus](https://github.com/milvus-io/milvus): An open-source vector database for scalable similarity search and AI applications.
|
||||
- [milvus](https://github.com/milvus-io/milvus): 一个高度灵活、可靠且速度极快的云原生开源向量数据库。
|
||||
- [nps](https://github.com/ehang-io/nps): 一款轻量级、高性能、功能强大的内网穿透代理服务器。
|
||||
- [siyuan](https://github.com/siyuan-note/siyuan): 思源笔记是一款本地优先的个人知识管理系统,支持完全离线使用,同时也支持端到端加密同步。
|
||||
- [osmedeus](https://github.com/j3ssie/osmedeus): A Workflow Engine for Offensive Security.
|
||||
- [jitsu](https://github.com/jitsucom/jitsu): An open-source Segment alternative. Fully-scriptable data ingestion engine for modern data teams. Set-up a real-time data pipeline in minutes, not days.
|
||||
- [jitsu](https://github.com/jitsucom/jitsu/tree/master): An open-source Segment alternative. Fully-scriptable data ingestion engine for modern data teams. Set-up a real-time data pipeline in minutes, not days.
|
||||
- [triangula](https://github.com/RH12503/triangula): Generate high-quality triangulated and polygonal art from images.
|
||||
- [teler](https://github.com/kitabisa/teler): Real-time HTTP Intrusion Detection.
|
||||
- [bsc](https://github.com/binance-chain/bsc): A Binance Smart Chain client based on the go-ethereum fork.
|
||||
- [jaeles](https://github.com/jaeles-project/jaeles): The Swiss Army knife for automated Web Application Testing.
|
||||
- [devlake](https://github.com/apache/incubator-devlake): The open-source dev data platform & dashboard for your DevOps tools.
|
||||
- [matrixone](https://github.com/matrixorigin/matrixone): MatrixOne 是一款面向未来的超融合异构云原生数据库,通过超融合数据引擎支持事务/分析/流处理等混合工作负载,通过异构云原生架构支持跨机房协同/多地协同/云边协同。简化开发运维,消简数据碎片,打破数据的系统、位置和创新边界。
|
||||
- [bk-bcs](https://github.com/TencentBlueKing/bk-bcs): 蓝鲸容器管理平台(Blueking Container Service)定位于打造云原生技术和业务实际应用场景之间的桥梁;聚焦于复杂应用场景的容器化部署技术方案的研发、整合和产品化;致力于为游戏等复杂应用提供一站式、低门槛的容器编排和服务治理服务。
|
||||
- [trueblocks-core](https://github.com/TrueBlocks/trueblocks-core): TrueBlocks improves access to blockchain data for any EVM-compatible chain (particularly Ethereum mainnet) while remaining entirely local.
|
||||
- [openGemini](https://github.com/openGemini/openGemini): openGemini 是华为云开源的一款云原生分布式时序数据库,可广泛应用于物联网、车联网、运维监控、工业互联网等业务场景,具备卓越的读写性能和高效的数据分析能力,采用类SQL查询语言,无第三方软件依赖、安装简单、部署灵活、运维便捷。
|
||||
- [AdGuardDNS](https://github.com/AdguardTeam/AdGuardDNS): AdGuard DNS is an alternative solution for tracker blocking, privacy protection, and parental control.
|
||||
- [WatchAD2.0](https://github.com/Qihoo360/WatchAD2.0): WatchAD2.0 是 360 信息安全中心开发的一款针对域安全的日志分析与监控系统,它可以收集所有域控上的事件日志、网络流量,通过特征匹配、协议分析、历史行为、敏感操作和蜜罐账户等方式来检测各种已知与未知威胁,功能覆盖了大部分目前的常见内网域渗透手法。
|
||||
- [vanus](https://github.com/vanus-labs/vanus): Vanus is a Serverless, event streaming system with processing capabilities. It easily connects SaaS, Cloud Services, and Databases to help users build next-gen Event-driven Applications.
|
||||
|
||||
#### 所有案例:
|
||||
|
||||
|
|
4
go.mod
4
go.mod
|
@ -3,6 +3,6 @@ module github.com/panjf2000/ants/v2
|
|||
go 1.13
|
||||
|
||||
require (
|
||||
github.com/stretchr/testify v1.8.1
|
||||
golang.org/x/sync v0.1.0
|
||||
github.com/stretchr/testify v1.8.2
|
||||
golang.org/x/sync v0.3.0
|
||||
)
|
||||
|
|
8
go.sum
8
go.sum
|
@ -8,10 +8,10 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS
|
|||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
|
||||
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
|
||||
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
|
85
pool.go
85
pool.go
|
@ -106,8 +106,8 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) {
|
|||
staleWorkers[i] = nil
|
||||
}
|
||||
|
||||
// There might be a situation where all workers have been cleaned up(no worker is running),
|
||||
// while some invokers still are stuck in "p.cond.Wait()", then we need to awake those invokers.
|
||||
// There might be a situation where all workers have been cleaned up (no worker is running),
|
||||
// while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers.
|
||||
if isDormant && p.Waiting() > 0 {
|
||||
p.cond.Broadcast()
|
||||
}
|
||||
|
@ -207,8 +207,6 @@ func NewPool(size int, options ...Option) (*Pool, error) {
|
|||
return p, nil
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// Submit submits a task to this pool.
|
||||
//
|
||||
// Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
|
||||
|
@ -219,11 +217,12 @@ func (p *Pool) Submit(task func()) error {
|
|||
if p.IsClosed() {
|
||||
return ErrPoolClosed
|
||||
}
|
||||
if w := p.retrieveWorker(); w != nil {
|
||||
|
||||
w, err := p.retrieveWorker()
|
||||
if w != nil {
|
||||
w.inputFunc(task)
|
||||
return nil
|
||||
}
|
||||
return ErrPoolOverload
|
||||
return err
|
||||
}
|
||||
|
||||
// Running returns the number of workers currently running.
|
||||
|
@ -321,8 +320,6 @@ func (p *Pool) Reboot() {
|
|||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func (p *Pool) addRunning(delta int) {
|
||||
atomic.AddInt32(&p.running, int32(delta))
|
||||
}
|
||||
|
@ -332,52 +329,42 @@ func (p *Pool) addWaiting(delta int) {
|
|||
}
|
||||
|
||||
// retrieveWorker returns an available worker to run the tasks.
|
||||
func (p *Pool) retrieveWorker() (w worker) {
|
||||
spawnWorker := func() {
|
||||
func (p *Pool) retrieveWorker() (w worker, err error) {
|
||||
p.lock.Lock()
|
||||
|
||||
retry:
|
||||
// First try to fetch the worker from the queue.
|
||||
if w = p.workers.detach(); w != nil {
|
||||
p.lock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// If the worker queue is empty and we don't run out of the pool capacity,
|
||||
// then just spawn a new worker goroutine.
|
||||
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
|
||||
p.lock.Unlock()
|
||||
w = p.workerCache.Get().(*goWorker)
|
||||
w.run()
|
||||
return
|
||||
}
|
||||
|
||||
p.lock.Lock()
|
||||
w = p.workers.detach()
|
||||
if w != nil { // first try to fetch the worker from the queue
|
||||
p.lock.Unlock()
|
||||
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
|
||||
// if the worker queue is empty and we don't run out of the pool capacity,
|
||||
// then just spawn a new worker goroutine.
|
||||
p.lock.Unlock()
|
||||
spawnWorker()
|
||||
} else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
|
||||
if p.options.Nonblocking {
|
||||
p.lock.Unlock()
|
||||
return
|
||||
}
|
||||
retry:
|
||||
if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
|
||||
p.lock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
p.addWaiting(1)
|
||||
p.cond.Wait() // block and wait for an available worker
|
||||
p.addWaiting(-1)
|
||||
|
||||
if p.IsClosed() {
|
||||
p.lock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
if w = p.workers.detach(); w == nil {
|
||||
if p.Free() > 0 {
|
||||
p.lock.Unlock()
|
||||
spawnWorker()
|
||||
return
|
||||
}
|
||||
goto retry
|
||||
}
|
||||
// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
|
||||
if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
|
||||
p.lock.Unlock()
|
||||
return nil, ErrPoolOverload
|
||||
}
|
||||
return
|
||||
|
||||
// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
|
||||
p.addWaiting(1)
|
||||
p.cond.Wait() // block and wait for an available worker
|
||||
p.addWaiting(-1)
|
||||
|
||||
if p.IsClosed() {
|
||||
p.lock.Unlock()
|
||||
return nil, ErrPoolClosed
|
||||
}
|
||||
|
||||
goto retry
|
||||
}
|
||||
|
||||
// revertWorker puts a worker back into free pool, recycling the goroutines.
|
||||
|
|
85
pool_func.go
85
pool_func.go
|
@ -107,8 +107,8 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
|
|||
staleWorkers[i] = nil
|
||||
}
|
||||
|
||||
// There might be a situation where all workers have been cleaned up(no worker is running),
|
||||
// while some invokers still are stuck in "p.cond.Wait()", then we need to awake those invokers.
|
||||
// There might be a situation where all workers have been cleaned up (no worker is running),
|
||||
// while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers.
|
||||
if isDormant && p.Waiting() > 0 {
|
||||
p.cond.Broadcast()
|
||||
}
|
||||
|
@ -213,8 +213,6 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
|
|||
return p, nil
|
||||
}
|
||||
|
||||
//---------------------------------------------------------------------------
|
||||
|
||||
// Invoke submits a task to pool.
|
||||
//
|
||||
// Note that you are allowed to call Pool.Invoke() from the current Pool.Invoke(),
|
||||
|
@ -225,11 +223,12 @@ func (p *PoolWithFunc) Invoke(args interface{}) error {
|
|||
if p.IsClosed() {
|
||||
return ErrPoolClosed
|
||||
}
|
||||
if w := p.retrieveWorker(); w != nil {
|
||||
|
||||
w, err := p.retrieveWorker()
|
||||
if w != nil {
|
||||
w.inputParam(args)
|
||||
return nil
|
||||
}
|
||||
return ErrPoolOverload
|
||||
return err
|
||||
}
|
||||
|
||||
// Running returns the number of workers currently running.
|
||||
|
@ -327,8 +326,6 @@ func (p *PoolWithFunc) Reboot() {
|
|||
}
|
||||
}
|
||||
|
||||
//---------------------------------------------------------------------------
|
||||
|
||||
func (p *PoolWithFunc) addRunning(delta int) {
|
||||
atomic.AddInt32(&p.running, int32(delta))
|
||||
}
|
||||
|
@ -338,52 +335,42 @@ func (p *PoolWithFunc) addWaiting(delta int) {
|
|||
}
|
||||
|
||||
// retrieveWorker returns an available worker to run the tasks.
|
||||
func (p *PoolWithFunc) retrieveWorker() (w worker) {
|
||||
spawnWorker := func() {
|
||||
func (p *PoolWithFunc) retrieveWorker() (w worker, err error) {
|
||||
p.lock.Lock()
|
||||
|
||||
retry:
|
||||
// First try to fetch the worker from the queue.
|
||||
if w = p.workers.detach(); w != nil {
|
||||
p.lock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// If the worker queue is empty and we don't run out of the pool capacity,
|
||||
// then just spawn a new worker goroutine.
|
||||
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
|
||||
p.lock.Unlock()
|
||||
w = p.workerCache.Get().(*goWorkerWithFunc)
|
||||
w.run()
|
||||
return
|
||||
}
|
||||
|
||||
p.lock.Lock()
|
||||
w = p.workers.detach()
|
||||
if w != nil { // first try to fetch the worker from the queue
|
||||
p.lock.Unlock()
|
||||
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
|
||||
// if the worker queue is empty and we don't run out of the pool capacity,
|
||||
// then just spawn a new worker goroutine.
|
||||
p.lock.Unlock()
|
||||
spawnWorker()
|
||||
} else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
|
||||
if p.options.Nonblocking {
|
||||
p.lock.Unlock()
|
||||
return
|
||||
}
|
||||
retry:
|
||||
if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
|
||||
p.lock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
p.addWaiting(1)
|
||||
p.cond.Wait() // block and wait for an available worker
|
||||
p.addWaiting(-1)
|
||||
|
||||
if p.IsClosed() {
|
||||
p.lock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
if w = p.workers.detach(); w == nil {
|
||||
if p.Free() > 0 {
|
||||
p.lock.Unlock()
|
||||
spawnWorker()
|
||||
return
|
||||
}
|
||||
goto retry
|
||||
}
|
||||
// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
|
||||
if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
|
||||
p.lock.Unlock()
|
||||
return nil, ErrPoolOverload
|
||||
}
|
||||
return
|
||||
|
||||
// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
|
||||
p.addWaiting(1)
|
||||
p.cond.Wait() // block and wait for an available worker
|
||||
p.addWaiting(-1)
|
||||
|
||||
if p.IsClosed() {
|
||||
p.lock.Unlock()
|
||||
return nil, ErrPoolClosed
|
||||
}
|
||||
|
||||
goto retry
|
||||
}
|
||||
|
||||
// revertWorker puts a worker back into free pool, recycling the goroutines.
|
||||
|
|
Loading…
Reference in New Issue