From 15f3cdfb7bd40e21ae0eb4e1ba9901d8cfa1cb2a Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Fri, 6 May 2022 20:27:08 +0800 Subject: [PATCH] opt: refine ReleaseTimeout() --- pool.go | 46 ++++++++++++++++++++++++++-------------------- pool_func.go | 48 +++++++++++++++++++++++++++--------------------- 2 files changed, 53 insertions(+), 41 deletions(-) diff --git a/pool.go b/pool.go index e8c4335..89ad35d 100644 --- a/pool.go +++ b/pool.go @@ -23,7 +23,7 @@ package ants import ( - "errors" + "context" "sync" "sync/atomic" "time" @@ -59,21 +59,24 @@ type Pool struct { // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock blockingNum int - stopHeartbeat chan struct{} + heartbeatDone int32 + stopHeartbeat context.CancelFunc options *Options } // purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger. -func (p *Pool) purgePeriodically() { +func (p *Pool) purgePeriodically(ctx context.Context) { heartbeat := time.NewTicker(p.options.ExpiryDuration) - defer heartbeat.Stop() + defer func() { + heartbeat.Stop() + atomic.StoreInt32(&p.heartbeatDone, 1) + }() for { select { case <-heartbeat.C: - case <-p.stopHeartbeat: - p.stopHeartbeat <- struct{}{} + case <-ctx.Done(): return } @@ -122,10 +125,9 @@ func NewPool(size int, options ...Option) (*Pool, error) { } p := &Pool{ - capacity: int32(size), - lock: internal.NewSpinLock(), - stopHeartbeat: make(chan struct{}, 1), - options: opts, + capacity: int32(size), + lock: internal.NewSpinLock(), + options: opts, } p.workerCache.New = func() interface{} { return &goWorker{ @@ -145,7 +147,9 @@ func NewPool(size int, options ...Option) (*Pool, error) { p.cond = sync.NewCond(p.lock) // Start a goroutine to clean up expired workers periodically. - go p.purgePeriodically() + var ctx context.Context + ctx, p.stopHeartbeat = context.WithCancel(context.Background()) + go p.purgePeriodically(ctx) return p, nil } @@ -225,18 +229,17 @@ func (p *Pool) Release() { // ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. func (p *Pool) ReleaseTimeout(timeout time.Duration) error { - if p.IsClosed() { - return errors.New("pool is already closed") - } - select { - case p.stopHeartbeat <- struct{}{}: - <-p.stopHeartbeat - default: + if p.IsClosed() || p.stopHeartbeat == nil { + return ErrPoolClosed } + + p.stopHeartbeat() + p.stopHeartbeat = nil p.Release() + endTime := time.Now().Add(timeout) for time.Now().Before(endTime) { - if p.Running() == 0 { + if p.Running() == 0 && atomic.LoadInt32(&p.heartbeatDone) == 1 { return nil } time.Sleep(10 * time.Millisecond) @@ -247,7 +250,10 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error { // Reboot reboots a closed pool. func (p *Pool) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { - go p.purgePeriodically() + atomic.StoreInt32(&p.heartbeatDone, 0) + var ctx context.Context + ctx, p.stopHeartbeat = context.WithCancel(context.Background()) + go p.purgePeriodically(ctx) } } diff --git a/pool_func.go b/pool_func.go index f9accb2..bfa4ccb 100644 --- a/pool_func.go +++ b/pool_func.go @@ -23,7 +23,7 @@ package ants import ( - "errors" + "context" "sync" "sync/atomic" "time" @@ -61,22 +61,25 @@ type PoolWithFunc struct { // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock blockingNum int - stopHeartbeat chan struct{} + heartbeatDone int32 + stopHeartbeat context.CancelFunc options *Options } // purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger. -func (p *PoolWithFunc) purgePeriodically() { +func (p *PoolWithFunc) purgePeriodically(ctx context.Context) { heartbeat := time.NewTicker(p.options.ExpiryDuration) - defer heartbeat.Stop() + defer func() { + heartbeat.Stop() + atomic.StoreInt32(&p.heartbeatDone, 1) + }() var expiredWorkers []*goWorkerWithFunc for { select { case <-heartbeat.C: - case <-p.stopHeartbeat: - p.stopHeartbeat <- struct{}{} + case <-ctx.Done(): return } @@ -141,11 +144,10 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi } p := &PoolWithFunc{ - capacity: int32(size), - poolFunc: pf, - lock: internal.NewSpinLock(), - stopHeartbeat: make(chan struct{}, 1), - options: opts, + capacity: int32(size), + poolFunc: pf, + lock: internal.NewSpinLock(), + options: opts, } p.workerCache.New = func() interface{} { return &goWorkerWithFunc{ @@ -162,7 +164,9 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi p.cond = sync.NewCond(p.lock) // Start a goroutine to clean up expired workers periodically. - go p.purgePeriodically() + var ctx context.Context + ctx, p.stopHeartbeat = context.WithCancel(context.Background()) + go p.purgePeriodically(ctx) return p, nil } @@ -246,18 +250,17 @@ func (p *PoolWithFunc) Release() { // ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { - if p.IsClosed() { - return errors.New("pool is already closed") - } - select { - case p.stopHeartbeat <- struct{}{}: - <-p.stopHeartbeat - default: + if p.IsClosed() || p.stopHeartbeat == nil { + return ErrPoolClosed } + + p.stopHeartbeat() + p.stopHeartbeat = nil p.Release() + endTime := time.Now().Add(timeout) for time.Now().Before(endTime) { - if p.Running() == 0 { + if p.Running() == 0 && atomic.LoadInt32(&p.heartbeatDone) == 1 { return nil } time.Sleep(10 * time.Millisecond) @@ -268,7 +271,10 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { // Reboot reboots a closed pool. func (p *PoolWithFunc) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { - go p.purgePeriodically() + atomic.StoreInt32(&p.heartbeatDone, 0) + var ctx context.Context + ctx, p.stopHeartbeat = context.WithCancel(context.Background()) + go p.purgePeriodically(ctx) } }