From eaf79d239f42226b27c7ace2eed7485c06ddc07c Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Mon, 16 Jul 2018 01:21:23 +0800 Subject: [PATCH] fixed some issues --- ants_test.go | 4 ++-- pool.go | 65 +++++++++++++++++++++++++++++--------------------- pool_func.go | 65 +++++++++++++++++++++++++++++--------------------- worker.go | 1 - worker_func.go | 1 - 5 files changed, 78 insertions(+), 58 deletions(-) diff --git a/ants_test.go b/ants_test.go index 5edf9c4..908f414 100644 --- a/ants_test.go +++ b/ants_test.go @@ -112,7 +112,7 @@ func TestCodeCov(t *testing.T) { t.Logf("pool, free workers number:%d", p0.Free()) p0.ReSize(AntsSize) p0.ReSize(AntsSize / 2) - t.Logf("pool, after resize, capacity:%d", p0.Cap()) + t.Logf("pool, after resize, capacity:%d, running:%d", p0.Cap(), p0.Running()) p, _ := ants.NewPoolWithFunc(AntsSize, demoPoolFunc) defer p.Serve(Param) @@ -125,7 +125,7 @@ func TestCodeCov(t *testing.T) { t.Logf("pool with func, free workers number:%d", p.Free()) p.ReSize(AntsSize) p.ReSize(AntsSize / 2) - t.Logf("pool with func, after resize, capacity:%d", p.Cap()) + t.Logf("pool with func, after resize, capacity:%d, running:%d", p.Cap(), p.Running()) } // func TestNoPool(t *testing.T) { diff --git a/pool.go b/pool.go index 3f988aa..0207c31 100644 --- a/pool.go +++ b/pool.go @@ -68,15 +68,18 @@ func (p *Pool) monitorAndClear() { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers + if len(idleWorkers) == 0 && len(p.release) > 0 { + return + } n := 0 for i, w := range idleWorkers { if currentTime.Sub(w.recycleTime) <= p.expiryDuration { break } n = i + <-p.freeSignal w.task <- nil idleWorkers[i] = nil - atomic.AddInt32(&p.running, 1) } if n > 0 { n++ @@ -137,34 +140,42 @@ func (p *Pool) Cap() int { return int(atomic.LoadInt32(&p.capacity)) } -// Release Closed this pool -func (p *Pool) Release() error { - p.once.Do(func() { - p.release <- sig{} - running := p.Running() - for i := 0; i < running; i++ { - p.getWorker().task <- nil - } - p.lock.Lock() - p.workers = nil - p.lock.Unlock() - }) - return nil -} - // ReSize change the capacity of this pool func (p *Pool) ReSize(size int) { if size < p.Cap() { diff := p.Cap() - size + p.lock.Lock() + idleWorkers := p.workers for i := 0; i < diff; i++ { - p.getWorker().task <- nil + <-p.freeSignal + idleWorkers[i].task <- nil + idleWorkers[i] = nil } + p.workers = idleWorkers[diff:] + p.lock.Unlock() } else if size == p.Cap() { return } atomic.StoreInt32(&p.capacity, int32(size)) } +// Release Closed this pool +func (p *Pool) Release() error { + p.once.Do(func() { + p.release <- sig{} + p.lock.Lock() + idleWorkers := p.workers + for i, w := range idleWorkers { + <-p.freeSignal + w.task <- nil + idleWorkers[i] = nil + } + p.workers = nil + p.lock.Unlock() + }) + return nil +} + //------------------------------------------------------------------------- // getWorker returns a available worker to run the tasks. @@ -173,8 +184,8 @@ func (p *Pool) getWorker() *Worker { waiting := false p.lock.Lock() - workers := p.workers - n := len(workers) - 1 + idleWorkers := p.workers + n := len(idleWorkers) - 1 if n < 0 { if p.Running() >= p.Cap() { waiting = true @@ -183,20 +194,20 @@ func (p *Pool) getWorker() *Worker { } } else { <-p.freeSignal - w = workers[n] - workers[n] = nil - p.workers = workers[:n] + w = idleWorkers[n] + idleWorkers[n] = nil + p.workers = idleWorkers[:n] } p.lock.Unlock() if waiting { <-p.freeSignal p.lock.Lock() - workers = p.workers - l := len(workers) - 1 - w = workers[l] - workers[l] = nil - p.workers = workers[:l] + idleWorkers = p.workers + l := len(idleWorkers) - 1 + w = idleWorkers[l] + idleWorkers[l] = nil + p.workers = idleWorkers[:l] p.lock.Unlock() } else if w == nil { w = &Worker{ diff --git a/pool_func.go b/pool_func.go index c4dea88..9ca3bfc 100644 --- a/pool_func.go +++ b/pool_func.go @@ -69,15 +69,18 @@ func (p *PoolWithFunc) monitorAndClear() { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers + if len(idleWorkers) == 0 && len(p.release) > 0 { + return + } n := 0 for i, w := range idleWorkers { if currentTime.Sub(w.recycleTime) <= p.expiryDuration { break } n = i + <-p.freeSignal w.args <- nil idleWorkers[i] = nil - atomic.AddInt32(&p.running, 1) } if n > 0 { n++ @@ -142,34 +145,42 @@ func (p *PoolWithFunc) Cap() int { return int(atomic.LoadInt32(&p.capacity)) } -// Release Closed this pool -func (p *PoolWithFunc) Release() error { - p.once.Do(func() { - p.release <- sig{} - running := p.Running() - for i := 0; i < running; i++ { - p.getWorker().args <- nil - } - p.lock.Lock() - p.workers = nil - p.lock.Unlock() - }) - return nil -} - // ReSize change the capacity of this pool func (p *PoolWithFunc) ReSize(size int) { if size < p.Cap() { diff := p.Cap() - size + p.lock.Lock() + idleWorkers := p.workers for i := 0; i < diff; i++ { - p.getWorker().args <- nil + <-p.freeSignal + idleWorkers[i].args <- nil + idleWorkers[i] = nil } + p.workers = idleWorkers[diff:] + p.lock.Unlock() } else if size == p.Cap() { return } atomic.StoreInt32(&p.capacity, int32(size)) } +// Release Closed this pool +func (p *PoolWithFunc) Release() error { + p.once.Do(func() { + p.release <- sig{} + p.lock.Lock() + idleWorkers := p.workers + for i, w := range idleWorkers { + <-p.freeSignal + w.args <- nil + idleWorkers[i] = nil + } + p.workers = nil + p.lock.Unlock() + }) + return nil +} + //------------------------------------------------------------------------- // getWorker returns a available worker to run the tasks. @@ -178,8 +189,8 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { waiting := false p.lock.Lock() - workers := p.workers - n := len(workers) - 1 + idleWorkers := p.workers + n := len(idleWorkers) - 1 if n < 0 { if p.Running() >= p.Cap() { waiting = true @@ -188,20 +199,20 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { } } else { <-p.freeSignal - w = workers[n] - workers[n] = nil - p.workers = workers[:n] + w = idleWorkers[n] + idleWorkers[n] = nil + p.workers = idleWorkers[:n] } p.lock.Unlock() if waiting { <-p.freeSignal p.lock.Lock() - workers = p.workers - l := len(workers) - 1 - w = workers[l] - workers[l] = nil - p.workers = workers[:l] + idleWorkers = p.workers + l := len(idleWorkers) - 1 + w = idleWorkers[l] + idleWorkers[l] = nil + p.workers = idleWorkers[:l] p.lock.Unlock() } else if w == nil { w = &WorkerWithFunc{ diff --git a/worker.go b/worker.go index 9031f57..c0667fb 100644 --- a/worker.go +++ b/worker.go @@ -44,7 +44,6 @@ type Worker struct { // run starts a goroutine to repeat the process // that performs the function calls. func (w *Worker) run() { - //atomic.AddInt32(&w.pool.running, 1) go func() { for f := range w.task { if f == nil { diff --git a/worker_func.go b/worker_func.go index a3b5bb6..86c7448 100644 --- a/worker_func.go +++ b/worker_func.go @@ -44,7 +44,6 @@ type WorkerWithFunc struct { // run starts a goroutine to repeat the process // that performs the function calls. func (w *WorkerWithFunc) run() { - //atomic.AddInt32(&w.pool.running, 1) go func() { for args := range w.args { if args == nil {