diff --git a/ants.go b/ants.go index 3573b1c..78e3b63 100644 --- a/ants.go +++ b/ants.go @@ -25,7 +25,6 @@ package ants import ( "errors" "math" - "runtime" ) const ( @@ -33,11 +32,11 @@ const ( DefaultPoolSize = math.MaxInt32 // DefaultCleanIntervalTime is the interval time to clean up goroutines - DefaultCleanIntervalTime = 30 + DefaultCleanIntervalTime = 10 ) // Init a instance pool when importing ants -var defaultPool, _ = NewPool(DefaultPoolSize) +var defaultPool, _ = NewPool(DefaultPoolSize, DefaultCleanIntervalTime) // Submit submit a task to pool func Submit(task f) error { @@ -70,9 +69,3 @@ var ( ErrPoolClosed = errors.New("this pool has been closed") ) -var workerArgsCap = func() int { - if runtime.GOMAXPROCS(0) == 1 { - return 0 - } - return 1 -}() diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index e81dff4..35c83a3 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -78,7 +78,7 @@ func BenchmarkGoroutineWithFunc(b *testing.B) { func BenchmarkAntsPoolWithFunc(b *testing.B) { var wg sync.WaitGroup - p, _ := ants.NewPoolWithFunc(50000, func(i interface{}) error { + p, _ := ants.NewPoolWithFunc(50000, 1,func(i interface{}) error { demoPoolFunc(i) wg.Done() return nil @@ -104,7 +104,7 @@ func BenchmarkGoroutine(b *testing.B) { } func BenchmarkAntsPool(b *testing.B) { - p, _ := ants.NewPoolWithFunc(50000, demoPoolFunc) + p, _ := ants.NewPoolWithFunc(50000, 1, demoPoolFunc) defer p.Release() b.ResetTimer() diff --git a/examples/main.go b/examples/main.go index 53ef719..c9b6aae 100644 --- a/examples/main.go +++ b/examples/main.go @@ -67,7 +67,7 @@ func main() { // use the pool with a function // set 10 the size of goroutine pool - p, _ := ants.NewPoolWithFunc(10, func(i interface{}) error { + p, _ := ants.NewPoolWithFunc(10, 1, func(i interface{}) error { myFunc(i) wg.Done() return nil diff --git a/pool.go b/pool.go index 003b11d..9184c24 100644 --- a/pool.go +++ b/pool.go @@ -26,6 +26,7 @@ import ( "math" "sync" "sync/atomic" + "time" ) type sig struct{} @@ -41,6 +42,9 @@ type Pool struct { // running is the number of the currently running goroutines. running int32 + // expiryDuration set the expired time (second) of every worker. + expiryDuration time.Duration + // freeSignal is used to notice pool there are available // workers which can be sent to work. freeSignal chan sig @@ -57,8 +61,34 @@ type Pool struct { once sync.Once } +func (p *Pool) monitorAndClear() { + go func() { + for { + time.Sleep(p.expiryDuration) + currentTime := time.Now() + p.lock.Lock() + idleWorkers := p.workers + n := 0 + for i, w := range idleWorkers { + if currentTime.Sub(w.recycleTime) <= p.expiryDuration { + break + } + n = i + w.stop() + idleWorkers[i] = nil + } + if n > 0 { + n += 1 + p.workers = idleWorkers[n:] + } + p.lock.Unlock() + } + }() +} + + // NewPool generates a instance of ants pool -func NewPool(size int) (*Pool, error) { +func NewPool(size, expiry int) (*Pool, error) { if size <= 0 { return nil, ErrPoolSizeInvalid } @@ -66,8 +96,9 @@ func NewPool(size int) (*Pool, error) { capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), release: make(chan sig, 1), + expiryDuration: time.Duration(expiry) * time.Second, } - + p.monitorAndClear() return p, nil } @@ -171,6 +202,7 @@ func (p *Pool) getWorker() *Worker { // putWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) putWorker(worker *Worker) { + worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() diff --git a/pool_func.go b/pool_func.go index 8739feb..439b0fe 100644 --- a/pool_func.go +++ b/pool_func.go @@ -26,6 +26,7 @@ import ( "math" "sync" "sync/atomic" + "time" ) type pf func(interface{}) error @@ -39,6 +40,9 @@ type PoolWithFunc struct { // running is the number of the currently running goroutines. running int32 + // expiryDuration set the expired time (second) of every worker. + expiryDuration time.Duration + // freeSignal is used to notice pool there are available // workers which can be sent to work. freeSignal chan sig @@ -58,8 +62,33 @@ type PoolWithFunc struct { once sync.Once } +func (p *PoolWithFunc) MonitorAndClear() { + go func() { + for { + time.Sleep(p.expiryDuration) + currentTime := time.Now() + p.lock.Lock() + idleWorkers := p.workers + n := 0 + for i, w := range idleWorkers { + if currentTime.Sub(w.recycleTime) <= p.expiryDuration { + break + } + n = i + w.stop() + idleWorkers[i] = nil + } + if n > 0 { + n += 1 + p.workers = idleWorkers[n:] + } + p.lock.Unlock() + } + }() +} + // NewPoolWithFunc generates a instance of ants pool with a specific function. -func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { +func NewPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) { if size <= 0 { return nil, ErrPoolSizeInvalid } @@ -67,9 +96,10 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), release: make(chan sig, 1), + expiryDuration: time.Duration(expiry) * time.Second, poolFunc: f, } - + p.MonitorAndClear() return p, nil } @@ -176,6 +206,7 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { // putWorker puts a worker back into free pool, recycling the goroutines. func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { + worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() diff --git a/worker.go b/worker.go index 162673e..babb8af 100644 --- a/worker.go +++ b/worker.go @@ -24,6 +24,7 @@ package ants import ( "sync/atomic" + "time" ) // Worker is the actual executor who runs the tasks, @@ -35,6 +36,9 @@ type Worker struct { // task is a job should be done. task chan f + + // recycleTime will be update when putting a worker back into queue. + recycleTime time.Time } // run starts a goroutine to repeat the process diff --git a/worker_func.go b/worker_func.go index d533bfd..6467bac 100644 --- a/worker_func.go +++ b/worker_func.go @@ -24,6 +24,7 @@ package ants import ( "sync/atomic" + "time" ) // WorkerWithFunc is the actual executor who runs the tasks, @@ -35,6 +36,9 @@ type WorkerWithFunc struct { // args is a job should be done. args chan interface{} + + // recycleTime will be update when putting a worker back into queue. + recycleTime time.Time } // run starts a goroutine to repeat the process