forked from mirror/ants
Merge remote-tracking branch 'upstream/master'
This commit is contained in:
commit
ed92fb6247
|
@ -9,6 +9,8 @@ on:
|
||||||
branches:
|
branches:
|
||||||
- master
|
- master
|
||||||
- dev
|
- dev
|
||||||
|
paths-ignore:
|
||||||
|
- '**.md'
|
||||||
schedule:
|
schedule:
|
||||||
# ┌───────────── minute (0 - 59)
|
# ┌───────────── minute (0 - 59)
|
||||||
# │ ┌───────────── hour (0 - 23)
|
# │ ┌───────────── hour (0 - 23)
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
<img src="https://raw.githubusercontent.com/panjf2000/logos/master/ants/logo.png" />
|
<img src="https://raw.githubusercontent.com/panjf2000/logos/master/ants/logo.png" />
|
||||||
<b>A goroutine pool for Go</b>
|
<b>A goroutine pool for Go</b>
|
||||||
<br/><br/>
|
<br/><br/>
|
||||||
<a title="Build Status" target="_blank" href="https://github.com/panjf2000/ants/actions?query=workflow%3ATests"><img src="https://img.shields.io/github/workflow/status/panjf2000/ants/Tests?style=flat-square&logo=github-actions" /></a>
|
<a title="Build Status" target="_blank" href="https://github.com/panjf2000/ants/actions?query=workflow%3ATests"><img src="https://img.shields.io/github/actions/workflow/status/panjf2000/ants/test.yml?branch=master&style=flat-square&logo=github-actions" /></a>
|
||||||
<a title="Codecov" target="_blank" href="https://codecov.io/gh/panjf2000/ants"><img src="https://img.shields.io/codecov/c/github/panjf2000/ants?style=flat-square&logo=codecov" /></a>
|
<a title="Codecov" target="_blank" href="https://codecov.io/gh/panjf2000/ants"><img src="https://img.shields.io/codecov/c/github/panjf2000/ants?style=flat-square&logo=codecov" /></a>
|
||||||
<a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/v/release/panjf2000/ants.svg?color=161823&style=flat-square&logo=smartthings" /></a>
|
<a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/v/release/panjf2000/ants.svg?color=161823&style=flat-square&logo=smartthings" /></a>
|
||||||
<a title="Tag" target="_blank" href="https://github.com/panjf2000/ants/tags"><img src="https://img.shields.io/github/v/tag/panjf2000/ants?color=%23ff8936&logo=fitbit&style=flat-square" /></a>
|
<a title="Tag" target="_blank" href="https://github.com/panjf2000/ants/tags"><img src="https://img.shields.io/github/v/tag/panjf2000/ants?color=%23ff8936&logo=fitbit&style=flat-square" /></a>
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
<img src="https://raw.githubusercontent.com/panjf2000/logos/master/ants/logo.png" />
|
<img src="https://raw.githubusercontent.com/panjf2000/logos/master/ants/logo.png" />
|
||||||
<b>Go 语言的 goroutine 池</b>
|
<b>Go 语言的 goroutine 池</b>
|
||||||
<br/><br/>
|
<br/><br/>
|
||||||
<a title="Build Status" target="_blank" href="https://github.com/panjf2000/ants/actions?query=workflow%3ATests"><img src="https://img.shields.io/github/workflow/status/panjf2000/ants/Tests?style=flat-square&logo=github-actions" /></a>
|
<a title="Build Status" target="_blank" href="https://github.com/panjf2000/ants/actions?query=workflow%3ATests"><img src="https://img.shields.io/github/actions/workflow/status/panjf2000/ants/test.yml?branch=master&style=flat-square&logo=github-actions" /></a>
|
||||||
<a title="Codecov" target="_blank" href="https://codecov.io/gh/panjf2000/ants"><img src="https://img.shields.io/codecov/c/github/panjf2000/ants?style=flat-square&logo=codecov" /></a>
|
<a title="Codecov" target="_blank" href="https://codecov.io/gh/panjf2000/ants"><img src="https://img.shields.io/codecov/c/github/panjf2000/ants?style=flat-square&logo=codecov" /></a>
|
||||||
<a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/v/release/panjf2000/ants.svg?color=161823&style=flat-square&logo=smartthings" /></a>
|
<a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/v/release/panjf2000/ants.svg?color=161823&style=flat-square&logo=smartthings" /></a>
|
||||||
<a title="Tag" target="_blank" href="https://github.com/panjf2000/ants/tags"><img src="https://img.shields.io/github/v/tag/panjf2000/ants?color=%23ff8936&logo=fitbit&style=flat-square" /></a>
|
<a title="Tag" target="_blank" href="https://github.com/panjf2000/ants/tags"><img src="https://img.shields.io/github/v/tag/panjf2000/ants?color=%23ff8936&logo=fitbit&style=flat-square" /></a>
|
||||||
|
|
87
pool.go
87
pool.go
|
@ -59,8 +59,11 @@ type Pool struct {
|
||||||
// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
|
// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
|
||||||
waiting int32
|
waiting int32
|
||||||
|
|
||||||
heartbeatDone int32
|
purgeDone int32
|
||||||
stopHeartbeat context.CancelFunc
|
stopPurge context.CancelFunc
|
||||||
|
|
||||||
|
ticktockDone int32
|
||||||
|
stopTicktock context.CancelFunc
|
||||||
|
|
||||||
now atomic.Value
|
now atomic.Value
|
||||||
|
|
||||||
|
@ -69,18 +72,18 @@ type Pool struct {
|
||||||
|
|
||||||
// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
|
// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
|
||||||
func (p *Pool) purgeStaleWorkers(ctx context.Context) {
|
func (p *Pool) purgeStaleWorkers(ctx context.Context) {
|
||||||
heartbeat := time.NewTicker(p.options.ExpiryDuration)
|
ticker := time.NewTicker(p.options.ExpiryDuration)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
heartbeat.Stop()
|
ticker.Stop()
|
||||||
atomic.StoreInt32(&p.heartbeatDone, 1)
|
atomic.StoreInt32(&p.purgeDone, 1)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-heartbeat.C:
|
case <-ticker.C:
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.IsClosed() {
|
if p.IsClosed() {
|
||||||
|
@ -111,15 +114,46 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ticktock is a goroutine that updates the current time in the pool regularly.
|
// ticktock is a goroutine that updates the current time in the pool regularly.
|
||||||
func (p *Pool) ticktock() {
|
func (p *Pool) ticktock(ctx context.Context) {
|
||||||
ticker := time.NewTicker(nowTimeUpdateInterval)
|
ticker := time.NewTicker(nowTimeUpdateInterval)
|
||||||
defer ticker.Stop()
|
defer func() {
|
||||||
|
ticker.Stop()
|
||||||
|
atomic.StoreInt32(&p.ticktockDone, 1)
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.IsClosed() {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
for range ticker.C {
|
|
||||||
p.now.Store(time.Now())
|
p.now.Store(time.Now())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Pool) goPurge() {
|
||||||
|
if p.options.DisablePurge {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a goroutine to clean up expired workers periodically.
|
||||||
|
var ctx context.Context
|
||||||
|
ctx, p.stopPurge = context.WithCancel(context.Background())
|
||||||
|
go p.purgeStaleWorkers(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Pool) goTicktock() {
|
||||||
|
p.now.Store(time.Now())
|
||||||
|
var ctx context.Context
|
||||||
|
ctx, p.stopTicktock = context.WithCancel(context.Background())
|
||||||
|
go p.ticktock(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Pool) nowTime() time.Time {
|
func (p *Pool) nowTime() time.Time {
|
||||||
return p.now.Load().(time.Time)
|
return p.now.Load().(time.Time)
|
||||||
}
|
}
|
||||||
|
@ -166,15 +200,8 @@ func NewPool(size int, options ...Option) (*Pool, error) {
|
||||||
|
|
||||||
p.cond = sync.NewCond(p.lock)
|
p.cond = sync.NewCond(p.lock)
|
||||||
|
|
||||||
// Start a goroutine to clean up expired workers periodically.
|
p.goPurge()
|
||||||
var ctx context.Context
|
p.goTicktock()
|
||||||
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
|
|
||||||
if !p.options.DisablePurge {
|
|
||||||
go p.purgeStaleWorkers(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
p.now.Store(time.Now())
|
|
||||||
go p.ticktock()
|
|
||||||
|
|
||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
@ -259,17 +286,23 @@ func (p *Pool) Release() {
|
||||||
|
|
||||||
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
|
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
|
||||||
func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
|
func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
|
||||||
if p.IsClosed() || p.stopHeartbeat == nil {
|
if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
|
||||||
return ErrPoolClosed
|
return ErrPoolClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
p.stopHeartbeat()
|
if p.stopPurge != nil {
|
||||||
p.stopHeartbeat = nil
|
p.stopPurge()
|
||||||
|
p.stopPurge = nil
|
||||||
|
}
|
||||||
|
p.stopTicktock()
|
||||||
|
p.stopTicktock = nil
|
||||||
p.Release()
|
p.Release()
|
||||||
|
|
||||||
endTime := time.Now().Add(timeout)
|
endTime := time.Now().Add(timeout)
|
||||||
for time.Now().Before(endTime) {
|
for time.Now().Before(endTime) {
|
||||||
if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) {
|
if p.Running() == 0 &&
|
||||||
|
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
|
||||||
|
atomic.LoadInt32(&p.ticktockDone) == 1 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -280,12 +313,10 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
|
||||||
// Reboot reboots a closed pool.
|
// Reboot reboots a closed pool.
|
||||||
func (p *Pool) Reboot() {
|
func (p *Pool) Reboot() {
|
||||||
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
|
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
|
||||||
atomic.StoreInt32(&p.heartbeatDone, 0)
|
atomic.StoreInt32(&p.purgeDone, 0)
|
||||||
var ctx context.Context
|
p.goPurge()
|
||||||
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
|
atomic.StoreInt32(&p.ticktockDone, 0)
|
||||||
if !p.options.DisablePurge {
|
p.goTicktock()
|
||||||
go p.purgeStaleWorkers(ctx)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
87
pool_func.go
87
pool_func.go
|
@ -61,8 +61,11 @@ type PoolWithFunc struct {
|
||||||
// waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock
|
// waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock
|
||||||
waiting int32
|
waiting int32
|
||||||
|
|
||||||
heartbeatDone int32
|
purgeDone int32
|
||||||
stopHeartbeat context.CancelFunc
|
stopPurge context.CancelFunc
|
||||||
|
|
||||||
|
ticktockDone int32
|
||||||
|
stopTicktock context.CancelFunc
|
||||||
|
|
||||||
now atomic.Value
|
now atomic.Value
|
||||||
|
|
||||||
|
@ -71,10 +74,10 @@ type PoolWithFunc struct {
|
||||||
|
|
||||||
// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
|
// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
|
||||||
func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
|
func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
|
||||||
heartbeat := time.NewTicker(p.options.ExpiryDuration)
|
ticker := time.NewTicker(p.options.ExpiryDuration)
|
||||||
defer func() {
|
defer func() {
|
||||||
heartbeat.Stop()
|
ticker.Stop()
|
||||||
atomic.StoreInt32(&p.heartbeatDone, 1)
|
atomic.StoreInt32(&p.purgeDone, 1)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var expiredWorkers []*goWorkerWithFunc
|
var expiredWorkers []*goWorkerWithFunc
|
||||||
|
@ -82,7 +85,7 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-heartbeat.C:
|
case <-ticker.C:
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.IsClosed() {
|
if p.IsClosed() {
|
||||||
|
@ -134,15 +137,46 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ticktock is a goroutine that updates the current time in the pool regularly.
|
// ticktock is a goroutine that updates the current time in the pool regularly.
|
||||||
func (p *PoolWithFunc) ticktock() {
|
func (p *PoolWithFunc) ticktock(ctx context.Context) {
|
||||||
ticker := time.NewTicker(nowTimeUpdateInterval)
|
ticker := time.NewTicker(nowTimeUpdateInterval)
|
||||||
defer ticker.Stop()
|
defer func() {
|
||||||
|
ticker.Stop()
|
||||||
|
atomic.StoreInt32(&p.ticktockDone, 1)
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.IsClosed() {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
for range ticker.C {
|
|
||||||
p.now.Store(time.Now())
|
p.now.Store(time.Now())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *PoolWithFunc) goPurge() {
|
||||||
|
if p.options.DisablePurge {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a goroutine to clean up expired workers periodically.
|
||||||
|
var ctx context.Context
|
||||||
|
ctx, p.stopPurge = context.WithCancel(context.Background())
|
||||||
|
go p.purgeStaleWorkers(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PoolWithFunc) goTicktock() {
|
||||||
|
p.now.Store(time.Now())
|
||||||
|
var ctx context.Context
|
||||||
|
ctx, p.stopTicktock = context.WithCancel(context.Background())
|
||||||
|
go p.ticktock(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
func (p *PoolWithFunc) nowTime() time.Time {
|
func (p *PoolWithFunc) nowTime() time.Time {
|
||||||
return p.now.Load().(time.Time)
|
return p.now.Load().(time.Time)
|
||||||
}
|
}
|
||||||
|
@ -191,15 +225,8 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
|
||||||
}
|
}
|
||||||
p.cond = sync.NewCond(p.lock)
|
p.cond = sync.NewCond(p.lock)
|
||||||
|
|
||||||
// Start a goroutine to clean up expired workers periodically.
|
p.goPurge()
|
||||||
var ctx context.Context
|
p.goTicktock()
|
||||||
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
|
|
||||||
if !p.options.DisablePurge {
|
|
||||||
go p.purgeStaleWorkers(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
p.now.Store(time.Now())
|
|
||||||
go p.ticktock()
|
|
||||||
|
|
||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
@ -288,17 +315,23 @@ func (p *PoolWithFunc) Release() {
|
||||||
|
|
||||||
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
|
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
|
||||||
func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
|
func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
|
||||||
if p.IsClosed() || p.stopHeartbeat == nil {
|
if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
|
||||||
return ErrPoolClosed
|
return ErrPoolClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
p.stopHeartbeat()
|
if p.stopPurge != nil {
|
||||||
p.stopHeartbeat = nil
|
p.stopPurge()
|
||||||
|
p.stopPurge = nil
|
||||||
|
}
|
||||||
|
p.stopTicktock()
|
||||||
|
p.stopTicktock = nil
|
||||||
p.Release()
|
p.Release()
|
||||||
|
|
||||||
endTime := time.Now().Add(timeout)
|
endTime := time.Now().Add(timeout)
|
||||||
for time.Now().Before(endTime) {
|
for time.Now().Before(endTime) {
|
||||||
if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) {
|
if p.Running() == 0 &&
|
||||||
|
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
|
||||||
|
atomic.LoadInt32(&p.ticktockDone) == 1 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
@ -309,12 +342,10 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
|
||||||
// Reboot reboots a closed pool.
|
// Reboot reboots a closed pool.
|
||||||
func (p *PoolWithFunc) Reboot() {
|
func (p *PoolWithFunc) Reboot() {
|
||||||
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
|
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
|
||||||
atomic.StoreInt32(&p.heartbeatDone, 0)
|
atomic.StoreInt32(&p.purgeDone, 0)
|
||||||
var ctx context.Context
|
p.goPurge()
|
||||||
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
|
atomic.StoreInt32(&p.ticktockDone, 0)
|
||||||
if !p.options.DisablePurge {
|
p.goTicktock()
|
||||||
go p.purgeStaleWorkers(ctx)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue