diff --git a/ants_test.go b/ants_test.go index 80cf140..f61efe5 100644 --- a/ants_test.go +++ b/ants_test.go @@ -26,9 +26,9 @@ import ( "runtime" "sync" "testing" + "time" "github.com/panjf2000/ants" - "time" ) var n = 100000 diff --git a/pool.go b/pool.go index aec9573..ac3edcd 100644 --- a/pool.go +++ b/pool.go @@ -129,6 +129,16 @@ func (p *Pool) Running() int { return int(atomic.LoadInt32(&p.running)) } +// IncrRunning increases the number of the currently running goroutines +func (p *Pool) IncrRunning() { + atomic.AddInt32(&p.running, 1) +} + +// DecrRunning decreases the number of the currently running goroutines +func (p *Pool) DecrRunning() { + atomic.AddInt32(&p.running, -1) +} + // Free returns the available goroutines to work func (p *Pool) Free() int { return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) @@ -181,11 +191,7 @@ func (p *Pool) getWorker() *Worker { idleWorkers := p.workers n := len(idleWorkers) - 1 if n < 0 { - if p.Running() >= p.Cap() { - waiting = true - } else { - atomic.AddInt32(&p.running, 1) - } + waiting = p.Running() >= p.Cap() } else { <-p.freeSignal w = idleWorkers[n] @@ -209,6 +215,7 @@ func (p *Pool) getWorker() *Worker { task: make(chan f, 1), } w.run() + p.IncrRunning() } return w } diff --git a/pool_func.go b/pool_func.go index c3f20a7..77221e4 100644 --- a/pool_func.go +++ b/pool_func.go @@ -134,6 +134,16 @@ func (p *PoolWithFunc) Running() int { return int(atomic.LoadInt32(&p.running)) } +// IncrRunning increases the number of the currently running goroutines +func (p *PoolWithFunc) IncrRunning() { + atomic.AddInt32(&p.running, 1) +} + +// DecrRunning decreases the number of the currently running goroutines +func (p *PoolWithFunc) DecrRunning() { + atomic.AddInt32(&p.running, -1) +} + // Free returns the available goroutines to work func (p *PoolWithFunc) Free() int { return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) @@ -186,11 +196,7 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { idleWorkers := p.workers n := len(idleWorkers) - 1 if n < 0 { - if p.Running() >= p.Cap() { - waiting = true - } else { - atomic.AddInt32(&p.running, 1) - } + waiting = p.Running() >= p.Cap() } else { <-p.freeSignal w = idleWorkers[n] @@ -214,6 +220,7 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { args: make(chan interface{}, 1), } w.run() + p.IncrRunning() } return w } diff --git a/worker.go b/worker.go index c0667fb..8d29a3a 100644 --- a/worker.go +++ b/worker.go @@ -23,7 +23,6 @@ package ants import ( - "sync/atomic" "time" ) @@ -47,7 +46,7 @@ func (w *Worker) run() { go func() { for f := range w.task { if f == nil { - atomic.AddInt32(&w.pool.running, -1) + w.pool.DecrRunning() return } f() diff --git a/worker_func.go b/worker_func.go index 86c7448..22b6069 100644 --- a/worker_func.go +++ b/worker_func.go @@ -23,7 +23,6 @@ package ants import ( - "sync/atomic" "time" ) @@ -47,7 +46,7 @@ func (w *WorkerWithFunc) run() { go func() { for args := range w.args { if args == nil { - atomic.AddInt32(&w.pool.running, -1) + w.pool.DecrRunning() return } w.pool.poolFunc(args)