diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 56d94f7..de1f320 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -89,6 +89,7 @@ func BenchmarkAntsPoolWithFunc(b *testing.B) { }) defer p.Release() + b.ResetTimer() for i := 0; i < b.N; i++ { for j := 0; j < RunTimes; j++ { wg.Add(1) diff --git a/pool.go b/pool.go index 96873cd..8f0abba 100644 --- a/pool.go +++ b/pool.go @@ -23,7 +23,6 @@ package ants import ( - "math" "sync" "sync/atomic" "time" @@ -45,10 +44,6 @@ type Pool struct { // expiryDuration set the expired time (second) of every worker. expiryDuration time.Duration - // freeSignal is used to notice pool there are available - // workers which can be sent to work. - freeSignal chan sig - // workers is a slice that store the available workers. workers []*Worker @@ -77,7 +72,6 @@ func (p *Pool) periodicallyPurge() { break } n = i - <-p.freeSignal w.task <- nil idleWorkers[i] = nil } @@ -104,7 +98,6 @@ func NewTimingPool(size, expiry int) (*Pool, error) { } p := &Pool{ capacity: int32(size), - freeSignal: make(chan sig, math.MaxInt32), release: make(chan sig, 1), expiryDuration: time.Duration(expiry) * time.Second, } @@ -119,8 +112,7 @@ func (p *Pool) Submit(task f) error { if len(p.release) > 0 { return ErrPoolClosed } - w := p.getWorker() - w.task <- task + p.getWorker().task <- task return nil } @@ -160,7 +152,6 @@ func (p *Pool) Release() error { p.lock.Lock() idleWorkers := p.workers for i, w := range idleWorkers { - <-p.freeSignal w.task <- nil idleWorkers[i] = nil } @@ -172,13 +163,13 @@ func (p *Pool) Release() error { //------------------------------------------------------------------------- -// incrRunning increases the number of the currently running goroutines -func (p *Pool) incrRunning() { +// incRunning increases the number of the currently running goroutines +func (p *Pool) incRunning() { atomic.AddInt32(&p.running, 1) } -// decrRunning decreases the number of the currently running goroutines -func (p *Pool) decrRunning() { +// decRunning decreases the number of the currently running goroutines +func (p *Pool) decRunning() { atomic.AddInt32(&p.running, -1) } @@ -193,7 +184,6 @@ func (p *Pool) getWorker() *Worker { if n < 0 { waiting = p.Running() >= p.Cap() } else { - <-p.freeSignal w = idleWorkers[n] idleWorkers[n] = nil p.workers = idleWorkers[:n] @@ -201,21 +191,27 @@ func (p *Pool) getWorker() *Worker { p.lock.Unlock() if waiting { - <-p.freeSignal - p.lock.Lock() - idleWorkers = p.workers - l := len(idleWorkers) - 1 - w = idleWorkers[l] - idleWorkers[l] = nil - p.workers = idleWorkers[:l] - p.lock.Unlock() + for { + p.lock.Lock() + idleWorkers = p.workers + l := len(idleWorkers) - 1 + if l < 0 { + p.lock.Unlock() + continue + } + w = idleWorkers[l] + idleWorkers[l] = nil + p.workers = idleWorkers[:l] + p.lock.Unlock() + break + } } else if w == nil { w = &Worker{ pool: p, task: make(chan f, 1), } w.run() - p.incrRunning() + p.incRunning() } return w } @@ -226,5 +222,4 @@ func (p *Pool) putWorker(worker *Worker) { p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() - p.freeSignal <- sig{} } diff --git a/pool_func.go b/pool_func.go index ee8c9c5..890e7be 100644 --- a/pool_func.go +++ b/pool_func.go @@ -23,7 +23,6 @@ package ants import ( - "math" "sync" "sync/atomic" "time" @@ -43,10 +42,6 @@ type PoolWithFunc struct { // expiryDuration set the expired time (second) of every worker. expiryDuration time.Duration - // freeSignal is used to notice pool there are available - // workers which can be sent to work. - freeSignal chan sig - // workers is a slice that store the available workers. workers []*WorkerWithFunc @@ -78,7 +73,6 @@ func (p *PoolWithFunc) periodicallyPurge() { break } n = i - <-p.freeSignal w.args <- nil idleWorkers[i] = nil } @@ -105,7 +99,6 @@ func NewTimingPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) { } p := &PoolWithFunc{ capacity: int32(size), - freeSignal: make(chan sig, math.MaxInt32), release: make(chan sig, 1), expiryDuration: time.Duration(expiry) * time.Second, poolFunc: f, @@ -124,8 +117,7 @@ func (p *PoolWithFunc) Serve(args interface{}) error { if len(p.release) > 0 { return ErrPoolClosed } - w := p.getWorker() - w.args <- args + p.getWorker().args <- args return nil } @@ -165,7 +157,6 @@ func (p *PoolWithFunc) Release() error { p.lock.Lock() idleWorkers := p.workers for i, w := range idleWorkers { - <-p.freeSignal w.args <- nil idleWorkers[i] = nil } @@ -177,13 +168,13 @@ func (p *PoolWithFunc) Release() error { //------------------------------------------------------------------------- -// incrRunning increases the number of the currently running goroutines -func (p *PoolWithFunc) incrRunning() { +// incRunning increases the number of the currently running goroutines +func (p *PoolWithFunc) incRunning() { atomic.AddInt32(&p.running, 1) } -// decrRunning decreases the number of the currently running goroutines -func (p *PoolWithFunc) decrRunning() { +// decRunning decreases the number of the currently running goroutines +func (p *PoolWithFunc) decRunning() { atomic.AddInt32(&p.running, -1) } @@ -198,7 +189,6 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { if n < 0 { waiting = p.Running() >= p.Cap() } else { - <-p.freeSignal w = idleWorkers[n] idleWorkers[n] = nil p.workers = idleWorkers[:n] @@ -206,21 +196,27 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { p.lock.Unlock() if waiting { - <-p.freeSignal - p.lock.Lock() - idleWorkers = p.workers - l := len(idleWorkers) - 1 - w = idleWorkers[l] - idleWorkers[l] = nil - p.workers = idleWorkers[:l] - p.lock.Unlock() + for { + p.lock.Lock() + idleWorkers = p.workers + l := len(idleWorkers) - 1 + if l < 0 { + p.lock.Unlock() + continue + } + w = idleWorkers[l] + idleWorkers[l] = nil + p.workers = idleWorkers[:l] + p.lock.Unlock() + break + } } else if w == nil { w = &WorkerWithFunc{ pool: p, args: make(chan interface{}, 1), } w.run() - p.incrRunning() + p.incRunning() } return w } @@ -231,5 +227,4 @@ func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() - p.freeSignal <- sig{} } diff --git a/worker.go b/worker.go index aa4e761..a0c2faf 100644 --- a/worker.go +++ b/worker.go @@ -46,7 +46,7 @@ func (w *Worker) run() { go func() { for f := range w.task { if f == nil { - w.pool.decrRunning() + w.pool.decRunning() return } f() diff --git a/worker_func.go b/worker_func.go index 277ee31..a02264b 100644 --- a/worker_func.go +++ b/worker_func.go @@ -46,7 +46,7 @@ func (w *WorkerWithFunc) run() { go func() { for args := range w.args { if args == nil { - w.pool.decrRunning() + w.pool.decRunning() return } w.pool.poolFunc(args)