diff --git a/.github/release-drafter.yml b/.github/release-drafter.yml index 7896e67..78f55b9 100644 --- a/.github/release-drafter.yml +++ b/.github/release-drafter.yml @@ -44,6 +44,7 @@ autolabeler: title: - /fix/i - /bug/i + - /patch/i - label: docs files: - '*.md' @@ -65,11 +66,27 @@ autolabeler: - /feature/i - /implement/i - /add/i + - /minor/i - label: dependencies title: - /dependencies/i - /upgrade/i - /bump up/i + - label: chores + title: + - /chore/i + - /\bmisc\b/i + - /cleanup/i + - /clean up/i + - label: major + title: + - /major:/i + - label: minor + title: + - /minor:/i + - label: patch + title: + - /patch:/i template: | ## Changelogs diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d0481a1..33ff07a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -54,7 +54,7 @@ jobs: strategy: fail-fast: false matrix: - go: [1.13, 1.19] + go: [1.13, 1.21] os: [ubuntu-latest, macos-latest, windows-latest] name: Go ${{ matrix.go }} @ ${{ matrix.os }} runs-on: ${{ matrix.os}} @@ -92,7 +92,7 @@ jobs: restore-keys: | ${{ runner.os }}-${{ matrix.go }}-go-ci - - name: Run unit tests + - name: Run unit tests and integrated tests run: go test -v -race -coverprofile="codecov.report" -covermode=atomic - name: Upload code coverage report to Codecov diff --git a/README.md b/README.md index ef64416..bacefbd 100644 --- a/README.md +++ b/README.md @@ -327,21 +327,30 @@ The source code in `ants` is available under the [MIT License](/LICENSE). The following companies/organizations use `ants` in production. -                 +                         ### open-source software +The open-source projects below do concurrent programming with the help of `ants`. + - [gnet](https://github.com/panjf2000/gnet): A high-performance, lightweight, non-blocking, event-driven networking framework written in pure Go. -- [nps](https://github.com/ehang-io/nps): A lightweight, high-performance, powerful intranet penetration proxy server, with a powerful web management terminal. - [milvus](https://github.com/milvus-io/milvus): An open-source vector database for scalable similarity search and AI applications. +- [nps](https://github.com/ehang-io/nps): A lightweight, high-performance, powerful intranet penetration proxy server, with a powerful web management terminal. - [siyuan](https://github.com/siyuan-note/siyuan): SiYuan is a local-first personal knowledge management system that supports complete offline use, as well as end-to-end encrypted synchronization. - [osmedeus](https://github.com/j3ssie/osmedeus): A Workflow Engine for Offensive Security. -- [jitsu](https://github.com/jitsucom/jitsu): An open-source Segment alternative. Fully-scriptable data ingestion engine for modern data teams. Set-up a real-time data pipeline in minutes, not days. +- [jitsu](https://github.com/jitsucom/jitsu/tree/master): An open-source Segment alternative. Fully-scriptable data ingestion engine for modern data teams. Set-up a real-time data pipeline in minutes, not days. - [triangula](https://github.com/RH12503/triangula): Generate high-quality triangulated and polygonal art from images. - [teler](https://github.com/kitabisa/teler): Real-time HTTP Intrusion Detection. - [bsc](https://github.com/binance-chain/bsc): A Binance Smart Chain client based on the go-ethereum fork. - [jaeles](https://github.com/jaeles-project/jaeles): The Swiss Army knife for automated Web Application Testing. - [devlake](https://github.com/apache/incubator-devlake): The open-source dev data platform & dashboard for your DevOps tools. +- [matrixone](https://github.com/matrixorigin/matrixone): MatrixOne is a future-oriented hyper-converged cloud and edge native DBMS that supports transactional, analytical, and streaming workloads with a simplified and distributed database engine, across multiple data centers, clouds, edges and other heterogeneous infrastructures. +- [bk-bcs](https://github.com/TencentBlueKing/bk-bcs): BlueKing Container Service (BCS, same below) is a container management and orchestration platform for the micro-services under the BlueKing ecosystem. +- [trueblocks-core](https://github.com/TrueBlocks/trueblocks-core): TrueBlocks improves access to blockchain data for any EVM-compatible chain (particularly Ethereum mainnet) while remaining entirely local. +- [openGemini](https://github.com/openGemini/openGemini): openGemini is an open-source,cloud-native time-series database(TSDB) that can be widely used in IoT, Internet of Vehicles(IoV), O&M monitoring, and industrial Internet scenarios. +- [AdGuardDNS](https://github.com/AdguardTeam/AdGuardDNS): AdGuard DNS is an alternative solution for tracker blocking, privacy protection, and parental control. +- [WatchAD2.0](https://github.com/Qihoo360/WatchAD2.0): WatchAD2.0 是 360 信息安全中心开发的一款针对域安全的日志分析与监控系统,它可以收集所有域控上的事件日志、网络流量,通过特征匹配、协议分析、历史行为、敏感操作和蜜罐账户等方式来检测各种已知与未知威胁,功能覆盖了大部分目前的常见内网域渗透手法。 +- [vanus](https://github.com/vanus-labs/vanus): Vanus is a Serverless, event streaming system with processing capabilities. It easily connects SaaS, Cloud Services, and Databases to help users build next-gen Event-driven Applications. #### All use cases: diff --git a/README_ZH.md b/README_ZH.md index 57d7292..d3bb007 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -328,21 +328,30 @@ pool.Reboot() 以下公司/组织在生产环境上使用了 `ants`。 -                 +                         ### 开源软件 +这些开源项目借助 `ants` 进行并发编程。 + - [gnet](https://github.com/panjf2000/gnet): gnet 是一个高性能、轻量级、非阻塞的事件驱动 Go 网络框架。 -- [nps](https://github.com/ehang-io/nps): A lightweight, high-performance, powerful intranet penetration proxy server, with a powerful web management terminal. -- [milvus](https://github.com/milvus-io/milvus): An open-source vector database for scalable similarity search and AI applications. +- [milvus](https://github.com/milvus-io/milvus): 一个高度灵活、可靠且速度极快的云原生开源向量数据库。 +- [nps](https://github.com/ehang-io/nps): 一款轻量级、高性能、功能强大的内网穿透代理服务器。 - [siyuan](https://github.com/siyuan-note/siyuan): 思源笔记是一款本地优先的个人知识管理系统,支持完全离线使用,同时也支持端到端加密同步。 - [osmedeus](https://github.com/j3ssie/osmedeus): A Workflow Engine for Offensive Security. -- [jitsu](https://github.com/jitsucom/jitsu): An open-source Segment alternative. Fully-scriptable data ingestion engine for modern data teams. Set-up a real-time data pipeline in minutes, not days. +- [jitsu](https://github.com/jitsucom/jitsu/tree/master): An open-source Segment alternative. Fully-scriptable data ingestion engine for modern data teams. Set-up a real-time data pipeline in minutes, not days. - [triangula](https://github.com/RH12503/triangula): Generate high-quality triangulated and polygonal art from images. - [teler](https://github.com/kitabisa/teler): Real-time HTTP Intrusion Detection. - [bsc](https://github.com/binance-chain/bsc): A Binance Smart Chain client based on the go-ethereum fork. - [jaeles](https://github.com/jaeles-project/jaeles): The Swiss Army knife for automated Web Application Testing. - [devlake](https://github.com/apache/incubator-devlake): The open-source dev data platform & dashboard for your DevOps tools. +- [matrixone](https://github.com/matrixorigin/matrixone): MatrixOne 是一款面向未来的超融合异构云原生数据库,通过超融合数据引擎支持事务/分析/流处理等混合工作负载,通过异构云原生架构支持跨机房协同/多地协同/云边协同。简化开发运维,消简数据碎片,打破数据的系统、位置和创新边界。 +- [bk-bcs](https://github.com/TencentBlueKing/bk-bcs): 蓝鲸容器管理平台(Blueking Container Service)定位于打造云原生技术和业务实际应用场景之间的桥梁;聚焦于复杂应用场景的容器化部署技术方案的研发、整合和产品化;致力于为游戏等复杂应用提供一站式、低门槛的容器编排和服务治理服务。 +- [trueblocks-core](https://github.com/TrueBlocks/trueblocks-core): TrueBlocks improves access to blockchain data for any EVM-compatible chain (particularly Ethereum mainnet) while remaining entirely local. +- [openGemini](https://github.com/openGemini/openGemini): openGemini 是华为云开源的一款云原生分布式时序数据库,可广泛应用于物联网、车联网、运维监控、工业互联网等业务场景,具备卓越的读写性能和高效的数据分析能力,采用类SQL查询语言,无第三方软件依赖、安装简单、部署灵活、运维便捷。 +- [AdGuardDNS](https://github.com/AdguardTeam/AdGuardDNS): AdGuard DNS is an alternative solution for tracker blocking, privacy protection, and parental control. +- [WatchAD2.0](https://github.com/Qihoo360/WatchAD2.0): WatchAD2.0 是 360 信息安全中心开发的一款针对域安全的日志分析与监控系统,它可以收集所有域控上的事件日志、网络流量,通过特征匹配、协议分析、历史行为、敏感操作和蜜罐账户等方式来检测各种已知与未知威胁,功能覆盖了大部分目前的常见内网域渗透手法。 +- [vanus](https://github.com/vanus-labs/vanus): Vanus is a Serverless, event streaming system with processing capabilities. It easily connects SaaS, Cloud Services, and Databases to help users build next-gen Event-driven Applications. #### 所有案例: diff --git a/go.mod b/go.mod index e307bfb..af3906b 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,6 @@ module github.com/panjf2000/ants/v2 go 1.13 require ( - github.com/stretchr/testify v1.8.1 - golang.org/x/sync v0.1.0 + github.com/stretchr/testify v1.8.2 + golang.org/x/sync v0.3.0 ) diff --git a/go.sum b/go.sum index 96e82f8..c915667 100644 --- a/go.sum +++ b/go.sum @@ -8,10 +8,10 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pool.go b/pool.go index b0481bb..497d4cf 100644 --- a/pool.go +++ b/pool.go @@ -106,8 +106,8 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) { staleWorkers[i] = nil } - // There might be a situation where all workers have been cleaned up(no worker is running), - // while some invokers still are stuck in "p.cond.Wait()", then we need to awake those invokers. + // There might be a situation where all workers have been cleaned up (no worker is running), + // while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers. if isDormant && p.Waiting() > 0 { p.cond.Broadcast() } @@ -207,8 +207,6 @@ func NewPool(size int, options ...Option) (*Pool, error) { return p, nil } -// --------------------------------------------------------------------------- - // Submit submits a task to this pool. // // Note that you are allowed to call Pool.Submit() from the current Pool.Submit(), @@ -219,11 +217,12 @@ func (p *Pool) Submit(task func()) error { if p.IsClosed() { return ErrPoolClosed } - if w := p.retrieveWorker(); w != nil { + + w, err := p.retrieveWorker() + if w != nil { w.inputFunc(task) - return nil } - return ErrPoolOverload + return err } // Running returns the number of workers currently running. @@ -321,8 +320,6 @@ func (p *Pool) Reboot() { } } -// --------------------------------------------------------------------------- - func (p *Pool) addRunning(delta int) { atomic.AddInt32(&p.running, int32(delta)) } @@ -332,52 +329,42 @@ func (p *Pool) addWaiting(delta int) { } // retrieveWorker returns an available worker to run the tasks. -func (p *Pool) retrieveWorker() (w worker) { - spawnWorker := func() { +func (p *Pool) retrieveWorker() (w worker, err error) { + p.lock.Lock() + +retry: + // First try to fetch the worker from the queue. + if w = p.workers.detach(); w != nil { + p.lock.Unlock() + return + } + + // If the worker queue is empty and we don't run out of the pool capacity, + // then just spawn a new worker goroutine. + if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { + p.lock.Unlock() w = p.workerCache.Get().(*goWorker) w.run() + return } - p.lock.Lock() - w = p.workers.detach() - if w != nil { // first try to fetch the worker from the queue - p.lock.Unlock() - } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { - // if the worker queue is empty and we don't run out of the pool capacity, - // then just spawn a new worker goroutine. - p.lock.Unlock() - spawnWorker() - } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. - if p.options.Nonblocking { - p.lock.Unlock() - return - } - retry: - if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { - p.lock.Unlock() - return - } - - p.addWaiting(1) - p.cond.Wait() // block and wait for an available worker - p.addWaiting(-1) - - if p.IsClosed() { - p.lock.Unlock() - return - } - - if w = p.workers.detach(); w == nil { - if p.Free() > 0 { - p.lock.Unlock() - spawnWorker() - return - } - goto retry - } + // Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value. + if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) { p.lock.Unlock() + return nil, ErrPoolOverload } - return + + // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. + p.addWaiting(1) + p.cond.Wait() // block and wait for an available worker + p.addWaiting(-1) + + if p.IsClosed() { + p.lock.Unlock() + return nil, ErrPoolClosed + } + + goto retry } // revertWorker puts a worker back into free pool, recycling the goroutines. diff --git a/pool_func.go b/pool_func.go index 69d3c86..413f187 100644 --- a/pool_func.go +++ b/pool_func.go @@ -107,8 +107,8 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { staleWorkers[i] = nil } - // There might be a situation where all workers have been cleaned up(no worker is running), - // while some invokers still are stuck in "p.cond.Wait()", then we need to awake those invokers. + // There might be a situation where all workers have been cleaned up (no worker is running), + // while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers. if isDormant && p.Waiting() > 0 { p.cond.Broadcast() } @@ -213,8 +213,6 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi return p, nil } -//--------------------------------------------------------------------------- - // Invoke submits a task to pool. // // Note that you are allowed to call Pool.Invoke() from the current Pool.Invoke(), @@ -225,11 +223,12 @@ func (p *PoolWithFunc) Invoke(args interface{}) error { if p.IsClosed() { return ErrPoolClosed } - if w := p.retrieveWorker(); w != nil { + + w, err := p.retrieveWorker() + if w != nil { w.inputParam(args) - return nil } - return ErrPoolOverload + return err } // Running returns the number of workers currently running. @@ -327,8 +326,6 @@ func (p *PoolWithFunc) Reboot() { } } -//--------------------------------------------------------------------------- - func (p *PoolWithFunc) addRunning(delta int) { atomic.AddInt32(&p.running, int32(delta)) } @@ -338,52 +335,42 @@ func (p *PoolWithFunc) addWaiting(delta int) { } // retrieveWorker returns an available worker to run the tasks. -func (p *PoolWithFunc) retrieveWorker() (w worker) { - spawnWorker := func() { +func (p *PoolWithFunc) retrieveWorker() (w worker, err error) { + p.lock.Lock() + +retry: + // First try to fetch the worker from the queue. + if w = p.workers.detach(); w != nil { + p.lock.Unlock() + return + } + + // If the worker queue is empty and we don't run out of the pool capacity, + // then just spawn a new worker goroutine. + if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { + p.lock.Unlock() w = p.workerCache.Get().(*goWorkerWithFunc) w.run() + return } - p.lock.Lock() - w = p.workers.detach() - if w != nil { // first try to fetch the worker from the queue - p.lock.Unlock() - } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { - // if the worker queue is empty and we don't run out of the pool capacity, - // then just spawn a new worker goroutine. - p.lock.Unlock() - spawnWorker() - } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. - if p.options.Nonblocking { - p.lock.Unlock() - return - } - retry: - if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { - p.lock.Unlock() - return - } - - p.addWaiting(1) - p.cond.Wait() // block and wait for an available worker - p.addWaiting(-1) - - if p.IsClosed() { - p.lock.Unlock() - return - } - - if w = p.workers.detach(); w == nil { - if p.Free() > 0 { - p.lock.Unlock() - spawnWorker() - return - } - goto retry - } + // Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value. + if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) { p.lock.Unlock() + return nil, ErrPoolOverload } - return + + // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. + p.addWaiting(1) + p.cond.Wait() // block and wait for an available worker + p.addWaiting(-1) + + if p.IsClosed() { + p.lock.Unlock() + return nil, ErrPoolClosed + } + + goto retry } // revertWorker puts a worker back into free pool, recycling the goroutines.