forked from mirror/ants
📝Optimize the logic when cleaning up expired workers
This commit is contained in:
parent
21a109c7f0
commit
9eb91a4ae0
8
pool.go
8
pool.go
|
@ -68,7 +68,7 @@ func (p *Pool) periodicallyPurge() {
|
||||||
heartbeat := time.NewTicker(p.expiryDuration)
|
heartbeat := time.NewTicker(p.expiryDuration)
|
||||||
defer heartbeat.Stop()
|
defer heartbeat.Stop()
|
||||||
|
|
||||||
var purgeWorkers []*Worker
|
var expiredWorkers []*Worker
|
||||||
for range heartbeat.C {
|
for range heartbeat.C {
|
||||||
if CLOSED == atomic.LoadInt32(&p.release) {
|
if CLOSED == atomic.LoadInt32(&p.release) {
|
||||||
break
|
break
|
||||||
|
@ -81,7 +81,7 @@ func (p *Pool) periodicallyPurge() {
|
||||||
for i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration {
|
for i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration {
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
purgeWorkers = append(purgeWorkers[:0], idleWorkers[:i]...)
|
expiredWorkers = append(expiredWorkers[:0], idleWorkers[:i]...)
|
||||||
if i > 0 {
|
if i > 0 {
|
||||||
m := copy(idleWorkers, idleWorkers[i:])
|
m := copy(idleWorkers, idleWorkers[i:])
|
||||||
for i = m; i < n; i++ {
|
for i = m; i < n; i++ {
|
||||||
|
@ -95,9 +95,9 @@ func (p *Pool) periodicallyPurge() {
|
||||||
// This notification must be outside the p.lock, since w.task
|
// This notification must be outside the p.lock, since w.task
|
||||||
// may be blocking and may consume a lot of time if many workers
|
// may be blocking and may consume a lot of time if many workers
|
||||||
// are located on non-local CPUs.
|
// are located on non-local CPUs.
|
||||||
for i, w := range purgeWorkers {
|
for i, w := range expiredWorkers {
|
||||||
w.task <- nil
|
w.task <- nil
|
||||||
purgeWorkers[i] = nil
|
expiredWorkers[i] = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
33
pool_func.go
33
pool_func.go
|
@ -71,6 +71,7 @@ func (p *PoolWithFunc) periodicallyPurge() {
|
||||||
heartbeat := time.NewTicker(p.expiryDuration)
|
heartbeat := time.NewTicker(p.expiryDuration)
|
||||||
defer heartbeat.Stop()
|
defer heartbeat.Stop()
|
||||||
|
|
||||||
|
var expiredWorkers []*Worker
|
||||||
for range heartbeat.C {
|
for range heartbeat.C {
|
||||||
if CLOSED == atomic.LoadInt32(&p.release) {
|
if CLOSED == atomic.LoadInt32(&p.release) {
|
||||||
break
|
break
|
||||||
|
@ -78,23 +79,29 @@ func (p *PoolWithFunc) periodicallyPurge() {
|
||||||
currentTime := time.Now()
|
currentTime := time.Now()
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
idleWorkers := p.workers
|
idleWorkers := p.workers
|
||||||
n := -1
|
n := len(idleWorkers)
|
||||||
for i, w := range idleWorkers {
|
i := 0
|
||||||
if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
|
for i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration {
|
||||||
break
|
i++
|
||||||
}
|
|
||||||
n = i
|
|
||||||
w.args <- nil
|
|
||||||
idleWorkers[i] = nil
|
|
||||||
}
|
}
|
||||||
if n > -1 {
|
expiredWorkers = append(expiredWorkers[:0], idleWorkers[:i]...)
|
||||||
if n >= len(idleWorkers)-1 {
|
if i > 0 {
|
||||||
p.workers = idleWorkers[:0]
|
m := copy(idleWorkers, idleWorkers[i:])
|
||||||
} else {
|
for i = m; i < n; i++ {
|
||||||
p.workers = idleWorkers[n+1:]
|
idleWorkers[i] = nil
|
||||||
}
|
}
|
||||||
|
p.workers = idleWorkers[:m]
|
||||||
}
|
}
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
|
|
||||||
|
// Notify obsolete workers to stop.
|
||||||
|
// This notification must be outside the p.lock, since w.task
|
||||||
|
// may be blocking and may consume a lot of time if many workers
|
||||||
|
// are located on non-local CPUs.
|
||||||
|
for i, w := range expiredWorkers {
|
||||||
|
w.args <- nil
|
||||||
|
expiredWorkers[i] = nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue