diff --git a/pool.go b/pool.go index 338d94f..3c50b4b 100644 --- a/pool.go +++ b/pool.go @@ -68,6 +68,7 @@ func (p *Pool) periodicallyPurge() { heartbeat := time.NewTicker(p.expiryDuration) defer heartbeat.Stop() + var purgeWorkers []*Worker for range heartbeat.C { if CLOSED == atomic.LoadInt32(&p.release) { break @@ -75,23 +76,29 @@ func (p *Pool) periodicallyPurge() { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers - n := -1 - for i, w := range idleWorkers { - if currentTime.Sub(w.recycleTime) <= p.expiryDuration { - break - } - n = i - w.task <- nil - idleWorkers[i] = nil + n := len(idleWorkers) + i := 0 + for i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration { + i++ } - if n > -1 { - if n >= len(idleWorkers)-1 { - p.workers = idleWorkers[:0] - } else { - p.workers = idleWorkers[n+1:] + purgeWorkers = append(purgeWorkers[:0], idleWorkers[:i]...) + if i > 0 { + m := copy(idleWorkers, idleWorkers[i:]) + for i = m; i < n; i++ { + idleWorkers[i] = nil } + p.workers = idleWorkers[:m] } p.lock.Unlock() + + // Notify obsolete workers to stop. + // This notification must be outside the p.lock, since w.task + // may be blocking and may consume a lot of time if many workers + // are located on non-local CPUs. + for i, w := range purgeWorkers { + w.task <- nil + purgeWorkers[i] = nil + } } }