diff --git a/pool.go b/pool.go index 95909e9..a0dc6a5 100644 --- a/pool.go +++ b/pool.go @@ -62,6 +62,9 @@ type Pool struct { heartbeatDone int32 stopHeartbeat context.CancelFunc + ticktockDone int32 + stopTicktock context.CancelFunc + now atomic.Value options *Options @@ -111,15 +114,44 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) { } // ticktock is a goroutine that updates the current time in the pool regularly. -func (p *Pool) ticktock() { +func (p *Pool) ticktock(ctx context.Context) { ticker := time.NewTicker(nowTimeUpdateInterval) - defer ticker.Stop() + defer func() { + ticker.Stop() + atomic.StoreInt32(&p.ticktockDone, 1) + }() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + + if p.IsClosed() { + break + } - for range ticker.C { p.now.Store(time.Now()) } } +func (p *Pool) startHeartbeat() { + // Start a goroutine to clean up expired workers periodically. + var ctx context.Context + ctx, p.stopHeartbeat = context.WithCancel(context.Background()) + if !p.options.DisablePurge { + go p.purgeStaleWorkers(ctx) + } +} + +func (p *Pool) startTicktock() { + p.now.Store(time.Now()) + var ctx context.Context + ctx, p.stopTicktock = context.WithCancel(context.Background()) + go p.ticktock(ctx) +} + func (p *Pool) nowTime() time.Time { return p.now.Load().(time.Time) } @@ -166,15 +198,8 @@ func NewPool(size int, options ...Option) (*Pool, error) { p.cond = sync.NewCond(p.lock) - // Start a goroutine to clean up expired workers periodically. - var ctx context.Context - ctx, p.stopHeartbeat = context.WithCancel(context.Background()) - if !p.options.DisablePurge { - go p.purgeStaleWorkers(ctx) - } - - p.now.Store(time.Now()) - go p.ticktock() + p.startHeartbeat() + p.startTicktock() return p, nil } @@ -259,17 +284,21 @@ func (p *Pool) Release() { // ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. func (p *Pool) ReleaseTimeout(timeout time.Duration) error { - if p.IsClosed() || p.stopHeartbeat == nil { + if p.IsClosed() || p.stopHeartbeat == nil || p.stopTicktock == nil { return ErrPoolClosed } p.stopHeartbeat() p.stopHeartbeat = nil + p.stopTicktock() + p.stopTicktock = nil p.Release() endTime := time.Now().Add(timeout) for time.Now().Before(endTime) { - if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) { + if p.Running() == 0 && + (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) && + atomic.LoadInt32(&p.ticktockDone) == 1 { return nil } time.Sleep(10 * time.Millisecond) @@ -281,11 +310,9 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error { func (p *Pool) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { atomic.StoreInt32(&p.heartbeatDone, 0) - var ctx context.Context - ctx, p.stopHeartbeat = context.WithCancel(context.Background()) - if !p.options.DisablePurge { - go p.purgeStaleWorkers(ctx) - } + p.startHeartbeat() + atomic.StoreInt32(&p.ticktockDone, 0) + p.startTicktock() } } diff --git a/pool_func.go b/pool_func.go index a3cf224..35a0651 100644 --- a/pool_func.go +++ b/pool_func.go @@ -64,6 +64,9 @@ type PoolWithFunc struct { heartbeatDone int32 stopHeartbeat context.CancelFunc + ticktockDone int32 + stopTicktock context.CancelFunc + now atomic.Value options *Options @@ -134,15 +137,44 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { } // ticktock is a goroutine that updates the current time in the pool regularly. -func (p *PoolWithFunc) ticktock() { +func (p *PoolWithFunc) ticktock(ctx context.Context) { ticker := time.NewTicker(nowTimeUpdateInterval) - defer ticker.Stop() + defer func() { + ticker.Stop() + atomic.StoreInt32(&p.ticktockDone, 1) + }() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + + if p.IsClosed() { + break + } - for range ticker.C { p.now.Store(time.Now()) } } +func (p *PoolWithFunc) startHeartbeat() { + // Start a goroutine to clean up expired workers periodically. + var ctx context.Context + ctx, p.stopHeartbeat = context.WithCancel(context.Background()) + if !p.options.DisablePurge { + go p.purgeStaleWorkers(ctx) + } +} + +func (p *PoolWithFunc) startTicktock() { + p.now.Store(time.Now()) + var ctx context.Context + ctx, p.stopTicktock = context.WithCancel(context.Background()) + go p.ticktock(ctx) +} + func (p *PoolWithFunc) nowTime() time.Time { return p.now.Load().(time.Time) } @@ -191,15 +223,8 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi } p.cond = sync.NewCond(p.lock) - // Start a goroutine to clean up expired workers periodically. - var ctx context.Context - ctx, p.stopHeartbeat = context.WithCancel(context.Background()) - if !p.options.DisablePurge { - go p.purgeStaleWorkers(ctx) - } - - p.now.Store(time.Now()) - go p.ticktock() + p.startHeartbeat() + p.startTicktock() return p, nil } @@ -288,17 +313,21 @@ func (p *PoolWithFunc) Release() { // ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { - if p.IsClosed() || p.stopHeartbeat == nil { + if p.IsClosed() || p.stopHeartbeat == nil || p.stopTicktock == nil { return ErrPoolClosed } p.stopHeartbeat() p.stopHeartbeat = nil + p.stopTicktock() + p.stopTicktock = nil p.Release() endTime := time.Now().Add(timeout) for time.Now().Before(endTime) { - if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) { + if p.Running() == 0 && + (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) && + atomic.LoadInt32(&p.ticktockDone) == 1 { return nil } time.Sleep(10 * time.Millisecond) @@ -310,11 +339,9 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { func (p *PoolWithFunc) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { atomic.StoreInt32(&p.heartbeatDone, 0) - var ctx context.Context - ctx, p.stopHeartbeat = context.WithCancel(context.Background()) - if !p.options.DisablePurge { - go p.purgeStaleWorkers(ctx) - } + p.startHeartbeat() + atomic.StoreInt32(&p.ticktockDone, 0) + p.startTicktock() } }