Compare commits

..

No commits in common. "ed92fb6247e9c0b5ed5cc0e88120590d58bdeff1" and "8a39ec0e4a8ad8e2d84c867e00985b96b09a0cd1" have entirely different histories.

5 changed files with 58 additions and 122 deletions

View File

@ -9,8 +9,6 @@ on:
branches:
- master
- dev
paths-ignore:
- '**.md'
schedule:
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)

View File

@ -2,7 +2,7 @@
<img src="https://raw.githubusercontent.com/panjf2000/logos/master/ants/logo.png" />
<b>A goroutine pool for Go</b>
<br/><br/>
<a title="Build Status" target="_blank" href="https://github.com/panjf2000/ants/actions?query=workflow%3ATests"><img src="https://img.shields.io/github/actions/workflow/status/panjf2000/ants/test.yml?branch=master&style=flat-square&logo=github-actions" /></a>
<a title="Build Status" target="_blank" href="https://github.com/panjf2000/ants/actions?query=workflow%3ATests"><img src="https://img.shields.io/github/workflow/status/panjf2000/ants/Tests?style=flat-square&logo=github-actions" /></a>
<a title="Codecov" target="_blank" href="https://codecov.io/gh/panjf2000/ants"><img src="https://img.shields.io/codecov/c/github/panjf2000/ants?style=flat-square&logo=codecov" /></a>
<a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/v/release/panjf2000/ants.svg?color=161823&style=flat-square&logo=smartthings" /></a>
<a title="Tag" target="_blank" href="https://github.com/panjf2000/ants/tags"><img src="https://img.shields.io/github/v/tag/panjf2000/ants?color=%23ff8936&logo=fitbit&style=flat-square" /></a>

View File

@ -2,7 +2,7 @@
<img src="https://raw.githubusercontent.com/panjf2000/logos/master/ants/logo.png" />
<b>Go 语言的 goroutine 池</b>
<br/><br/>
<a title="Build Status" target="_blank" href="https://github.com/panjf2000/ants/actions?query=workflow%3ATests"><img src="https://img.shields.io/github/actions/workflow/status/panjf2000/ants/test.yml?branch=master&style=flat-square&logo=github-actions" /></a>
<a title="Build Status" target="_blank" href="https://github.com/panjf2000/ants/actions?query=workflow%3ATests"><img src="https://img.shields.io/github/workflow/status/panjf2000/ants/Tests?style=flat-square&logo=github-actions" /></a>
<a title="Codecov" target="_blank" href="https://codecov.io/gh/panjf2000/ants"><img src="https://img.shields.io/codecov/c/github/panjf2000/ants?style=flat-square&logo=codecov" /></a>
<a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/v/release/panjf2000/ants.svg?color=161823&style=flat-square&logo=smartthings" /></a>
<a title="Tag" target="_blank" href="https://github.com/panjf2000/ants/tags"><img src="https://img.shields.io/github/v/tag/panjf2000/ants?color=%23ff8936&logo=fitbit&style=flat-square" /></a>

87
pool.go
View File

@ -59,11 +59,8 @@ type Pool struct {
// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
waiting int32
purgeDone int32
stopPurge context.CancelFunc
ticktockDone int32
stopTicktock context.CancelFunc
heartbeatDone int32
stopHeartbeat context.CancelFunc
now atomic.Value
@ -72,18 +69,18 @@ type Pool struct {
// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *Pool) purgeStaleWorkers(ctx context.Context) {
ticker := time.NewTicker(p.options.ExpiryDuration)
heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer func() {
ticker.Stop()
atomic.StoreInt32(&p.purgeDone, 1)
heartbeat.Stop()
atomic.StoreInt32(&p.heartbeatDone, 1)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
case <-heartbeat.C:
}
if p.IsClosed() {
@ -114,46 +111,15 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) {
}
// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *Pool) ticktock(ctx context.Context) {
func (p *Pool) ticktock() {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer func() {
ticker.Stop()
atomic.StoreInt32(&p.ticktockDone, 1)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
if p.IsClosed() {
break
}
defer ticker.Stop()
for range ticker.C {
p.now.Store(time.Now())
}
}
func (p *Pool) goPurge() {
if p.options.DisablePurge {
return
}
// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopPurge = context.WithCancel(context.Background())
go p.purgeStaleWorkers(ctx)
}
func (p *Pool) goTicktock() {
p.now.Store(time.Now())
var ctx context.Context
ctx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock(ctx)
}
func (p *Pool) nowTime() time.Time {
return p.now.Load().(time.Time)
}
@ -200,8 +166,15 @@ func NewPool(size int, options ...Option) (*Pool, error) {
p.cond = sync.NewCond(p.lock)
p.goPurge()
p.goTicktock()
// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgeStaleWorkers(ctx)
}
p.now.Store(time.Now())
go p.ticktock()
return p, nil
}
@ -286,23 +259,17 @@ func (p *Pool) Release() {
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
if p.IsClosed() || p.stopHeartbeat == nil {
return ErrPoolClosed
}
if p.stopPurge != nil {
p.stopPurge()
p.stopPurge = nil
}
p.stopTicktock()
p.stopTicktock = nil
p.stopHeartbeat()
p.stopHeartbeat = nil
p.Release()
endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) {
if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) {
return nil
}
time.Sleep(10 * time.Millisecond)
@ -313,10 +280,12 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
// Reboot reboots a closed pool.
func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.purgeDone, 0)
p.goPurge()
atomic.StoreInt32(&p.ticktockDone, 0)
p.goTicktock()
atomic.StoreInt32(&p.heartbeatDone, 0)
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgeStaleWorkers(ctx)
}
}
}

View File

@ -61,11 +61,8 @@ type PoolWithFunc struct {
// waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock
waiting int32
purgeDone int32
stopPurge context.CancelFunc
ticktockDone int32
stopTicktock context.CancelFunc
heartbeatDone int32
stopHeartbeat context.CancelFunc
now atomic.Value
@ -74,10 +71,10 @@ type PoolWithFunc struct {
// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
ticker := time.NewTicker(p.options.ExpiryDuration)
heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer func() {
ticker.Stop()
atomic.StoreInt32(&p.purgeDone, 1)
heartbeat.Stop()
atomic.StoreInt32(&p.heartbeatDone, 1)
}()
var expiredWorkers []*goWorkerWithFunc
@ -85,7 +82,7 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-ticker.C:
case <-heartbeat.C:
}
if p.IsClosed() {
@ -137,46 +134,15 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
}
// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *PoolWithFunc) ticktock(ctx context.Context) {
func (p *PoolWithFunc) ticktock() {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer func() {
ticker.Stop()
atomic.StoreInt32(&p.ticktockDone, 1)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
if p.IsClosed() {
break
}
defer ticker.Stop()
for range ticker.C {
p.now.Store(time.Now())
}
}
func (p *PoolWithFunc) goPurge() {
if p.options.DisablePurge {
return
}
// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopPurge = context.WithCancel(context.Background())
go p.purgeStaleWorkers(ctx)
}
func (p *PoolWithFunc) goTicktock() {
p.now.Store(time.Now())
var ctx context.Context
ctx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock(ctx)
}
func (p *PoolWithFunc) nowTime() time.Time {
return p.now.Load().(time.Time)
}
@ -225,8 +191,15 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
}
p.cond = sync.NewCond(p.lock)
p.goPurge()
p.goTicktock()
// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgeStaleWorkers(ctx)
}
p.now.Store(time.Now())
go p.ticktock()
return p, nil
}
@ -315,23 +288,17 @@ func (p *PoolWithFunc) Release() {
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
if p.IsClosed() || p.stopHeartbeat == nil {
return ErrPoolClosed
}
if p.stopPurge != nil {
p.stopPurge()
p.stopPurge = nil
}
p.stopTicktock()
p.stopTicktock = nil
p.stopHeartbeat()
p.stopHeartbeat = nil
p.Release()
endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) {
if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) {
return nil
}
time.Sleep(10 * time.Millisecond)
@ -342,10 +309,12 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
// Reboot reboots a closed pool.
func (p *PoolWithFunc) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.purgeDone, 0)
p.goPurge()
atomic.StoreInt32(&p.ticktockDone, 0)
p.goTicktock()
atomic.StoreInt32(&p.heartbeatDone, 0)
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgeStaleWorkers(ctx)
}
}
}