diff --git a/pool.go b/pool.go index a0dc6a5..41fd61d 100644 --- a/pool.go +++ b/pool.go @@ -59,8 +59,8 @@ type Pool struct { // waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock waiting int32 - heartbeatDone int32 - stopHeartbeat context.CancelFunc + purgeDone int32 + stopPurge context.CancelFunc ticktockDone int32 stopTicktock context.CancelFunc @@ -72,18 +72,18 @@ type Pool struct { // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. func (p *Pool) purgeStaleWorkers(ctx context.Context) { - heartbeat := time.NewTicker(p.options.ExpiryDuration) + ticker := time.NewTicker(p.options.ExpiryDuration) defer func() { - heartbeat.Stop() - atomic.StoreInt32(&p.heartbeatDone, 1) + ticker.Stop() + atomic.StoreInt32(&p.purgeDone, 1) }() for { select { case <-ctx.Done(): return - case <-heartbeat.C: + case <-ticker.C: } if p.IsClosed() { @@ -136,16 +136,16 @@ func (p *Pool) ticktock(ctx context.Context) { } } -func (p *Pool) startHeartbeat() { +func (p *Pool) goPurge() { // Start a goroutine to clean up expired workers periodically. var ctx context.Context - ctx, p.stopHeartbeat = context.WithCancel(context.Background()) + ctx, p.stopPurge = context.WithCancel(context.Background()) if !p.options.DisablePurge { go p.purgeStaleWorkers(ctx) } } -func (p *Pool) startTicktock() { +func (p *Pool) goTicktock() { p.now.Store(time.Now()) var ctx context.Context ctx, p.stopTicktock = context.WithCancel(context.Background()) @@ -198,8 +198,8 @@ func NewPool(size int, options ...Option) (*Pool, error) { p.cond = sync.NewCond(p.lock) - p.startHeartbeat() - p.startTicktock() + p.goPurge() + p.goTicktock() return p, nil } @@ -284,12 +284,12 @@ func (p *Pool) Release() { // ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. func (p *Pool) ReleaseTimeout(timeout time.Duration) error { - if p.IsClosed() || p.stopHeartbeat == nil || p.stopTicktock == nil { + if p.IsClosed() || p.stopPurge == nil || p.stopTicktock == nil { return ErrPoolClosed } - p.stopHeartbeat() - p.stopHeartbeat = nil + p.stopPurge() + p.stopPurge = nil p.stopTicktock() p.stopTicktock = nil p.Release() @@ -297,7 +297,7 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error { endTime := time.Now().Add(timeout) for time.Now().Before(endTime) { if p.Running() == 0 && - (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) && + (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && atomic.LoadInt32(&p.ticktockDone) == 1 { return nil } @@ -309,10 +309,10 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error { // Reboot reboots a closed pool. func (p *Pool) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { - atomic.StoreInt32(&p.heartbeatDone, 0) - p.startHeartbeat() + atomic.StoreInt32(&p.purgeDone, 0) + p.goPurge() atomic.StoreInt32(&p.ticktockDone, 0) - p.startTicktock() + p.goTicktock() } } diff --git a/pool_func.go b/pool_func.go index 35a0651..ec2dfd2 100644 --- a/pool_func.go +++ b/pool_func.go @@ -61,8 +61,8 @@ type PoolWithFunc struct { // waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock waiting int32 - heartbeatDone int32 - stopHeartbeat context.CancelFunc + purgeDone int32 + stopPurge context.CancelFunc ticktockDone int32 stopTicktock context.CancelFunc @@ -74,10 +74,10 @@ type PoolWithFunc struct { // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { - heartbeat := time.NewTicker(p.options.ExpiryDuration) + ticker := time.NewTicker(p.options.ExpiryDuration) defer func() { - heartbeat.Stop() - atomic.StoreInt32(&p.heartbeatDone, 1) + ticker.Stop() + atomic.StoreInt32(&p.purgeDone, 1) }() var expiredWorkers []*goWorkerWithFunc @@ -85,7 +85,7 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { select { case <-ctx.Done(): return - case <-heartbeat.C: + case <-ticker.C: } if p.IsClosed() { @@ -159,16 +159,16 @@ func (p *PoolWithFunc) ticktock(ctx context.Context) { } } -func (p *PoolWithFunc) startHeartbeat() { +func (p *PoolWithFunc) goPurge() { // Start a goroutine to clean up expired workers periodically. var ctx context.Context - ctx, p.stopHeartbeat = context.WithCancel(context.Background()) + ctx, p.stopPurge = context.WithCancel(context.Background()) if !p.options.DisablePurge { go p.purgeStaleWorkers(ctx) } } -func (p *PoolWithFunc) startTicktock() { +func (p *PoolWithFunc) goTicktock() { p.now.Store(time.Now()) var ctx context.Context ctx, p.stopTicktock = context.WithCancel(context.Background()) @@ -223,8 +223,8 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi } p.cond = sync.NewCond(p.lock) - p.startHeartbeat() - p.startTicktock() + p.goPurge() + p.goTicktock() return p, nil } @@ -313,12 +313,12 @@ func (p *PoolWithFunc) Release() { // ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { - if p.IsClosed() || p.stopHeartbeat == nil || p.stopTicktock == nil { + if p.IsClosed() || p.stopPurge == nil || p.stopTicktock == nil { return ErrPoolClosed } - p.stopHeartbeat() - p.stopHeartbeat = nil + p.stopPurge() + p.stopPurge = nil p.stopTicktock() p.stopTicktock = nil p.Release() @@ -326,7 +326,7 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { endTime := time.Now().Add(timeout) for time.Now().Before(endTime) { if p.Running() == 0 && - (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) && + (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && atomic.LoadInt32(&p.ticktockDone) == 1 { return nil } @@ -338,10 +338,10 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { // Reboot reboots a closed pool. func (p *PoolWithFunc) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { - atomic.StoreInt32(&p.heartbeatDone, 0) - p.startHeartbeat() + atomic.StoreInt32(&p.purgeDone, 0) + p.goPurge() atomic.StoreInt32(&p.ticktockDone, 0) - p.startTicktock() + p.goTicktock() } }