From 908553139cbf3585bba56571a0fc28d1d61f131c Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sun, 27 Jan 2019 00:26:24 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=AC=20Performance=20improvement:=20use?= =?UTF-8?q?=20sync.Pool=20to=20cache=20active=20workers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pool.go | 27 ++++++++++++++------------- pool_func.go | 27 ++++++++++++++------------- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/pool.go b/pool.go index 2def8d2..9e73064 100644 --- a/pool.go +++ b/pool.go @@ -51,12 +51,14 @@ type Pool struct { // lock for synchronous operation. lock sync.Mutex - // cond for waiting to get a idle worker + // cond for waiting to get a idle worker. cond *sync.Cond + // once makes sure releasing this pool will just be done for one time. once sync.Once - cachePool sync.Pool + // workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker. + workerCache sync.Pool // PanicHandler is used to handle panics from each worker goroutine. // if nil, panics will be thrown out again from worker goroutines. @@ -125,7 +127,7 @@ func (p *Pool) Submit(task f) error { if 1 == atomic.LoadInt32(&p.release) { return ErrPoolClosed } - p.getWorker().task <- task + p.retrieveWorker().task <- task return nil } @@ -144,15 +146,15 @@ func (p *Pool) Cap() int { return int(atomic.LoadInt32(&p.capacity)) } -// ReSize changes the capacity of this pool. -func (p *Pool) ReSize(size int) { +// Tune changes the capacity of this pool. +func (p *Pool) Tune(size int) { if size == p.Cap() { return } atomic.StoreInt32(&p.capacity, int32(size)) diff := p.Running() - size for i := 0; i < diff; i++ { - p.getWorker().task <- nil + p.retrieveWorker().task <- nil } } @@ -184,8 +186,8 @@ func (p *Pool) decRunning() { atomic.AddInt32(&p.running, -1) } -// getWorker returns a available worker to run the tasks. -func (p *Pool) getWorker() *Worker { +// retrieveWorker returns a available worker to run the tasks. +func (p *Pool) retrieveWorker() *Worker { var w *Worker var waiting bool @@ -198,7 +200,7 @@ func (p *Pool) getWorker() *Worker { if p.Running() >= p.Cap() { waiting = true } else { - if cacheWorker := p.cachePool.Get(); cacheWorker != nil { + if cacheWorker := p.workerCache.Get(); cacheWorker != nil { return cacheWorker.(*Worker) } } @@ -226,17 +228,16 @@ func (p *Pool) getWorker() *Worker { task: make(chan f, workerChanCap), } w.run() - p.incRunning() } return w } -// putWorker puts a worker back into free pool, recycling the goroutines. -func (p *Pool) putWorker(worker *Worker) { +// revertWorker puts a worker back into free pool, recycling the goroutines. +func (p *Pool) revertWorker(worker *Worker) { worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) - // Notify the invoker stuck in 'getWorker()' of there is an available worker in the worker queue. + // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue. p.cond.Signal() p.lock.Unlock() } diff --git a/pool_func.go b/pool_func.go index 0b7bb4b..91c4bf1 100644 --- a/pool_func.go +++ b/pool_func.go @@ -51,15 +51,17 @@ type PoolWithFunc struct { // lock for synchronous operation. lock sync.Mutex - // cond for waiting to get a idle worker + // cond for waiting to get a idle worker. cond *sync.Cond // pf is the function for processing tasks. poolFunc pf + // once makes sure releasing this pool will just be done for one time. once sync.Once - cachePool sync.Pool + // workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker. + workerCache sync.Pool // PanicHandler is used to handle panics from each worker goroutine. // if nil, panics will be thrown out again from worker goroutines. @@ -129,7 +131,7 @@ func (p *PoolWithFunc) Serve(args interface{}) error { if 1 == atomic.LoadInt32(&p.release) { return ErrPoolClosed } - p.getWorker().args <- args + p.retrieveWorker().args <- args return nil } @@ -148,15 +150,15 @@ func (p *PoolWithFunc) Cap() int { return int(atomic.LoadInt32(&p.capacity)) } -// ReSize change the capacity of this pool. -func (p *PoolWithFunc) ReSize(size int) { +// Tune change the capacity of this pool. +func (p *PoolWithFunc) Tune(size int) { if size == p.Cap() { return } atomic.StoreInt32(&p.capacity, int32(size)) diff := p.Running() - size for i := 0; i < diff; i++ { - p.getWorker().args <- nil + p.retrieveWorker().args <- nil } } @@ -188,8 +190,8 @@ func (p *PoolWithFunc) decRunning() { atomic.AddInt32(&p.running, -1) } -// getWorker returns a available worker to run the tasks. -func (p *PoolWithFunc) getWorker() *WorkerWithFunc { +// retrieveWorker returns a available worker to run the tasks. +func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc { var w *WorkerWithFunc waiting := false @@ -203,7 +205,7 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { if p.Running() >= p.Cap() { waiting = true } else { - if cacheWorker := p.cachePool.Get(); cacheWorker != nil { + if cacheWorker := p.workerCache.Get(); cacheWorker != nil { return cacheWorker.(*WorkerWithFunc) } } @@ -231,17 +233,16 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { args: make(chan interface{}, workerChanCap), } w.run() - p.incRunning() } return w } -// putWorker puts a worker back into free pool, recycling the goroutines. -func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { +// revertWorker puts a worker back into free pool, recycling the goroutines. +func (p *PoolWithFunc) revertWorker(worker *WorkerWithFunc) { worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) - // Notify the invoker stuck in 'getWorker()' of there is an available worker in the worker queue. + // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue. p.cond.Signal() p.lock.Unlock() }