diff --git a/pool.go b/pool.go index 3c50b4b..5fa2512 100644 --- a/pool.go +++ b/pool.go @@ -68,7 +68,7 @@ func (p *Pool) periodicallyPurge() { heartbeat := time.NewTicker(p.expiryDuration) defer heartbeat.Stop() - var purgeWorkers []*Worker + var expiredWorkers []*Worker for range heartbeat.C { if CLOSED == atomic.LoadInt32(&p.release) { break @@ -81,7 +81,7 @@ func (p *Pool) periodicallyPurge() { for i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration { i++ } - purgeWorkers = append(purgeWorkers[:0], idleWorkers[:i]...) + expiredWorkers = append(expiredWorkers[:0], idleWorkers[:i]...) if i > 0 { m := copy(idleWorkers, idleWorkers[i:]) for i = m; i < n; i++ { @@ -95,9 +95,9 @@ func (p *Pool) periodicallyPurge() { // This notification must be outside the p.lock, since w.task // may be blocking and may consume a lot of time if many workers // are located on non-local CPUs. - for i, w := range purgeWorkers { + for i, w := range expiredWorkers { w.task <- nil - purgeWorkers[i] = nil + expiredWorkers[i] = nil } } } diff --git a/pool_func.go b/pool_func.go index b8ef5ab..eafe81b 100644 --- a/pool_func.go +++ b/pool_func.go @@ -71,6 +71,7 @@ func (p *PoolWithFunc) periodicallyPurge() { heartbeat := time.NewTicker(p.expiryDuration) defer heartbeat.Stop() + var expiredWorkers []*Worker for range heartbeat.C { if CLOSED == atomic.LoadInt32(&p.release) { break @@ -78,23 +79,29 @@ func (p *PoolWithFunc) periodicallyPurge() { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers - n := -1 - for i, w := range idleWorkers { - if currentTime.Sub(w.recycleTime) <= p.expiryDuration { - break - } - n = i - w.args <- nil - idleWorkers[i] = nil + n := len(idleWorkers) + i := 0 + for i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration { + i++ } - if n > -1 { - if n >= len(idleWorkers)-1 { - p.workers = idleWorkers[:0] - } else { - p.workers = idleWorkers[n+1:] + expiredWorkers = append(expiredWorkers[:0], idleWorkers[:i]...) + if i > 0 { + m := copy(idleWorkers, idleWorkers[i:]) + for i = m; i < n; i++ { + idleWorkers[i] = nil } + p.workers = idleWorkers[:m] } p.lock.Unlock() + + // Notify obsolete workers to stop. + // This notification must be outside the p.lock, since w.task + // may be blocking and may consume a lot of time if many workers + // are located on non-local CPUs. + for i, w := range expiredWorkers { + w.args <- nil + expiredWorkers[i] = nil + } } }