22 KiB
英文 | 中文
📖 简介
ants
是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。
🚀 功能:
- 自动调度海量的 goroutines,复用 goroutines
- 定期清理过期的 goroutines,进一步节省资源
- 提供了大量实用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool 等
- 优雅处理 panic,防止程序崩溃
- 资源复用,极大节省内存使用量;在大规模批量并发任务场景下甚至可能比原生 goroutine 并发具有更高的性能
- 非阻塞机制
- 预分配内存 (环形队列,可选)
💡 ants
是如何运行的
流程图
动态图
🧰 安装
使用 ants
v1 版本:
go get -u github.com/panjf2000/ants
使用 ants
v2 版本 (开启 GO111MODULE=on):
go get -u github.com/panjf2000/ants/v2
🛠 使用
写 go 并发程序的时候如果程序会启动大量的 goroutine ,势必会消耗大量的系统资源(内存,CPU),通过使用 ants
,可以实例化一个 goroutine 池,复用 goroutine ,节省资源,提升性能:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/panjf2000/ants/v2"
)
var sum int32
func myFunc(i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
}
func demoFunc() {
time.Sleep(10 * time.Millisecond)
fmt.Println("Hello World!")
}
func main() {
defer ants.Release()
runTimes := 1000
// Use the common pool.
var wg sync.WaitGroup
syncCalculateSum := func() {
demoFunc()
wg.Done()
}
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = ants.Submit(syncCalculateSum)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n")
// Use the pool with a function,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
wg.Done()
})
defer p.Release()
// Submit tasks one by one.
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = p.Invoke(int32(i))
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
if sum != 499500 {
panic("the final result is wrong!!!")
}
// Use the MultiPool and set the capacity of the 10 goroutine pools to unlimited.
// If you use -1 as the pool size parameter, the size will be unlimited.
// There are two load-balancing algorithms for pools: ants.RoundRobin and ants.LeastTasks.
mp, _ := ants.NewMultiPool(10, -1, ants.RoundRobin)
defer mp.ReleaseTimeout(5 * time.Second)
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = mp.Submit(syncCalculateSum)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", mp.Running())
fmt.Printf("finish all tasks.\n")
// Use the MultiPoolFunc and set the capacity of 10 goroutine pools to (runTimes/10).
mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) {
myFunc(i)
wg.Done()
}, ants.LeastTasks)
defer mpf.ReleaseTimeout(5 * time.Second)
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = mpf.Invoke(int32(i))
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", mpf.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
if sum != 499500*2 {
panic("the final result is wrong!!!")
}
}
Pool 配置
// Option represents the optional function.
type Option func(opts *Options)
// Options contains all options which will be applied when instantiating a ants pool.
type Options struct {
// ExpiryDuration is a period for the scavenger goroutine to clean up those expired workers,
// the scavenger scans all workers every `ExpiryDuration` and clean up those workers that haven't been
// used for more than `ExpiryDuration`.
ExpiryDuration time.Duration
// PreAlloc indicates whether to make memory pre-allocation when initializing Pool.
PreAlloc bool
// Max number of goroutine blocking on pool.Submit.
// 0 (default value) means no such limit.
MaxBlockingTasks int
// When Nonblocking is true, Pool.Submit will never be blocked.
// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
// When Nonblocking is true, MaxBlockingTasks is inoperative.
Nonblocking bool
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{})
// Logger is the customized logger for logging info, if it is not set,
// default standard logger from log package is used.
Logger Logger
}
// WithOptions accepts the whole options config.
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}
// WithExpiryDuration sets up the interval time of cleaning up goroutines.
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}
// WithPreAlloc indicates whether it should malloc for workers.
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}
// WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks
}
}
// WithNonblocking indicates that pool will return nil when there is no available workers.
func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) {
opts.Nonblocking = nonblocking
}
}
// WithPanicHandler sets up panic handler.
func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
}
// WithLogger sets up a customized logger.
func WithLogger(logger Logger) Option {
return func(opts *Options) {
opts.Logger = logger
}
}
通过在调用NewPool
/NewPoolWithFunc
之时使用各种 optional function,可以设置ants.Options
中各个配置项的值,然后用它来定制化 goroutine pool.
自定义池
ants
支持实例化使用者自己的一个 Pool ,指定具体的池容量;通过调用 NewPool
方法可以实例化一个新的带有指定容量的 Pool ,如下:
p, _ := ants.NewPool(10000)
任务提交
提交任务通过调用 ants.Submit(func())
方法:
ants.Submit(func(){})
动态调整 goroutine 池容量
需要动态调整 goroutine 池容量可以通过调用Tune(int)
:
pool.Tune(1000) // Tune its capacity to 1000
pool.Tune(100000) // Tune its capacity to 100000
该方法是线程安全的。
预先分配 goroutine 队列内存
ants
允许你预先把整个池的容量分配内存, 这个功能可以在某些特定的场景下提高 goroutine 池的性能。比如, 有一个场景需要一个超大容量的池,而且每个 goroutine 里面的任务都是耗时任务,这种情况下,预先分配 goroutine 队列内存将会减少不必要的内存重新分配。
// ants will pre-malloc the whole capacity of pool when you invoke this function
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))
释放 Pool
pool.Release()
重启 Pool
// 只要调用 Reboot() 方法,就可以重新激活一个之前已经被销毁掉的池,并且投入使用。
pool.Reboot()
⚙️ 关于任务执行顺序
ants
并不保证提交的任务被执行的顺序,执行的顺序也不是和提交的顺序保持一致,因为在 ants
是并发地处理所有提交的任务,提交的任务会被分派到正在并发运行的 workers 上去,因此那些任务将会被并发且无序地被执行。
👏 贡献者
请在提 PR 之前仔细阅读 Contributing Guidelines,感谢那些为 ants
贡献过代码的开发者!
📄 证书
ants
的源码允许用户在遵循 MIT 开源证书 规则的前提下使用。
📚 相关文章
- Goroutine 并发调度模型深度解析之手撸一个高性能 goroutine 池
- Visually Understanding Worker Pool
- The Case For A Go Worker Pool
- Go Concurrency - GoRoutines, Worker Pools and Throttling Made Simple
🖥 用户案例
商业公司
以下公司/组织在生产环境上使用了 ants
。
如果你也正在生产环境上使用 ants
,欢迎提 PR 来丰富这份列表。
开源软件
这些开源项目借助 ants
进行并发编程。
- gnet: gnet 是一个高性能、轻量级、非阻塞的事件驱动 Go 网络框架。
- milvus: 一个高度灵活、可靠且速度极快的云原生开源向量数据库。
- nps: 一款轻量级、高性能、功能强大的内网穿透代理服务器。
- TDengine: TDengine 是一款开源、高性能、云原生的时序数据库 (Time-Series Database, TSDB)。TDengine 能被广泛运用于物联网、工业互联网、车联网、IT 运维、金融等领域。
- siyuan: 思源笔记是一款本地优先的个人知识管理系统,支持完全离线使用,同时也支持端到端加密同步。
- osmedeus: A Workflow Engine for Offensive Security.
- jitsu: An open-source Segment alternative. Fully-scriptable data ingestion engine for modern data teams. Set-up a real-time data pipeline in minutes, not days.
- triangula: Generate high-quality triangulated and polygonal art from images.
- teler: Real-time HTTP Intrusion Detection.
- bsc: A Binance Smart Chain client based on the go-ethereum fork.
- jaeles: The Swiss Army knife for automated Web Application Testing.
- devlake: The open-source dev data platform & dashboard for your DevOps tools.
- matrixone: MatrixOne 是一款面向未来的超融合异构云原生数据库,通过超融合数据引擎支持事务/分析/流处理等混合工作负载,通过异构云原生架构支持跨机房协同/多地协同/云边协同。简化开发运维,消简数据碎片,打破数据的系统、位置和创新边界。
- bk-bcs: 蓝鲸容器管理平台(Blueking Container Service)定位于打造云原生技术和业务实际应用场景之间的桥梁;聚焦于复杂应用场景的容器化部署技术方案的研发、整合和产品化;致力于为游戏等复杂应用提供一站式、低门槛的容器编排和服务治理服务。
- trueblocks-core: TrueBlocks improves access to blockchain data for any EVM-compatible chain (particularly Ethereum mainnet) while remaining entirely local.
- openGemini: openGemini 是华为云开源的一款云原生分布式时序数据库,可广泛应用于物联网、车联网、运维监控、工业互联网等业务场景,具备卓越的读写性能和高效的数据分析能力,采用类SQL查询语言,无第三方软件依赖、安装简单、部署灵活、运维便捷。
- AdGuardDNS: AdGuard DNS is an alternative solution for tracker blocking, privacy protection, and parental control.
- WatchAD2.0: WatchAD2.0 是 360 信息安全中心开发的一款针对域安全的日志分析与监控系统,它可以收集所有域控上的事件日志、网络流量,通过特征匹配、协议分析、历史行为、敏感操作和蜜罐账户等方式来检测各种已知与未知威胁,功能覆盖了大部分目前的常见内网域渗透手法。
- vanus: Vanus is a Serverless, event streaming system with processing capabilities. It easily connects SaaS, Cloud Services, and Databases to help users build next-gen Event-driven Applications.
- trpc-go: 一个 Go 实现的可插拔的高性能 RPC 框架。
- motan-go: Motan 是一套高性能、易于使用的分布式远程服务调用 (RPC) 框架。motan-go 是 motan 的 Go 语言实现。
所有案例:
如果你的项目也在使用 ants
,欢迎给我提 Pull Request 来更新这份用户案例列表。
🔋 JetBrains 开源证书支持
ants
项目一直以来都是在 JetBrains 公司旗下的 GoLand 集成开发环境中进行开发,基于 free JetBrains Open Source license(s) 正版免费授权,在此表达我的谢意。
💰 支持
如果有意向,可以通过每个月定量的少许捐赠来支持这个项目。
💎 赞助
每月定量捐赠 10 刀即可成为本项目的赞助者,届时您的 logo 或者 link 可以展示在本项目的 README 上。
☕️ 打赏
当您通过以下方式进行捐赠时,请务必留下姓名、GitHub 账号或其他社交媒体账号,以便我将其添加到捐赠者名单中,以表谢意。