From c186871e608dbd61ed48b08319cd86ffddad1db5 Mon Sep 17 00:00:00 2001 From: Andy Pan <panjf2000@gmail.com> Date: Sun, 15 Jul 2018 23:58:09 +0800 Subject: [PATCH 1/4] update --- ants_benchmark_test.go | 2 +- ants_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 0374d8a..af0ef99 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -44,7 +44,7 @@ const ( const ( RunTimes = 10000000 Param = 100 - AntsSize = 500 + AntsSize = 100 ) func demoFunc() error { diff --git a/ants_test.go b/ants_test.go index 297ddd3..5edf9c4 100644 --- a/ants_test.go +++ b/ants_test.go @@ -30,7 +30,7 @@ import ( "github.com/panjf2000/ants" ) -var n = 1000000 +var n = 100000 func TestAntsPoolWithFunc(t *testing.T) { var wg sync.WaitGroup From eaf79d239f42226b27c7ace2eed7485c06ddc07c Mon Sep 17 00:00:00 2001 From: Andy Pan <panjf2000@gmail.com> Date: Mon, 16 Jul 2018 01:21:23 +0800 Subject: [PATCH 2/4] fixed some issues --- ants_test.go | 4 ++-- pool.go | 65 +++++++++++++++++++++++++++++--------------------- pool_func.go | 65 +++++++++++++++++++++++++++++--------------------- worker.go | 1 - worker_func.go | 1 - 5 files changed, 78 insertions(+), 58 deletions(-) diff --git a/ants_test.go b/ants_test.go index 5edf9c4..908f414 100644 --- a/ants_test.go +++ b/ants_test.go @@ -112,7 +112,7 @@ func TestCodeCov(t *testing.T) { t.Logf("pool, free workers number:%d", p0.Free()) p0.ReSize(AntsSize) p0.ReSize(AntsSize / 2) - t.Logf("pool, after resize, capacity:%d", p0.Cap()) + t.Logf("pool, after resize, capacity:%d, running:%d", p0.Cap(), p0.Running()) p, _ := ants.NewPoolWithFunc(AntsSize, demoPoolFunc) defer p.Serve(Param) @@ -125,7 +125,7 @@ func TestCodeCov(t *testing.T) { t.Logf("pool with func, free workers number:%d", p.Free()) p.ReSize(AntsSize) p.ReSize(AntsSize / 2) - t.Logf("pool with func, after resize, capacity:%d", p.Cap()) + t.Logf("pool with func, after resize, capacity:%d, running:%d", p.Cap(), p.Running()) } // func TestNoPool(t *testing.T) { diff --git a/pool.go b/pool.go index 3f988aa..0207c31 100644 --- a/pool.go +++ b/pool.go @@ -68,15 +68,18 @@ func (p *Pool) monitorAndClear() { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers + if len(idleWorkers) == 0 && len(p.release) > 0 { + return + } n := 0 for i, w := range idleWorkers { if currentTime.Sub(w.recycleTime) <= p.expiryDuration { break } n = i + <-p.freeSignal w.task <- nil idleWorkers[i] = nil - atomic.AddInt32(&p.running, 1) } if n > 0 { n++ @@ -137,34 +140,42 @@ func (p *Pool) Cap() int { return int(atomic.LoadInt32(&p.capacity)) } -// Release Closed this pool -func (p *Pool) Release() error { - p.once.Do(func() { - p.release <- sig{} - running := p.Running() - for i := 0; i < running; i++ { - p.getWorker().task <- nil - } - p.lock.Lock() - p.workers = nil - p.lock.Unlock() - }) - return nil -} - // ReSize change the capacity of this pool func (p *Pool) ReSize(size int) { if size < p.Cap() { diff := p.Cap() - size + p.lock.Lock() + idleWorkers := p.workers for i := 0; i < diff; i++ { - p.getWorker().task <- nil + <-p.freeSignal + idleWorkers[i].task <- nil + idleWorkers[i] = nil } + p.workers = idleWorkers[diff:] + p.lock.Unlock() } else if size == p.Cap() { return } atomic.StoreInt32(&p.capacity, int32(size)) } +// Release Closed this pool +func (p *Pool) Release() error { + p.once.Do(func() { + p.release <- sig{} + p.lock.Lock() + idleWorkers := p.workers + for i, w := range idleWorkers { + <-p.freeSignal + w.task <- nil + idleWorkers[i] = nil + } + p.workers = nil + p.lock.Unlock() + }) + return nil +} + //------------------------------------------------------------------------- // getWorker returns a available worker to run the tasks. @@ -173,8 +184,8 @@ func (p *Pool) getWorker() *Worker { waiting := false p.lock.Lock() - workers := p.workers - n := len(workers) - 1 + idleWorkers := p.workers + n := len(idleWorkers) - 1 if n < 0 { if p.Running() >= p.Cap() { waiting = true @@ -183,20 +194,20 @@ func (p *Pool) getWorker() *Worker { } } else { <-p.freeSignal - w = workers[n] - workers[n] = nil - p.workers = workers[:n] + w = idleWorkers[n] + idleWorkers[n] = nil + p.workers = idleWorkers[:n] } p.lock.Unlock() if waiting { <-p.freeSignal p.lock.Lock() - workers = p.workers - l := len(workers) - 1 - w = workers[l] - workers[l] = nil - p.workers = workers[:l] + idleWorkers = p.workers + l := len(idleWorkers) - 1 + w = idleWorkers[l] + idleWorkers[l] = nil + p.workers = idleWorkers[:l] p.lock.Unlock() } else if w == nil { w = &Worker{ diff --git a/pool_func.go b/pool_func.go index c4dea88..9ca3bfc 100644 --- a/pool_func.go +++ b/pool_func.go @@ -69,15 +69,18 @@ func (p *PoolWithFunc) monitorAndClear() { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers + if len(idleWorkers) == 0 && len(p.release) > 0 { + return + } n := 0 for i, w := range idleWorkers { if currentTime.Sub(w.recycleTime) <= p.expiryDuration { break } n = i + <-p.freeSignal w.args <- nil idleWorkers[i] = nil - atomic.AddInt32(&p.running, 1) } if n > 0 { n++ @@ -142,34 +145,42 @@ func (p *PoolWithFunc) Cap() int { return int(atomic.LoadInt32(&p.capacity)) } -// Release Closed this pool -func (p *PoolWithFunc) Release() error { - p.once.Do(func() { - p.release <- sig{} - running := p.Running() - for i := 0; i < running; i++ { - p.getWorker().args <- nil - } - p.lock.Lock() - p.workers = nil - p.lock.Unlock() - }) - return nil -} - // ReSize change the capacity of this pool func (p *PoolWithFunc) ReSize(size int) { if size < p.Cap() { diff := p.Cap() - size + p.lock.Lock() + idleWorkers := p.workers for i := 0; i < diff; i++ { - p.getWorker().args <- nil + <-p.freeSignal + idleWorkers[i].args <- nil + idleWorkers[i] = nil } + p.workers = idleWorkers[diff:] + p.lock.Unlock() } else if size == p.Cap() { return } atomic.StoreInt32(&p.capacity, int32(size)) } +// Release Closed this pool +func (p *PoolWithFunc) Release() error { + p.once.Do(func() { + p.release <- sig{} + p.lock.Lock() + idleWorkers := p.workers + for i, w := range idleWorkers { + <-p.freeSignal + w.args <- nil + idleWorkers[i] = nil + } + p.workers = nil + p.lock.Unlock() + }) + return nil +} + //------------------------------------------------------------------------- // getWorker returns a available worker to run the tasks. @@ -178,8 +189,8 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { waiting := false p.lock.Lock() - workers := p.workers - n := len(workers) - 1 + idleWorkers := p.workers + n := len(idleWorkers) - 1 if n < 0 { if p.Running() >= p.Cap() { waiting = true @@ -188,20 +199,20 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { } } else { <-p.freeSignal - w = workers[n] - workers[n] = nil - p.workers = workers[:n] + w = idleWorkers[n] + idleWorkers[n] = nil + p.workers = idleWorkers[:n] } p.lock.Unlock() if waiting { <-p.freeSignal p.lock.Lock() - workers = p.workers - l := len(workers) - 1 - w = workers[l] - workers[l] = nil - p.workers = workers[:l] + idleWorkers = p.workers + l := len(idleWorkers) - 1 + w = idleWorkers[l] + idleWorkers[l] = nil + p.workers = idleWorkers[:l] p.lock.Unlock() } else if w == nil { w = &WorkerWithFunc{ diff --git a/worker.go b/worker.go index 9031f57..c0667fb 100644 --- a/worker.go +++ b/worker.go @@ -44,7 +44,6 @@ type Worker struct { // run starts a goroutine to repeat the process // that performs the function calls. func (w *Worker) run() { - //atomic.AddInt32(&w.pool.running, 1) go func() { for f := range w.task { if f == nil { diff --git a/worker_func.go b/worker_func.go index a3b5bb6..86c7448 100644 --- a/worker_func.go +++ b/worker_func.go @@ -44,7 +44,6 @@ type WorkerWithFunc struct { // run starts a goroutine to repeat the process // that performs the function calls. func (w *WorkerWithFunc) run() { - //atomic.AddInt32(&w.pool.running, 1) go func() { for args := range w.args { if args == nil { From d31b2413c66828ca1806bed13543fe12a1df5345 Mon Sep 17 00:00:00 2001 From: Andy Pan <panjf2000@gmail.com> Date: Mon, 16 Jul 2018 02:33:43 +0800 Subject: [PATCH 3/4] update --- pool.go | 25 +++++++++++-------------- pool_func.go | 25 +++++++++++-------------- 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/pool.go b/pool.go index 0207c31..4f8dbae 100644 --- a/pool.go +++ b/pool.go @@ -68,7 +68,8 @@ func (p *Pool) monitorAndClear() { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers - if len(idleWorkers) == 0 && len(p.release) > 0 { + if len(idleWorkers) == 0 && p.Running() == 0 && len(p.release) > 0 { + p.lock.Unlock() return } n := 0 @@ -142,21 +143,17 @@ func (p *Pool) Cap() int { // ReSize change the capacity of this pool func (p *Pool) ReSize(size int) { - if size < p.Cap() { - diff := p.Cap() - size - p.lock.Lock() - idleWorkers := p.workers - for i := 0; i < diff; i++ { - <-p.freeSignal - idleWorkers[i].task <- nil - idleWorkers[i] = nil - } - p.workers = idleWorkers[diff:] - p.lock.Unlock() - } else if size == p.Cap() { + if size == p.Cap() { return + } else if size < p.Cap() { + diff := p.Cap() - size + atomic.StoreInt32(&p.capacity, int32(size)) + for i := 0; i < diff; i++ { + p.getWorker().task <- nil + } + } else { + atomic.StoreInt32(&p.capacity, int32(size)) } - atomic.StoreInt32(&p.capacity, int32(size)) } // Release Closed this pool diff --git a/pool_func.go b/pool_func.go index 9ca3bfc..6099868 100644 --- a/pool_func.go +++ b/pool_func.go @@ -69,7 +69,8 @@ func (p *PoolWithFunc) monitorAndClear() { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers - if len(idleWorkers) == 0 && len(p.release) > 0 { + if len(idleWorkers) == 0 && p.Running() == 0 && len(p.release) > 0 { + p.lock.Unlock() return } n := 0 @@ -147,21 +148,17 @@ func (p *PoolWithFunc) Cap() int { // ReSize change the capacity of this pool func (p *PoolWithFunc) ReSize(size int) { - if size < p.Cap() { - diff := p.Cap() - size - p.lock.Lock() - idleWorkers := p.workers - for i := 0; i < diff; i++ { - <-p.freeSignal - idleWorkers[i].args <- nil - idleWorkers[i] = nil - } - p.workers = idleWorkers[diff:] - p.lock.Unlock() - } else if size == p.Cap() { + if size == p.Cap() { return + } else if size < p.Cap() { + diff := p.Cap() - size + atomic.StoreInt32(&p.capacity, int32(size)) + for i := 0; i < diff; i++ { + p.getWorker().args <- nil + } + } else { + atomic.StoreInt32(&p.capacity, int32(size)) } - atomic.StoreInt32(&p.capacity, int32(size)) } // Release Closed this pool From 4553a7a1c646263815e0eef7aaaa490be0679231 Mon Sep 17 00:00:00 2001 From: Andy Pan <panjf2000@gmail.com> Date: Mon, 16 Jul 2018 02:43:38 +0800 Subject: [PATCH 4/4] update codecov test --- pool.go | 9 ++++----- pool_func.go | 9 ++++----- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/pool.go b/pool.go index 4f8dbae..1420c2e 100644 --- a/pool.go +++ b/pool.go @@ -145,14 +145,13 @@ func (p *Pool) Cap() int { func (p *Pool) ReSize(size int) { if size == p.Cap() { return - } else if size < p.Cap() { - diff := p.Cap() - size - atomic.StoreInt32(&p.capacity, int32(size)) + } + atomic.StoreInt32(&p.capacity, int32(size)) + diff := p.Running() - size + if diff > 0 { for i := 0; i < diff; i++ { p.getWorker().task <- nil } - } else { - atomic.StoreInt32(&p.capacity, int32(size)) } } diff --git a/pool_func.go b/pool_func.go index 6099868..6ba7817 100644 --- a/pool_func.go +++ b/pool_func.go @@ -150,14 +150,13 @@ func (p *PoolWithFunc) Cap() int { func (p *PoolWithFunc) ReSize(size int) { if size == p.Cap() { return - } else if size < p.Cap() { - diff := p.Cap() - size - atomic.StoreInt32(&p.capacity, int32(size)) + } + atomic.StoreInt32(&p.capacity, int32(size)) + diff := p.Running() - size + if diff > 0 { for i := 0; i < diff; i++ { p.getWorker().args <- nil } - } else { - atomic.StoreInt32(&p.capacity, int32(size)) } }