add expired time for PoolWithFunc

This commit is contained in:
andy pan 2018-07-06 14:39:24 +08:00
parent 0cb5a50036
commit 337c644550
2 changed files with 34 additions and 1 deletions

View File

@ -26,6 +26,7 @@ import (
"math" "math"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
) )
type pf func(interface{}) error type pf func(interface{}) error
@ -39,6 +40,9 @@ type PoolWithFunc struct {
// running is the number of the currently running goroutines. // running is the number of the currently running goroutines.
running int32 running int32
// expiryDuration set the expired time (second) of every worker.
expiryDuration time.Duration
// freeSignal is used to notice pool there are available // freeSignal is used to notice pool there are available
// workers which can be sent to work. // workers which can be sent to work.
freeSignal chan sig freeSignal chan sig
@ -58,8 +62,31 @@ type PoolWithFunc struct {
once sync.Once once sync.Once
} }
func (p *PoolWithFunc) MonitorAndClear() {
go func() {
for {
time.Sleep(p.expiryDuration)
currentTime := time.Now()
p.lock.Lock()
idleWorkers := p.workers
n := 0
for i, w := range idleWorkers {
if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
break
}
n = i
w.stop()
idleWorkers[i] = nil
}
n += 1
p.workers = idleWorkers[n:]
p.lock.Unlock()
}
}()
}
// NewPoolWithFunc generates a instance of ants pool with a specific function. // NewPoolWithFunc generates a instance of ants pool with a specific function.
func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { func NewPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) {
if size <= 0 { if size <= 0 {
return nil, ErrPoolSizeInvalid return nil, ErrPoolSizeInvalid
} }
@ -67,6 +94,7 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) {
capacity: int32(size), capacity: int32(size),
freeSignal: make(chan sig, math.MaxInt32), freeSignal: make(chan sig, math.MaxInt32),
release: make(chan sig, 1), release: make(chan sig, 1),
expiryDuration: time.Duration(expiry)*time.Second,
poolFunc: f, poolFunc: f,
} }
@ -176,6 +204,7 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
// putWorker puts a worker back into free pool, recycling the goroutines. // putWorker puts a worker back into free pool, recycling the goroutines.
func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) {
worker.recycleTime = time.Now()
p.lock.Lock() p.lock.Lock()
p.workers = append(p.workers, worker) p.workers = append(p.workers, worker)
p.lock.Unlock() p.lock.Unlock()

View File

@ -24,6 +24,7 @@ package ants
import ( import (
"sync/atomic" "sync/atomic"
"time"
) )
// WorkerWithFunc is the actual executor who runs the tasks, // WorkerWithFunc is the actual executor who runs the tasks,
@ -35,6 +36,9 @@ type WorkerWithFunc struct {
// args is a job should be done. // args is a job should be done.
args chan interface{} args chan interface{}
// recycleTime will be update when putting a worker back into queue.
recycleTime time.Time
} }
// run starts a goroutine to repeat the process // run starts a goroutine to repeat the process