Go 语言的 goroutine 池


[英文](README.md) | 中文 ## 📖 简介 `ants`是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。 ## 🚀 功能: - 自动调度海量的 goroutines,复用 goroutines - 定期清理过期的 goroutines,进一步节省资源 - 提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool - 优雅处理 panic,防止程序崩溃 - 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有[更高的性能](#-性能小结) - 非阻塞机制 ## 💡 `ants` 是如何运行的 ### 流程图

ants-flowchart-cn

### 动态图 ![](https://raw.githubusercontent.com/panjf2000/illustrations/master/go/ants-pool-1.png) ![](https://raw.githubusercontent.com/panjf2000/illustrations/master/go/ants-pool-2.png) ![](https://raw.githubusercontent.com/panjf2000/illustrations/master/go/ants-pool-3.png) ![](https://raw.githubusercontent.com/panjf2000/illustrations/master/go/ants-pool-4.png) ## 🧰 安装 ### 使用 `ants` v1 版本: ``` powershell go get -u github.com/panjf2000/ants ``` ### 使用 `ants` v2 版本 (开启 GO111MODULE=on): ```powershell go get -u github.com/panjf2000/ants/v2 ``` ## 🛠 使用 写 go 并发程序的时候如果程序会启动大量的 goroutine ,势必会消耗大量的系统资源(内存,CPU),通过使用 `ants`,可以实例化一个 goroutine 池,复用 goroutine ,节省资源,提升性能: ``` go package main import ( "fmt" "sync" "sync/atomic" "time" "github.com/panjf2000/ants/v2" ) var sum int32 func myFunc(i interface{}) { n := i.(int32) atomic.AddInt32(&sum, n) fmt.Printf("run with %d\n", n) } func demoFunc() { time.Sleep(10 * time.Millisecond) fmt.Println("Hello World!") } func main() { defer ants.Release() runTimes := 1000 // Use the common pool. var wg sync.WaitGroup syncCalculateSum := func() { demoFunc() wg.Done() } for i := 0; i < runTimes; i++ { wg.Add(1) _ = ants.Submit(syncCalculateSum) } wg.Wait() fmt.Printf("running goroutines: %d\n", ants.Running()) fmt.Printf("finish all tasks.\n") // Use the pool with a function, // set 10 to the capacity of goroutine pool and 1 second for expired duration. p, _ := ants.NewPoolWithFunc(10, func(i interface{}) { myFunc(i) wg.Done() }) defer p.Release() // Submit tasks one by one. for i := 0; i < runTimes; i++ { wg.Add(1) _ = p.Invoke(int32(i)) } wg.Wait() fmt.Printf("running goroutines: %d\n", p.Running()) fmt.Printf("finish all tasks, result is %d\n", sum) if sum != 499500 { panic("the final result is wrong!!!") } // Use the MultiPool and set the capacity of the 10 goroutine pools to unlimited. // If you use -1 as the pool size parameter, the size will be unlimited. // There are two load-balancing algorithms for pools: ants.RoundRobin and ants.LeastTasks. mp, _ := ants.NewMultiPool(10, -1, ants.RoundRobin) defer mp.ReleaseTimeout(5 * time.Second) for i := 0; i < runTimes; i++ { wg.Add(1) _ = mp.Submit(syncCalculateSum) } wg.Wait() fmt.Printf("running goroutines: %d\n", mp.Running()) fmt.Printf("finish all tasks.\n") // Use the MultiPoolFunc and set the capacity of 10 goroutine pools to (runTimes/10). mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) { myFunc(i) wg.Done() }, ants.LeastTasks) defer mpf.ReleaseTimeout(5 * time.Second) for i := 0; i < runTimes; i++ { wg.Add(1) _ = mpf.Invoke(int32(i)) } wg.Wait() fmt.Printf("running goroutines: %d\n", mpf.Running()) fmt.Printf("finish all tasks, result is %d\n", sum) if sum != 499500*2 { panic("the final result is wrong!!!") } } ``` ### Pool 配置 ```go // Option represents the optional function. type Option func(opts *Options) // Options contains all options which will be applied when instantiating a ants pool. type Options struct { // ExpiryDuration is a period for the scavenger goroutine to clean up those expired workers, // the scavenger scans all workers every `ExpiryDuration` and clean up those workers that haven't been // used for more than `ExpiryDuration`. ExpiryDuration time.Duration // PreAlloc indicates whether to make memory pre-allocation when initializing Pool. PreAlloc bool // Max number of goroutine blocking on pool.Submit. // 0 (default value) means no such limit. MaxBlockingTasks int // When Nonblocking is true, Pool.Submit will never be blocked. // ErrPoolOverload will be returned when Pool.Submit cannot be done at once. // When Nonblocking is true, MaxBlockingTasks is inoperative. Nonblocking bool // PanicHandler is used to handle panics from each worker goroutine. // if nil, panics will be thrown out again from worker goroutines. PanicHandler func(interface{}) // Logger is the customized logger for logging info, if it is not set, // default standard logger from log package is used. Logger Logger } // WithOptions accepts the whole options config. func WithOptions(options Options) Option { return func(opts *Options) { *opts = options } } // WithExpiryDuration sets up the interval time of cleaning up goroutines. func WithExpiryDuration(expiryDuration time.Duration) Option { return func(opts *Options) { opts.ExpiryDuration = expiryDuration } } // WithPreAlloc indicates whether it should malloc for workers. func WithPreAlloc(preAlloc bool) Option { return func(opts *Options) { opts.PreAlloc = preAlloc } } // WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool. func WithMaxBlockingTasks(maxBlockingTasks int) Option { return func(opts *Options) { opts.MaxBlockingTasks = maxBlockingTasks } } // WithNonblocking indicates that pool will return nil when there is no available workers. func WithNonblocking(nonblocking bool) Option { return func(opts *Options) { opts.Nonblocking = nonblocking } } // WithPanicHandler sets up panic handler. func WithPanicHandler(panicHandler func(interface{})) Option { return func(opts *Options) { opts.PanicHandler = panicHandler } } // WithLogger sets up a customized logger. func WithLogger(logger Logger) Option { return func(opts *Options) { opts.Logger = logger } } ``` 通过在调用`NewPool`/`NewPoolWithFunc`之时使用各种 optional function,可以设置`ants.Options`中各个配置项的值,然后用它来定制化 goroutine pool. ### 自定义池 `ants`支持实例化使用者自己的一个 Pool ,指定具体的池容量;通过调用 `NewPool` 方法可以实例化一个新的带有指定容量的 Pool ,如下: ``` go p, _ := ants.NewPool(10000) ``` ### 任务提交 提交任务通过调用 `ants.Submit(func())`方法: ```go ants.Submit(func(){}) ``` ### 动态调整 goroutine 池容量 需要动态调整 goroutine 池容量可以通过调用`Tune(int)`: ``` go pool.Tune(1000) // Tune its capacity to 1000 pool.Tune(100000) // Tune its capacity to 100000 ``` 该方法是线程安全的。 ### 预先分配 goroutine 队列内存 `ants`允许你预先把整个池的容量分配内存, 这个功能可以在某些特定的场景下提高 goroutine 池的性能。比如, 有一个场景需要一个超大容量的池,而且每个 goroutine 里面的任务都是耗时任务,这种情况下,预先分配 goroutine 队列内存将会减少不必要的内存重新分配。 ```go // ants will pre-malloc the whole capacity of pool when you invoke this function p, _ := ants.NewPool(100000, ants.WithPreAlloc(true)) ``` ### 释放 Pool ```go pool.Release() ``` ### 重启 Pool ```go // 只要调用 Reboot() 方法,就可以重新激活一个之前已经被销毁掉的池,并且投入使用。 pool.Reboot() ``` ## ⚙️ 关于任务执行顺序 `ants` 并不保证提交的任务被执行的顺序,执行的顺序也不是和提交的顺序保持一致,因为在 `ants` 是并发地处理所有提交的任务,提交的任务会被分派到正在并发运行的 workers 上去,因此那些任务将会被并发且无序地被执行。 ## 👏 贡献者 请在提 PR 之前仔细阅读 [Contributing Guidelines](CONTRIBUTING.md),感谢那些为 `ants` 贡献过代码的开发者! ## 📄 证书 `ants` 的源码允许用户在遵循 [MIT 开源证书](/LICENSE) 规则的前提下使用。 ## 📚 相关文章 - [Goroutine 并发调度模型深度解析之手撸一个高性能 goroutine 池](https://taohuawu.club/high-performance-implementation-of-goroutine-pool) - [Visually Understanding Worker Pool](https://medium.com/coinmonks/visually-understanding-worker-pool-48a83b7fc1f5) - [The Case For A Go Worker Pool](https://brandur.org/go-worker-pool) - [Go Concurrency - GoRoutines, Worker Pools and Throttling Made Simple](https://twin.sh/articles/39/go-concurrency-goroutines-worker-pools-and-throttling-made-simple) ## 🖥 用户案例 ### 商业公司 以下公司/组织在生产环境上使用了 `ants`。
### 开源软件 这些开源项目借助 `ants` 进行并发编程。 - [gnet](https://github.com/panjf2000/gnet): gnet 是一个高性能、轻量级、非阻塞的事件驱动 Go 网络框架。 - [milvus](https://github.com/milvus-io/milvus): 一个高度灵活、可靠且速度极快的云原生开源向量数据库。 - [nps](https://github.com/ehang-io/nps): 一款轻量级、高性能、功能强大的内网穿透代理服务器。 - [siyuan](https://github.com/siyuan-note/siyuan): 思源笔记是一款本地优先的个人知识管理系统,支持完全离线使用,同时也支持端到端加密同步。 - [osmedeus](https://github.com/j3ssie/osmedeus): A Workflow Engine for Offensive Security. - [jitsu](https://github.com/jitsucom/jitsu/tree/master): An open-source Segment alternative. Fully-scriptable data ingestion engine for modern data teams. Set-up a real-time data pipeline in minutes, not days. - [triangula](https://github.com/RH12503/triangula): Generate high-quality triangulated and polygonal art from images. - [teler](https://github.com/kitabisa/teler): Real-time HTTP Intrusion Detection. - [bsc](https://github.com/binance-chain/bsc): A Binance Smart Chain client based on the go-ethereum fork. - [jaeles](https://github.com/jaeles-project/jaeles): The Swiss Army knife for automated Web Application Testing. - [devlake](https://github.com/apache/incubator-devlake): The open-source dev data platform & dashboard for your DevOps tools. - [matrixone](https://github.com/matrixorigin/matrixone): MatrixOne 是一款面向未来的超融合异构云原生数据库,通过超融合数据引擎支持事务/分析/流处理等混合工作负载,通过异构云原生架构支持跨机房协同/多地协同/云边协同。简化开发运维,消简数据碎片,打破数据的系统、位置和创新边界。 - [bk-bcs](https://github.com/TencentBlueKing/bk-bcs): 蓝鲸容器管理平台(Blueking Container Service)定位于打造云原生技术和业务实际应用场景之间的桥梁;聚焦于复杂应用场景的容器化部署技术方案的研发、整合和产品化;致力于为游戏等复杂应用提供一站式、低门槛的容器编排和服务治理服务。 - [trueblocks-core](https://github.com/TrueBlocks/trueblocks-core): TrueBlocks improves access to blockchain data for any EVM-compatible chain (particularly Ethereum mainnet) while remaining entirely local. - [openGemini](https://github.com/openGemini/openGemini): openGemini 是华为云开源的一款云原生分布式时序数据库,可广泛应用于物联网、车联网、运维监控、工业互联网等业务场景,具备卓越的读写性能和高效的数据分析能力,采用类SQL查询语言,无第三方软件依赖、安装简单、部署灵活、运维便捷。 - [AdGuardDNS](https://github.com/AdguardTeam/AdGuardDNS): AdGuard DNS is an alternative solution for tracker blocking, privacy protection, and parental control. - [WatchAD2.0](https://github.com/Qihoo360/WatchAD2.0): WatchAD2.0 是 360 信息安全中心开发的一款针对域安全的日志分析与监控系统,它可以收集所有域控上的事件日志、网络流量,通过特征匹配、协议分析、历史行为、敏感操作和蜜罐账户等方式来检测各种已知与未知威胁,功能覆盖了大部分目前的常见内网域渗透手法。 - [vanus](https://github.com/vanus-labs/vanus): Vanus is a Serverless, event streaming system with processing capabilities. It easily connects SaaS, Cloud Services, and Databases to help users build next-gen Event-driven Applications. - [trpc-go](https://github.com/trpc-group/trpc-go): 一个 Go 实现的可插拔的高性能 RPC 框架。 - [motan-go](https://github.com/weibocom/motan-go): 一套高性能、易于使用的分布式远程服务调用(RPC)框架。motan-go 是 motan 的 Go 语言实现。 #### 所有案例: - [Repositories that depend on ants/v2](https://github.com/panjf2000/ants/network/dependents?package_id=UGFja2FnZS0yMjY2ODgxMjg2) - [Repositories that depend on ants/v1](https://github.com/panjf2000/ants/network/dependents?package_id=UGFja2FnZS0yMjY0ODMzNjEw) 如果你的项目也在使用 `ants`,欢迎给我提 Pull Request 来更新这份用户案例列表。 ## 🔋 JetBrains 开源证书支持 `ants` 项目一直以来都是在 JetBrains 公司旗下的 GoLand 集成开发环境中进行开发,基于 **free JetBrains Open Source license(s)** 正版免费授权,在此表达我的谢意。 ## 💰 支持 如果有意向,可以通过每个月定量的少许捐赠来支持这个项目。 ## 💎 赞助 每月定量捐赠 10 刀即可成为本项目的赞助者,届时您的 logo 或者 link 可以展示在本项目的 README 上。 ## ☕️ 打赏 > 当您通过以下方式进行捐赠时,请务必留下姓名、GitHub 账号或其他社交媒体账号,以便我将其添加到捐赠者名单中,以表谢意。          ## 资助者
Patrick Othmer Jimmy ChenZhen Mai Yang 王开帅 Unger Alejandro Weng Wei
## 🔋 赞助商