opt: refine ReleaseTimeout()

This commit is contained in:
Andy Pan 2022-05-06 20:27:08 +08:00
parent 9d85d57cc4
commit 15f3cdfb7b
2 changed files with 53 additions and 41 deletions

40
pool.go
View File

@ -23,7 +23,7 @@
package ants package ants
import ( import (
"errors" "context"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -59,21 +59,24 @@ type Pool struct {
// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
blockingNum int blockingNum int
stopHeartbeat chan struct{} heartbeatDone int32
stopHeartbeat context.CancelFunc
options *Options options *Options
} }
// purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger. // purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger.
func (p *Pool) purgePeriodically() { func (p *Pool) purgePeriodically(ctx context.Context) {
heartbeat := time.NewTicker(p.options.ExpiryDuration) heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer heartbeat.Stop() defer func() {
heartbeat.Stop()
atomic.StoreInt32(&p.heartbeatDone, 1)
}()
for { for {
select { select {
case <-heartbeat.C: case <-heartbeat.C:
case <-p.stopHeartbeat: case <-ctx.Done():
p.stopHeartbeat <- struct{}{}
return return
} }
@ -124,7 +127,6 @@ func NewPool(size int, options ...Option) (*Pool, error) {
p := &Pool{ p := &Pool{
capacity: int32(size), capacity: int32(size),
lock: internal.NewSpinLock(), lock: internal.NewSpinLock(),
stopHeartbeat: make(chan struct{}, 1),
options: opts, options: opts,
} }
p.workerCache.New = func() interface{} { p.workerCache.New = func() interface{} {
@ -145,7 +147,9 @@ func NewPool(size int, options ...Option) (*Pool, error) {
p.cond = sync.NewCond(p.lock) p.cond = sync.NewCond(p.lock)
// Start a goroutine to clean up expired workers periodically. // Start a goroutine to clean up expired workers periodically.
go p.purgePeriodically() var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
go p.purgePeriodically(ctx)
return p, nil return p, nil
} }
@ -225,18 +229,17 @@ func (p *Pool) Release() {
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. // ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func (p *Pool) ReleaseTimeout(timeout time.Duration) error { func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() { if p.IsClosed() || p.stopHeartbeat == nil {
return errors.New("pool is already closed") return ErrPoolClosed
}
select {
case p.stopHeartbeat <- struct{}{}:
<-p.stopHeartbeat
default:
} }
p.stopHeartbeat()
p.stopHeartbeat = nil
p.Release() p.Release()
endTime := time.Now().Add(timeout) endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) { for time.Now().Before(endTime) {
if p.Running() == 0 { if p.Running() == 0 && atomic.LoadInt32(&p.heartbeatDone) == 1 {
return nil return nil
} }
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -247,7 +250,10 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
// Reboot reboots a closed pool. // Reboot reboots a closed pool.
func (p *Pool) Reboot() { func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
go p.purgePeriodically() atomic.StoreInt32(&p.heartbeatDone, 0)
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
go p.purgePeriodically(ctx)
} }
} }

View File

@ -23,7 +23,7 @@
package ants package ants
import ( import (
"errors" "context"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -61,22 +61,25 @@ type PoolWithFunc struct {
// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
blockingNum int blockingNum int
stopHeartbeat chan struct{} heartbeatDone int32
stopHeartbeat context.CancelFunc
options *Options options *Options
} }
// purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger. // purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger.
func (p *PoolWithFunc) purgePeriodically() { func (p *PoolWithFunc) purgePeriodically(ctx context.Context) {
heartbeat := time.NewTicker(p.options.ExpiryDuration) heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer heartbeat.Stop() defer func() {
heartbeat.Stop()
atomic.StoreInt32(&p.heartbeatDone, 1)
}()
var expiredWorkers []*goWorkerWithFunc var expiredWorkers []*goWorkerWithFunc
for { for {
select { select {
case <-heartbeat.C: case <-heartbeat.C:
case <-p.stopHeartbeat: case <-ctx.Done():
p.stopHeartbeat <- struct{}{}
return return
} }
@ -144,7 +147,6 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
capacity: int32(size), capacity: int32(size),
poolFunc: pf, poolFunc: pf,
lock: internal.NewSpinLock(), lock: internal.NewSpinLock(),
stopHeartbeat: make(chan struct{}, 1),
options: opts, options: opts,
} }
p.workerCache.New = func() interface{} { p.workerCache.New = func() interface{} {
@ -162,7 +164,9 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
p.cond = sync.NewCond(p.lock) p.cond = sync.NewCond(p.lock)
// Start a goroutine to clean up expired workers periodically. // Start a goroutine to clean up expired workers periodically.
go p.purgePeriodically() var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
go p.purgePeriodically(ctx)
return p, nil return p, nil
} }
@ -246,18 +250,17 @@ func (p *PoolWithFunc) Release() {
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. // ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() { if p.IsClosed() || p.stopHeartbeat == nil {
return errors.New("pool is already closed") return ErrPoolClosed
}
select {
case p.stopHeartbeat <- struct{}{}:
<-p.stopHeartbeat
default:
} }
p.stopHeartbeat()
p.stopHeartbeat = nil
p.Release() p.Release()
endTime := time.Now().Add(timeout) endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) { for time.Now().Before(endTime) {
if p.Running() == 0 { if p.Running() == 0 && atomic.LoadInt32(&p.heartbeatDone) == 1 {
return nil return nil
} }
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -268,7 +271,10 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
// Reboot reboots a closed pool. // Reboot reboots a closed pool.
func (p *PoolWithFunc) Reboot() { func (p *PoolWithFunc) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
go p.purgePeriodically() atomic.StoreInt32(&p.heartbeatDone, 0)
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
go p.purgePeriodically(ctx)
} }
} }