From 92a7dec1965da64fe7f53dc6afc5e2a3ede40cae Mon Sep 17 00:00:00 2001 From: andy pan Date: Wed, 30 May 2018 12:37:13 +0800 Subject: [PATCH 1/2] optimization --- worker.go | 2 +- worker_func.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/worker.go b/worker.go index 5b8426b..c4df148 100644 --- a/worker.go +++ b/worker.go @@ -55,7 +55,7 @@ func (w *Worker) run() { // stop this worker. func (w *Worker) stop() { - w.task <- nil + w.sendTask(nil) } // sendTask sends a task to this worker. diff --git a/worker_func.go b/worker_func.go index b8d59ec..b149411 100644 --- a/worker_func.go +++ b/worker_func.go @@ -55,7 +55,7 @@ func (w *WorkerWithFunc) run() { // stop this worker. func (w *WorkerWithFunc) stop() { - w.args <- nil + w.sendTask(nil) } // sendTask sends a task to this worker. From d56ebb1a29323b26decebb3d1065b309739f1fdb Mon Sep 17 00:00:00 2001 From: andy pan Date: Wed, 30 May 2018 12:57:20 +0800 Subject: [PATCH 2/2] finish the ReSize() and Release() methods --- pool.go | 20 ++++++++++---------- pool_func.go | 21 ++++++++++----------- worker.go | 2 +- worker_func.go | 2 +- 4 files changed, 22 insertions(+), 23 deletions(-) diff --git a/pool.go b/pool.go index a080872..7224e3d 100644 --- a/pool.go +++ b/pool.go @@ -102,12 +102,22 @@ func (p *Pool) Cap() int { func (p *Pool) Release() error { p.once.Do(func() { p.release <- sig{} + running := p.Running() + for i := 0; i < running; i++ { + p.getWorker().stop() + } }) return nil } // ReSize change the capacity of this pool func (p *Pool) ReSize(size int) { + if size < p.Cap() { + diff := p.Cap() - size + for i := 0; i < diff; i++ { + p.getWorker().stop() + } + } atomic.StoreInt32(&p.capacity, int32(size)) } @@ -151,21 +161,11 @@ func (p *Pool) getWorker() *Worker { break } } else if w == nil { - //wp := p.workerPool.Get() - //if wp == nil { - // w = &Worker{ - // pool: p, - // task: make(chan f, workerArgsCap), - // } - //} else { - // w = wp.(*Worker) - //} w = &Worker{ pool: p, task: make(chan f), } w.run() - //p.workerPool.Put(w) } return w } diff --git a/pool_func.go b/pool_func.go index b236cf9..f8354d0 100644 --- a/pool_func.go +++ b/pool_func.go @@ -107,12 +107,22 @@ func (p *PoolWithFunc) Cap() int { func (p *PoolWithFunc) Release() error { p.once.Do(func() { p.release <- sig{} + running := p.Running() + for i := 0; i < running; i++ { + p.getWorker().stop() + } }) return nil } // ReSize change the capacity of this pool func (p *PoolWithFunc) ReSize(size int) { + if size < p.Cap() { + diff := p.Cap() - size + for i := 0; i < diff; i++ { + p.getWorker().stop() + } + } atomic.StoreInt32(&p.capacity, int32(size)) } @@ -156,28 +166,17 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { break } } else if w == nil { - //wp := p.workerPool.Get() - //if wp == nil { - // w = &WorkerWithFunc{ - // pool: p, - // args: make(chan interface{}, workerArgsCap), - // } - //} else { - // w = wp.(*WorkerWithFunc) - //} w = &WorkerWithFunc{ pool: p, args: make(chan interface{}), } w.run() - //p.workerPool.Put(w) } return w } // putWorker puts a worker back into free pool, recycling the goroutines. func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { - //p.workerPool.Put(worker) p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() diff --git a/worker.go b/worker.go index c4df148..162673e 100644 --- a/worker.go +++ b/worker.go @@ -43,7 +43,7 @@ func (w *Worker) run() { //atomic.AddInt32(&w.pool.running, 1) go func() { for f := range w.task { - if f == nil || len(w.pool.release) > 0 { + if f == nil { atomic.AddInt32(&w.pool.running, -1) return } diff --git a/worker_func.go b/worker_func.go index b149411..d533bfd 100644 --- a/worker_func.go +++ b/worker_func.go @@ -43,7 +43,7 @@ func (w *WorkerWithFunc) run() { //atomic.AddInt32(&w.pool.running, 1) go func() { for args := range w.args { - if args == nil || len(w.pool.release) > 0 { + if args == nil { atomic.AddInt32(&w.pool.running, -1) return }