mirror of https://github.com/panjf2000/ants.git
clear expired workers for Pool
This commit is contained in:
parent
bd3ca2489e
commit
d04febc0b2
32
pool.go
32
pool.go
|
@ -26,6 +26,7 @@ import (
|
|||
"math"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type sig struct{}
|
||||
|
@ -41,6 +42,9 @@ type Pool struct {
|
|||
// running is the number of the currently running goroutines.
|
||||
running int32
|
||||
|
||||
// expiryDuration set the expired time (second) of every worker.
|
||||
expiryDuration time.Duration
|
||||
|
||||
// freeSignal is used to notice pool there are available
|
||||
// workers which can be sent to work.
|
||||
freeSignal chan sig
|
||||
|
@ -57,8 +61,32 @@ type Pool struct {
|
|||
once sync.Once
|
||||
}
|
||||
|
||||
func (p *Pool) MonitorAndClear() {
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(p.expiryDuration)
|
||||
currentTime := time.Now()
|
||||
p.lock.Lock()
|
||||
idleWorkers := p.workers
|
||||
n := 0
|
||||
for i, w := range idleWorkers {
|
||||
if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
|
||||
break
|
||||
}
|
||||
n = i
|
||||
w.stop()
|
||||
idleWorkers[i] = nil
|
||||
}
|
||||
n += 1
|
||||
p.workers = idleWorkers[n:]
|
||||
p.lock.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
|
||||
// NewPool generates a instance of ants pool
|
||||
func NewPool(size int) (*Pool, error) {
|
||||
func NewPool(size, expiry int) (*Pool, error) {
|
||||
if size <= 0 {
|
||||
return nil, ErrPoolSizeInvalid
|
||||
}
|
||||
|
@ -66,6 +94,7 @@ func NewPool(size int) (*Pool, error) {
|
|||
capacity: int32(size),
|
||||
freeSignal: make(chan sig, math.MaxInt32),
|
||||
release: make(chan sig, 1),
|
||||
expiryDuration: time.Duration(expiry)*time.Second,
|
||||
}
|
||||
|
||||
return p, nil
|
||||
|
@ -171,6 +200,7 @@ func (p *Pool) getWorker() *Worker {
|
|||
|
||||
// putWorker puts a worker back into free pool, recycling the goroutines.
|
||||
func (p *Pool) putWorker(worker *Worker) {
|
||||
worker.recycleTime = time.Now()
|
||||
p.lock.Lock()
|
||||
p.workers = append(p.workers, worker)
|
||||
p.lock.Unlock()
|
||||
|
|
|
@ -24,6 +24,7 @@ package ants
|
|||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Worker is the actual executor who runs the tasks,
|
||||
|
@ -35,6 +36,9 @@ type Worker struct {
|
|||
|
||||
// task is a job should be done.
|
||||
task chan f
|
||||
|
||||
// recycleTime will be update when putting a worker back into queue.
|
||||
recycleTime time.Time
|
||||
}
|
||||
|
||||
// run starts a goroutine to repeat the process
|
||||
|
|
Loading…
Reference in New Issue