Compare commits

...

7 Commits

Author SHA1 Message Date
re ed92fb6247 Merge remote-tracking branch 'upstream/master' 2022-12-21 14:45:48 +03:00
Andy Pan 88d2454bbb fix: resolve the build failures 2022-12-20 22:29:21 +08:00
Andy Pan b6eaea118b opt: refine some code 2022-12-20 22:09:35 +08:00
Andy Pan 858f91f48b chore: code cleanup 2022-12-20 21:55:28 +08:00
Andy Pan 4e0cb8cd03 chore: don't start workflow to scan code when there is no code changes 2022-12-20 21:36:02 +08:00
Andy Pan b1b2df0c10 chore: fix the broken build status icon 2022-12-20 21:33:16 +08:00
Gleb Radchenko 23c4f48d0d fix: exit ticktock goroutine when pool is closed 2022-12-20 21:15:34 +08:00
5 changed files with 122 additions and 58 deletions

View File

@ -9,6 +9,8 @@ on:
branches: branches:
- master - master
- dev - dev
paths-ignore:
- '**.md'
schedule: schedule:
# ┌───────────── minute (0 - 59) # ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23) # │ ┌───────────── hour (0 - 23)

View File

@ -2,7 +2,7 @@
<img src="https://raw.githubusercontent.com/panjf2000/logos/master/ants/logo.png" /> <img src="https://raw.githubusercontent.com/panjf2000/logos/master/ants/logo.png" />
<b>A goroutine pool for Go</b> <b>A goroutine pool for Go</b>
<br/><br/> <br/><br/>
<a title="Build Status" target="_blank" href="https://github.com/panjf2000/ants/actions?query=workflow%3ATests"><img src="https://img.shields.io/github/workflow/status/panjf2000/ants/Tests?style=flat-square&logo=github-actions" /></a> <a title="Build Status" target="_blank" href="https://github.com/panjf2000/ants/actions?query=workflow%3ATests"><img src="https://img.shields.io/github/actions/workflow/status/panjf2000/ants/test.yml?branch=master&style=flat-square&logo=github-actions" /></a>
<a title="Codecov" target="_blank" href="https://codecov.io/gh/panjf2000/ants"><img src="https://img.shields.io/codecov/c/github/panjf2000/ants?style=flat-square&logo=codecov" /></a> <a title="Codecov" target="_blank" href="https://codecov.io/gh/panjf2000/ants"><img src="https://img.shields.io/codecov/c/github/panjf2000/ants?style=flat-square&logo=codecov" /></a>
<a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/v/release/panjf2000/ants.svg?color=161823&style=flat-square&logo=smartthings" /></a> <a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/v/release/panjf2000/ants.svg?color=161823&style=flat-square&logo=smartthings" /></a>
<a title="Tag" target="_blank" href="https://github.com/panjf2000/ants/tags"><img src="https://img.shields.io/github/v/tag/panjf2000/ants?color=%23ff8936&logo=fitbit&style=flat-square" /></a> <a title="Tag" target="_blank" href="https://github.com/panjf2000/ants/tags"><img src="https://img.shields.io/github/v/tag/panjf2000/ants?color=%23ff8936&logo=fitbit&style=flat-square" /></a>

View File

@ -2,7 +2,7 @@
<img src="https://raw.githubusercontent.com/panjf2000/logos/master/ants/logo.png" /> <img src="https://raw.githubusercontent.com/panjf2000/logos/master/ants/logo.png" />
<b>Go 语言的 goroutine 池</b> <b>Go 语言的 goroutine 池</b>
<br/><br/> <br/><br/>
<a title="Build Status" target="_blank" href="https://github.com/panjf2000/ants/actions?query=workflow%3ATests"><img src="https://img.shields.io/github/workflow/status/panjf2000/ants/Tests?style=flat-square&logo=github-actions" /></a> <a title="Build Status" target="_blank" href="https://github.com/panjf2000/ants/actions?query=workflow%3ATests"><img src="https://img.shields.io/github/actions/workflow/status/panjf2000/ants/test.yml?branch=master&style=flat-square&logo=github-actions" /></a>
<a title="Codecov" target="_blank" href="https://codecov.io/gh/panjf2000/ants"><img src="https://img.shields.io/codecov/c/github/panjf2000/ants?style=flat-square&logo=codecov" /></a> <a title="Codecov" target="_blank" href="https://codecov.io/gh/panjf2000/ants"><img src="https://img.shields.io/codecov/c/github/panjf2000/ants?style=flat-square&logo=codecov" /></a>
<a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/v/release/panjf2000/ants.svg?color=161823&style=flat-square&logo=smartthings" /></a> <a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/v/release/panjf2000/ants.svg?color=161823&style=flat-square&logo=smartthings" /></a>
<a title="Tag" target="_blank" href="https://github.com/panjf2000/ants/tags"><img src="https://img.shields.io/github/v/tag/panjf2000/ants?color=%23ff8936&logo=fitbit&style=flat-square" /></a> <a title="Tag" target="_blank" href="https://github.com/panjf2000/ants/tags"><img src="https://img.shields.io/github/v/tag/panjf2000/ants?color=%23ff8936&logo=fitbit&style=flat-square" /></a>

87
pool.go
View File

@ -59,8 +59,11 @@ type Pool struct {
// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock // waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
waiting int32 waiting int32
heartbeatDone int32 purgeDone int32
stopHeartbeat context.CancelFunc stopPurge context.CancelFunc
ticktockDone int32
stopTicktock context.CancelFunc
now atomic.Value now atomic.Value
@ -69,18 +72,18 @@ type Pool struct {
// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *Pool) purgeStaleWorkers(ctx context.Context) { func (p *Pool) purgeStaleWorkers(ctx context.Context) {
heartbeat := time.NewTicker(p.options.ExpiryDuration) ticker := time.NewTicker(p.options.ExpiryDuration)
defer func() { defer func() {
heartbeat.Stop() ticker.Stop()
atomic.StoreInt32(&p.heartbeatDone, 1) atomic.StoreInt32(&p.purgeDone, 1)
}() }()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-heartbeat.C: case <-ticker.C:
} }
if p.IsClosed() { if p.IsClosed() {
@ -111,15 +114,46 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) {
} }
// ticktock is a goroutine that updates the current time in the pool regularly. // ticktock is a goroutine that updates the current time in the pool regularly.
func (p *Pool) ticktock() { func (p *Pool) ticktock(ctx context.Context) {
ticker := time.NewTicker(nowTimeUpdateInterval) ticker := time.NewTicker(nowTimeUpdateInterval)
defer ticker.Stop() defer func() {
ticker.Stop()
atomic.StoreInt32(&p.ticktockDone, 1)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
if p.IsClosed() {
break
}
for range ticker.C {
p.now.Store(time.Now()) p.now.Store(time.Now())
} }
} }
func (p *Pool) goPurge() {
if p.options.DisablePurge {
return
}
// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopPurge = context.WithCancel(context.Background())
go p.purgeStaleWorkers(ctx)
}
func (p *Pool) goTicktock() {
p.now.Store(time.Now())
var ctx context.Context
ctx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock(ctx)
}
func (p *Pool) nowTime() time.Time { func (p *Pool) nowTime() time.Time {
return p.now.Load().(time.Time) return p.now.Load().(time.Time)
} }
@ -166,15 +200,8 @@ func NewPool(size int, options ...Option) (*Pool, error) {
p.cond = sync.NewCond(p.lock) p.cond = sync.NewCond(p.lock)
// Start a goroutine to clean up expired workers periodically. p.goPurge()
var ctx context.Context p.goTicktock()
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgeStaleWorkers(ctx)
}
p.now.Store(time.Now())
go p.ticktock()
return p, nil return p, nil
} }
@ -259,17 +286,23 @@ func (p *Pool) Release() {
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. // ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func (p *Pool) ReleaseTimeout(timeout time.Duration) error { func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() || p.stopHeartbeat == nil { if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
return ErrPoolClosed return ErrPoolClosed
} }
p.stopHeartbeat() if p.stopPurge != nil {
p.stopHeartbeat = nil p.stopPurge()
p.stopPurge = nil
}
p.stopTicktock()
p.stopTicktock = nil
p.Release() p.Release()
endTime := time.Now().Add(timeout) endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) { for time.Now().Before(endTime) {
if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) { if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil return nil
} }
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -280,12 +313,10 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
// Reboot reboots a closed pool. // Reboot reboots a closed pool.
func (p *Pool) Reboot() { func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.heartbeatDone, 0) atomic.StoreInt32(&p.purgeDone, 0)
var ctx context.Context p.goPurge()
ctx, p.stopHeartbeat = context.WithCancel(context.Background()) atomic.StoreInt32(&p.ticktockDone, 0)
if !p.options.DisablePurge { p.goTicktock()
go p.purgeStaleWorkers(ctx)
}
} }
} }

View File

@ -61,8 +61,11 @@ type PoolWithFunc struct {
// waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock // waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock
waiting int32 waiting int32
heartbeatDone int32 purgeDone int32
stopHeartbeat context.CancelFunc stopPurge context.CancelFunc
ticktockDone int32
stopTicktock context.CancelFunc
now atomic.Value now atomic.Value
@ -71,10 +74,10 @@ type PoolWithFunc struct {
// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
heartbeat := time.NewTicker(p.options.ExpiryDuration) ticker := time.NewTicker(p.options.ExpiryDuration)
defer func() { defer func() {
heartbeat.Stop() ticker.Stop()
atomic.StoreInt32(&p.heartbeatDone, 1) atomic.StoreInt32(&p.purgeDone, 1)
}() }()
var expiredWorkers []*goWorkerWithFunc var expiredWorkers []*goWorkerWithFunc
@ -82,7 +85,7 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-heartbeat.C: case <-ticker.C:
} }
if p.IsClosed() { if p.IsClosed() {
@ -134,15 +137,46 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
} }
// ticktock is a goroutine that updates the current time in the pool regularly. // ticktock is a goroutine that updates the current time in the pool regularly.
func (p *PoolWithFunc) ticktock() { func (p *PoolWithFunc) ticktock(ctx context.Context) {
ticker := time.NewTicker(nowTimeUpdateInterval) ticker := time.NewTicker(nowTimeUpdateInterval)
defer ticker.Stop() defer func() {
ticker.Stop()
atomic.StoreInt32(&p.ticktockDone, 1)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
if p.IsClosed() {
break
}
for range ticker.C {
p.now.Store(time.Now()) p.now.Store(time.Now())
} }
} }
func (p *PoolWithFunc) goPurge() {
if p.options.DisablePurge {
return
}
// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopPurge = context.WithCancel(context.Background())
go p.purgeStaleWorkers(ctx)
}
func (p *PoolWithFunc) goTicktock() {
p.now.Store(time.Now())
var ctx context.Context
ctx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock(ctx)
}
func (p *PoolWithFunc) nowTime() time.Time { func (p *PoolWithFunc) nowTime() time.Time {
return p.now.Load().(time.Time) return p.now.Load().(time.Time)
} }
@ -191,15 +225,8 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
} }
p.cond = sync.NewCond(p.lock) p.cond = sync.NewCond(p.lock)
// Start a goroutine to clean up expired workers periodically. p.goPurge()
var ctx context.Context p.goTicktock()
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgeStaleWorkers(ctx)
}
p.now.Store(time.Now())
go p.ticktock()
return p, nil return p, nil
} }
@ -288,17 +315,23 @@ func (p *PoolWithFunc) Release() {
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. // ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() || p.stopHeartbeat == nil { if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
return ErrPoolClosed return ErrPoolClosed
} }
p.stopHeartbeat() if p.stopPurge != nil {
p.stopHeartbeat = nil p.stopPurge()
p.stopPurge = nil
}
p.stopTicktock()
p.stopTicktock = nil
p.Release() p.Release()
endTime := time.Now().Add(timeout) endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) { for time.Now().Before(endTime) {
if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) { if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil return nil
} }
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -309,12 +342,10 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
// Reboot reboots a closed pool. // Reboot reboots a closed pool.
func (p *PoolWithFunc) Reboot() { func (p *PoolWithFunc) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.heartbeatDone, 0) atomic.StoreInt32(&p.purgeDone, 0)
var ctx context.Context p.goPurge()
ctx, p.stopHeartbeat = context.WithCancel(context.Background()) atomic.StoreInt32(&p.ticktockDone, 0)
if !p.options.DisablePurge { p.goTicktock()
go p.purgeStaleWorkers(ctx)
}
} }
} }