From 8ff88950b8ad25be7a969bd3dc25bc8c135073a0 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sat, 19 May 2018 13:09:44 +0800 Subject: [PATCH] add codes for reusing workers --- pool.go | 20 ++++++++++++-------- worker.go | 2 +- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/pool.go b/pool.go index d943c84..873a8c6 100644 --- a/pool.go +++ b/pool.go @@ -13,7 +13,7 @@ type f func() type Pool struct { capacity int32 - free int32 + running int32 tasks chan f workers chan *Worker destroy chan sig @@ -23,7 +23,6 @@ type Pool struct { func NewPool(size int) *Pool { p := &Pool{ capacity: int32(size), - free: int32(size), tasks: make(chan f, math.MaxInt32), //workers: &sync.Pool{New: func() interface{} { return &Worker{} }}, workers: make(chan *Worker, size), @@ -58,11 +57,11 @@ func (p *Pool) Push(task f) error { return nil } func (p *Pool) Running() int32 { - return atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.free) + return atomic.LoadInt32(&p.running) } func (p *Pool) Free() int32 { - return atomic.LoadInt32(&p.free) + return atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running) } func (p *Pool) Cap() int32 { @@ -91,15 +90,20 @@ func (p *Pool) newWorker() *Worker { exit: make(chan sig), } worker.run() - atomic.AddInt32(&p.free, -1) + atomic.AddInt32(&p.running, 1) return worker } func (p *Pool) getWorker() *Worker { var worker *Worker - if p.reachLimit() || p.Free() > 0 { - worker = <-p.workers + if p.reachLimit() { + select { + case worker = <-p.workers: + return worker + default: + worker = p.newWorker() + } } - worker = p.newWorker() + worker = <-p.workers return worker } diff --git a/worker.go b/worker.go index 42b9df4..76e8248 100644 --- a/worker.go +++ b/worker.go @@ -15,7 +15,7 @@ func (w *Worker) run() { case f := <-w.task: f() w.pool.workers <- w - atomic.AddInt32(&w.pool.free, 1) + atomic.AddInt32(&w.pool.running, -1) case <-w.exit: return }