From 4927155de340f4b2e70df03cf8dcda9b70d41caa Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Fri, 3 Aug 2018 21:15:11 +0800 Subject: [PATCH] fixed issue #6 --- ants_benchmark_test.go | 1 + pool.go | 35 +++++++++++++++-------------------- pool_func.go | 35 +++++++++++++++-------------------- 3 files changed, 31 insertions(+), 40 deletions(-) diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 56d94f7..de1f320 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -89,6 +89,7 @@ func BenchmarkAntsPoolWithFunc(b *testing.B) { }) defer p.Release() + b.ResetTimer() for i := 0; i < b.N; i++ { for j := 0; j < RunTimes; j++ { wg.Add(1) diff --git a/pool.go b/pool.go index f430e04..8f0abba 100644 --- a/pool.go +++ b/pool.go @@ -23,7 +23,6 @@ package ants import ( - "math" "sync" "sync/atomic" "time" @@ -45,10 +44,6 @@ type Pool struct { // expiryDuration set the expired time (second) of every worker. expiryDuration time.Duration - // freeSignal is used to notice pool there are available - // workers which can be sent to work. - freeSignal chan sig - // workers is a slice that store the available workers. workers []*Worker @@ -77,7 +72,6 @@ func (p *Pool) periodicallyPurge() { break } n = i - <-p.freeSignal w.task <- nil idleWorkers[i] = nil } @@ -104,7 +98,6 @@ func NewTimingPool(size, expiry int) (*Pool, error) { } p := &Pool{ capacity: int32(size), - freeSignal: make(chan sig, math.MaxInt32), release: make(chan sig, 1), expiryDuration: time.Duration(expiry) * time.Second, } @@ -119,8 +112,7 @@ func (p *Pool) Submit(task f) error { if len(p.release) > 0 { return ErrPoolClosed } - w := p.getWorker() - w.task <- task + p.getWorker().task <- task return nil } @@ -160,7 +152,6 @@ func (p *Pool) Release() error { p.lock.Lock() idleWorkers := p.workers for i, w := range idleWorkers { - <-p.freeSignal w.task <- nil idleWorkers[i] = nil } @@ -193,7 +184,6 @@ func (p *Pool) getWorker() *Worker { if n < 0 { waiting = p.Running() >= p.Cap() } else { - <-p.freeSignal w = idleWorkers[n] idleWorkers[n] = nil p.workers = idleWorkers[:n] @@ -201,14 +191,20 @@ func (p *Pool) getWorker() *Worker { p.lock.Unlock() if waiting { - <-p.freeSignal - p.lock.Lock() - idleWorkers = p.workers - l := len(idleWorkers) - 1 - w = idleWorkers[l] - idleWorkers[l] = nil - p.workers = idleWorkers[:l] - p.lock.Unlock() + for { + p.lock.Lock() + idleWorkers = p.workers + l := len(idleWorkers) - 1 + if l < 0 { + p.lock.Unlock() + continue + } + w = idleWorkers[l] + idleWorkers[l] = nil + p.workers = idleWorkers[:l] + p.lock.Unlock() + break + } } else if w == nil { w = &Worker{ pool: p, @@ -226,5 +222,4 @@ func (p *Pool) putWorker(worker *Worker) { p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() - p.freeSignal <- sig{} } diff --git a/pool_func.go b/pool_func.go index dcb9f1d..890e7be 100644 --- a/pool_func.go +++ b/pool_func.go @@ -23,7 +23,6 @@ package ants import ( - "math" "sync" "sync/atomic" "time" @@ -43,10 +42,6 @@ type PoolWithFunc struct { // expiryDuration set the expired time (second) of every worker. expiryDuration time.Duration - // freeSignal is used to notice pool there are available - // workers which can be sent to work. - freeSignal chan sig - // workers is a slice that store the available workers. workers []*WorkerWithFunc @@ -78,7 +73,6 @@ func (p *PoolWithFunc) periodicallyPurge() { break } n = i - <-p.freeSignal w.args <- nil idleWorkers[i] = nil } @@ -105,7 +99,6 @@ func NewTimingPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) { } p := &PoolWithFunc{ capacity: int32(size), - freeSignal: make(chan sig, math.MaxInt32), release: make(chan sig, 1), expiryDuration: time.Duration(expiry) * time.Second, poolFunc: f, @@ -124,8 +117,7 @@ func (p *PoolWithFunc) Serve(args interface{}) error { if len(p.release) > 0 { return ErrPoolClosed } - w := p.getWorker() - w.args <- args + p.getWorker().args <- args return nil } @@ -165,7 +157,6 @@ func (p *PoolWithFunc) Release() error { p.lock.Lock() idleWorkers := p.workers for i, w := range idleWorkers { - <-p.freeSignal w.args <- nil idleWorkers[i] = nil } @@ -198,7 +189,6 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { if n < 0 { waiting = p.Running() >= p.Cap() } else { - <-p.freeSignal w = idleWorkers[n] idleWorkers[n] = nil p.workers = idleWorkers[:n] @@ -206,14 +196,20 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { p.lock.Unlock() if waiting { - <-p.freeSignal - p.lock.Lock() - idleWorkers = p.workers - l := len(idleWorkers) - 1 - w = idleWorkers[l] - idleWorkers[l] = nil - p.workers = idleWorkers[:l] - p.lock.Unlock() + for { + p.lock.Lock() + idleWorkers = p.workers + l := len(idleWorkers) - 1 + if l < 0 { + p.lock.Unlock() + continue + } + w = idleWorkers[l] + idleWorkers[l] = nil + p.workers = idleWorkers[:l] + p.lock.Unlock() + break + } } else if w == nil { w = &WorkerWithFunc{ pool: p, @@ -231,5 +227,4 @@ func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() - p.freeSignal <- sig{} }