From 337c644550032f24bc45253522a9beb1f0024a86 Mon Sep 17 00:00:00 2001 From: andy pan Date: Fri, 6 Jul 2018 14:39:24 +0800 Subject: [PATCH] add expired time for PoolWithFunc --- pool_func.go | 31 ++++++++++++++++++++++++++++++- worker_func.go | 4 ++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/pool_func.go b/pool_func.go index 8739feb..c6db78f 100644 --- a/pool_func.go +++ b/pool_func.go @@ -26,6 +26,7 @@ import ( "math" "sync" "sync/atomic" + "time" ) type pf func(interface{}) error @@ -39,6 +40,9 @@ type PoolWithFunc struct { // running is the number of the currently running goroutines. running int32 + // expiryDuration set the expired time (second) of every worker. + expiryDuration time.Duration + // freeSignal is used to notice pool there are available // workers which can be sent to work. freeSignal chan sig @@ -58,8 +62,31 @@ type PoolWithFunc struct { once sync.Once } +func (p *PoolWithFunc) MonitorAndClear() { + go func() { + for { + time.Sleep(p.expiryDuration) + currentTime := time.Now() + p.lock.Lock() + idleWorkers := p.workers + n := 0 + for i, w := range idleWorkers { + if currentTime.Sub(w.recycleTime) <= p.expiryDuration { + break + } + n = i + w.stop() + idleWorkers[i] = nil + } + n += 1 + p.workers = idleWorkers[n:] + p.lock.Unlock() + } + }() +} + // NewPoolWithFunc generates a instance of ants pool with a specific function. -func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { +func NewPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) { if size <= 0 { return nil, ErrPoolSizeInvalid } @@ -67,6 +94,7 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), release: make(chan sig, 1), + expiryDuration: time.Duration(expiry)*time.Second, poolFunc: f, } @@ -176,6 +204,7 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { // putWorker puts a worker back into free pool, recycling the goroutines. func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { + worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() diff --git a/worker_func.go b/worker_func.go index d533bfd..6467bac 100644 --- a/worker_func.go +++ b/worker_func.go @@ -24,6 +24,7 @@ package ants import ( "sync/atomic" + "time" ) // WorkerWithFunc is the actual executor who runs the tasks, @@ -35,6 +36,9 @@ type WorkerWithFunc struct { // args is a job should be done. args chan interface{} + + // recycleTime will be update when putting a worker back into queue. + recycleTime time.Time } // run starts a goroutine to repeat the process