From bd3ca2489e9f78cf0012e973c39a8a09c2a09e13 Mon Sep 17 00:00:00 2001 From: andy pan Date: Fri, 6 Jul 2018 14:32:19 +0800 Subject: [PATCH 01/11] remove useless codes --- ants.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/ants.go b/ants.go index 3573b1c..d8755eb 100644 --- a/ants.go +++ b/ants.go @@ -70,9 +70,3 @@ var ( ErrPoolClosed = errors.New("this pool has been closed") ) -var workerArgsCap = func() int { - if runtime.GOMAXPROCS(0) == 1 { - return 0 - } - return 1 -}() From d04febc0b2febe12538c03962be0970841d8b201 Mon Sep 17 00:00:00 2001 From: andy pan Date: Fri, 6 Jul 2018 14:33:07 +0800 Subject: [PATCH 02/11] clear expired workers for Pool --- pool.go | 32 +++++++++++++++++++++++++++++++- worker.go | 4 ++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/pool.go b/pool.go index 003b11d..ef03d8a 100644 --- a/pool.go +++ b/pool.go @@ -26,6 +26,7 @@ import ( "math" "sync" "sync/atomic" + "time" ) type sig struct{} @@ -41,6 +42,9 @@ type Pool struct { // running is the number of the currently running goroutines. running int32 + // expiryDuration set the expired time (second) of every worker. + expiryDuration time.Duration + // freeSignal is used to notice pool there are available // workers which can be sent to work. freeSignal chan sig @@ -57,8 +61,32 @@ type Pool struct { once sync.Once } +func (p *Pool) MonitorAndClear() { + go func() { + for { + time.Sleep(p.expiryDuration) + currentTime := time.Now() + p.lock.Lock() + idleWorkers := p.workers + n := 0 + for i, w := range idleWorkers { + if currentTime.Sub(w.recycleTime) <= p.expiryDuration { + break + } + n = i + w.stop() + idleWorkers[i] = nil + } + n += 1 + p.workers = idleWorkers[n:] + p.lock.Unlock() + } + }() +} + + // NewPool generates a instance of ants pool -func NewPool(size int) (*Pool, error) { +func NewPool(size, expiry int) (*Pool, error) { if size <= 0 { return nil, ErrPoolSizeInvalid } @@ -66,6 +94,7 @@ func NewPool(size int) (*Pool, error) { capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), release: make(chan sig, 1), + expiryDuration: time.Duration(expiry)*time.Second, } return p, nil @@ -171,6 +200,7 @@ func (p *Pool) getWorker() *Worker { // putWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) putWorker(worker *Worker) { + worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() diff --git a/worker.go b/worker.go index 162673e..babb8af 100644 --- a/worker.go +++ b/worker.go @@ -24,6 +24,7 @@ package ants import ( "sync/atomic" + "time" ) // Worker is the actual executor who runs the tasks, @@ -35,6 +36,9 @@ type Worker struct { // task is a job should be done. task chan f + + // recycleTime will be update when putting a worker back into queue. + recycleTime time.Time } // run starts a goroutine to repeat the process From 56943d78d58a2b68969a2966f67e67104c40cc6d Mon Sep 17 00:00:00 2001 From: andy pan Date: Fri, 6 Jul 2018 14:33:53 +0800 Subject: [PATCH 03/11] add expired time for pool --- ants.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ants.go b/ants.go index d8755eb..03dd478 100644 --- a/ants.go +++ b/ants.go @@ -37,7 +37,7 @@ const ( ) // Init a instance pool when importing ants -var defaultPool, _ = NewPool(DefaultPoolSize) +var defaultPool, _ = NewPool(DefaultPoolSize, 10) // Submit submit a task to pool func Submit(task f) error { From 0cb5a500366a301f0b3a74750f99f11a17e9ce6b Mon Sep 17 00:00:00 2001 From: andy pan Date: Fri, 6 Jul 2018 14:35:30 +0800 Subject: [PATCH 04/11] rename func --- pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pool.go b/pool.go index ef03d8a..c94e5e8 100644 --- a/pool.go +++ b/pool.go @@ -61,7 +61,7 @@ type Pool struct { once sync.Once } -func (p *Pool) MonitorAndClear() { +func (p *Pool) monitorAndClear() { go func() { for { time.Sleep(p.expiryDuration) From 337c644550032f24bc45253522a9beb1f0024a86 Mon Sep 17 00:00:00 2001 From: andy pan Date: Fri, 6 Jul 2018 14:39:24 +0800 Subject: [PATCH 05/11] add expired time for PoolWithFunc --- pool_func.go | 31 ++++++++++++++++++++++++++++++- worker_func.go | 4 ++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/pool_func.go b/pool_func.go index 8739feb..c6db78f 100644 --- a/pool_func.go +++ b/pool_func.go @@ -26,6 +26,7 @@ import ( "math" "sync" "sync/atomic" + "time" ) type pf func(interface{}) error @@ -39,6 +40,9 @@ type PoolWithFunc struct { // running is the number of the currently running goroutines. running int32 + // expiryDuration set the expired time (second) of every worker. + expiryDuration time.Duration + // freeSignal is used to notice pool there are available // workers which can be sent to work. freeSignal chan sig @@ -58,8 +62,31 @@ type PoolWithFunc struct { once sync.Once } +func (p *PoolWithFunc) MonitorAndClear() { + go func() { + for { + time.Sleep(p.expiryDuration) + currentTime := time.Now() + p.lock.Lock() + idleWorkers := p.workers + n := 0 + for i, w := range idleWorkers { + if currentTime.Sub(w.recycleTime) <= p.expiryDuration { + break + } + n = i + w.stop() + idleWorkers[i] = nil + } + n += 1 + p.workers = idleWorkers[n:] + p.lock.Unlock() + } + }() +} + // NewPoolWithFunc generates a instance of ants pool with a specific function. -func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { +func NewPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) { if size <= 0 { return nil, ErrPoolSizeInvalid } @@ -67,6 +94,7 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), release: make(chan sig, 1), + expiryDuration: time.Duration(expiry)*time.Second, poolFunc: f, } @@ -176,6 +204,7 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { // putWorker puts a worker back into free pool, recycling the goroutines. func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { + worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() diff --git a/worker_func.go b/worker_func.go index d533bfd..6467bac 100644 --- a/worker_func.go +++ b/worker_func.go @@ -24,6 +24,7 @@ package ants import ( "sync/atomic" + "time" ) // WorkerWithFunc is the actual executor who runs the tasks, @@ -35,6 +36,9 @@ type WorkerWithFunc struct { // args is a job should be done. args chan interface{} + + // recycleTime will be update when putting a worker back into queue. + recycleTime time.Time } // run starts a goroutine to repeat the process From 02a168974db1d6290c375a7732e7c1810b161472 Mon Sep 17 00:00:00 2001 From: andy pan Date: Fri, 6 Jul 2018 14:40:43 +0800 Subject: [PATCH 06/11] add expired time in go test --- ants_benchmark_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index e81dff4..35c83a3 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -78,7 +78,7 @@ func BenchmarkGoroutineWithFunc(b *testing.B) { func BenchmarkAntsPoolWithFunc(b *testing.B) { var wg sync.WaitGroup - p, _ := ants.NewPoolWithFunc(50000, func(i interface{}) error { + p, _ := ants.NewPoolWithFunc(50000, 1,func(i interface{}) error { demoPoolFunc(i) wg.Done() return nil @@ -104,7 +104,7 @@ func BenchmarkGoroutine(b *testing.B) { } func BenchmarkAntsPool(b *testing.B) { - p, _ := ants.NewPoolWithFunc(50000, demoPoolFunc) + p, _ := ants.NewPoolWithFunc(50000, 1, demoPoolFunc) defer p.Release() b.ResetTimer() From 5ae6239a5708657828ddfbdd5297e7e777bdb52c Mon Sep 17 00:00:00 2001 From: andy pan Date: Fri, 6 Jul 2018 14:42:26 +0800 Subject: [PATCH 07/11] start a goroutine to clear expired workers when init a pool --- pool.go | 2 +- pool_func.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pool.go b/pool.go index c94e5e8..40f2ba1 100644 --- a/pool.go +++ b/pool.go @@ -96,7 +96,7 @@ func NewPool(size, expiry int) (*Pool, error) { release: make(chan sig, 1), expiryDuration: time.Duration(expiry)*time.Second, } - + p.monitorAndClear() return p, nil } diff --git a/pool_func.go b/pool_func.go index c6db78f..3d92730 100644 --- a/pool_func.go +++ b/pool_func.go @@ -97,7 +97,7 @@ func NewPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) { expiryDuration: time.Duration(expiry)*time.Second, poolFunc: f, } - + p.MonitorAndClear() return p, nil } From 93533b243ba52f08d5e7216fc809bf38857bd827 Mon Sep 17 00:00:00 2001 From: andy pan Date: Fri, 6 Jul 2018 14:45:35 +0800 Subject: [PATCH 08/11] remove unused imports --- ants.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ants.go b/ants.go index 03dd478..78e3b63 100644 --- a/ants.go +++ b/ants.go @@ -25,7 +25,6 @@ package ants import ( "errors" "math" - "runtime" ) const ( @@ -33,11 +32,11 @@ const ( DefaultPoolSize = math.MaxInt32 // DefaultCleanIntervalTime is the interval time to clean up goroutines - DefaultCleanIntervalTime = 30 + DefaultCleanIntervalTime = 10 ) // Init a instance pool when importing ants -var defaultPool, _ = NewPool(DefaultPoolSize, 10) +var defaultPool, _ = NewPool(DefaultPoolSize, DefaultCleanIntervalTime) // Submit submit a task to pool func Submit(task f) error { From 3e6386d097a4916b89e2cbc48a76b0fc9238fc23 Mon Sep 17 00:00:00 2001 From: andy pan Date: Fri, 6 Jul 2018 14:47:25 +0800 Subject: [PATCH 09/11] update example codes --- examples/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/main.go b/examples/main.go index 53ef719..c9b6aae 100644 --- a/examples/main.go +++ b/examples/main.go @@ -67,7 +67,7 @@ func main() { // use the pool with a function // set 10 the size of goroutine pool - p, _ := ants.NewPoolWithFunc(10, func(i interface{}) error { + p, _ := ants.NewPoolWithFunc(10, 1, func(i interface{}) error { myFunc(i) wg.Done() return nil From b21f63142d492e0bcb0250a2be907b53a80e89d7 Mon Sep 17 00:00:00 2001 From: andy pan Date: Fri, 6 Jul 2018 14:56:59 +0800 Subject: [PATCH 10/11] fixed the "slice bounds out of range" error --- pool.go | 6 ++++-- pool_func.go | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pool.go b/pool.go index 40f2ba1..755e32d 100644 --- a/pool.go +++ b/pool.go @@ -77,8 +77,10 @@ func (p *Pool) monitorAndClear() { w.stop() idleWorkers[i] = nil } - n += 1 - p.workers = idleWorkers[n:] + if n > 0 { + n += 1 + p.workers = idleWorkers[n:] + } p.lock.Unlock() } }() diff --git a/pool_func.go b/pool_func.go index 3d92730..f583600 100644 --- a/pool_func.go +++ b/pool_func.go @@ -78,8 +78,10 @@ func (p *PoolWithFunc) MonitorAndClear() { w.stop() idleWorkers[i] = nil } - n += 1 - p.workers = idleWorkers[n:] + if n > 0 { + n += 1 + p.workers = idleWorkers[n:] + } p.lock.Unlock() } }() From afba560448d98a6bfe5270d5d5a74396af23601e Mon Sep 17 00:00:00 2001 From: andy pan Date: Fri, 6 Jul 2018 15:00:30 +0800 Subject: [PATCH 11/11] format codes --- pool.go | 2 +- pool_func.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pool.go b/pool.go index 755e32d..9184c24 100644 --- a/pool.go +++ b/pool.go @@ -96,7 +96,7 @@ func NewPool(size, expiry int) (*Pool, error) { capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), release: make(chan sig, 1), - expiryDuration: time.Duration(expiry)*time.Second, + expiryDuration: time.Duration(expiry) * time.Second, } p.monitorAndClear() return p, nil diff --git a/pool_func.go b/pool_func.go index f583600..439b0fe 100644 --- a/pool_func.go +++ b/pool_func.go @@ -96,7 +96,7 @@ func NewPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) { capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), release: make(chan sig, 1), - expiryDuration: time.Duration(expiry)*time.Second, + expiryDuration: time.Duration(expiry) * time.Second, poolFunc: f, } p.MonitorAndClear()