🐬 Performance improvement: use sync.Pool to cache active workers

Andy Pan 2019-01-27 00:26:24 +08:00
parent 30bf38e6a8
commit 908553139c
2 changed files with 28 additions and 26 deletions
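The heart of this change: instead of allocating a fresh Worker every time the pool runs below capacity, workers are recycled through a sync.Pool, which cuts allocations and GC pressure. A minimal, self-contained sketch of the pattern; the names worker, getWorker, and putWorker are illustrative stand-ins, not the actual ants internals:

package main

import (
	"fmt"
	"sync"
)

// worker is a stand-in for ants' Worker type.
type worker struct {
	task chan func()
}

// workerCache keeps retired workers alive for reuse instead of
// letting the garbage collector reclaim them.
var workerCache sync.Pool

// getWorker prefers a cached worker and falls back to allocating one.
func getWorker() *worker {
	if cached := workerCache.Get(); cached != nil { // Get returns nil when the cache is empty
		return cached.(*worker) // sync.Pool stores interface{}, hence the type assertion
	}
	return &worker{task: make(chan func(), 1)}
}

// putWorker parks a retired worker in the cache for later reuse.
func putWorker(w *worker) {
	workerCache.Put(w)
}

func main() {
	w := getWorker()
	putWorker(w)
	// Usually the same object comes back, though sync.Pool gives no guarantee.
	fmt.Println(getWorker() == w)
}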

pool.go (27 changed lines)

@@ -51,12 +51,14 @@ type Pool struct {
 	// lock for synchronous operation.
 	lock sync.Mutex
-	// cond for waiting to get a idle worker
+	// cond for waiting to get an idle worker.
 	cond *sync.Cond
 	// once makes sure releasing this pool will just be done for one time.
 	once sync.Once
-	cachePool sync.Pool
+	// workerCache speeds up the obtainment of a usable worker in retrieveWorker.
+	workerCache sync.Pool
 	// PanicHandler is used to handle panics from each worker goroutine.
 	// if nil, panics will be thrown out again from worker goroutines.
@@ -125,7 +127,7 @@ func (p *Pool) Submit(task f) error {
 	if 1 == atomic.LoadInt32(&p.release) {
 		return ErrPoolClosed
 	}
-	p.getWorker().task <- task
+	p.retrieveWorker().task <- task
 	return nil
 }
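Submit hands the task straight into the chosen worker's channel. The receiving loop lives in worker.go, which this commit does not touch; the sketch below is a plausible reconstruction of its shape with trimmed stand-in types, not the real code:

package main

import "fmt"

// Trimmed stand-ins for the real types; worker.go is not part of this diff.
type Pool struct{}

func (p *Pool) decRunning()            {} // placeholder
func (p *Pool) revertWorker(w *Worker) {} // placeholder

type Worker struct {
	pool *Pool
	task chan func()
}

// run shows the plausible shape of the worker loop that Submit feeds.
func (w *Worker) run() {
	go func() {
		for t := range w.task {
			if t == nil { // a nil task is the shutdown signal the Tune shrink loop below relies on
				w.pool.decRunning()
				return
			}
			t()
			w.pool.revertWorker(w) // recycle the worker once the task finishes
		}
	}()
}

func main() {
	w := &Worker{pool: &Pool{}, task: make(chan func(), 1)}
	done := make(chan struct{})
	w.run()
	w.task <- func() { fmt.Println("task ran"); close(done) }
	<-done
	w.task <- nil // stop the worker, as Tune does when shrinking
}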
@@ -144,15 +146,15 @@ func (p *Pool) Cap() int {
 	return int(atomic.LoadInt32(&p.capacity))
 }
 
-// ReSize changes the capacity of this pool.
-func (p *Pool) ReSize(size int) {
+// Tune changes the capacity of this pool.
+func (p *Pool) Tune(size int) {
 	if size == p.Cap() {
 		return
 	}
 	atomic.StoreInt32(&p.capacity, int32(size))
 	diff := p.Running() - size
 	for i := 0; i < diff; i++ {
-		p.getWorker().task <- nil
+		p.retrieveWorker().task <- nil
 	}
 }
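After the rename, resizing at runtime looks roughly like this. A hedged usage sketch assuming the 2019-era package API (ants.NewPool and Release); the capacities are arbitrary:

package main

import (
	"log"

	"github.com/panjf2000/ants"
)

func main() {
	p, err := ants.NewPool(100) // pool with capacity 100
	if err != nil {
		log.Fatal(err)
	}
	defer p.Release()

	p.Tune(50)  // shrink: the loop above feeds surplus workers a nil task so they exit
	p.Tune(200) // grow: only the capacity counter changes; workers spawn on demand
}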
@@ -184,8 +186,8 @@ func (p *Pool) decRunning() {
 	atomic.AddInt32(&p.running, -1)
 }
 
-// getWorker returns a available worker to run the tasks.
-func (p *Pool) getWorker() *Worker {
+// retrieveWorker returns an available worker to run the tasks.
+func (p *Pool) retrieveWorker() *Worker {
 	var w *Worker
 	var waiting bool
@@ -198,7 +200,7 @@ func (p *Pool) getWorker() *Worker {
 	if p.Running() >= p.Cap() {
 		waiting = true
 	} else {
-		if cacheWorker := p.cachePool.Get(); cacheWorker != nil {
+		if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
			return cacheWorker.(*Worker)
 		}
 	}
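The nil check on cacheWorker is load-bearing: a sync.Pool with no New function returns nil from Get when it has nothing cached, so the caller must fall through to allocating a worker. A standalone illustration:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var cache sync.Pool             // no New field, so Get can return nil
	fmt.Println(cache.Get() == nil) // true: nothing has been Put yet

	cache.Put(42)
	fmt.Println(cache.Get().(int)) // 42 in practice, recovered via type assertion
}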
@@ -226,17 +228,16 @@
 			task: make(chan f, workerChanCap),
 		}
 		w.run()
-		p.incRunning()
 	}
 	return w
 }
 
-// putWorker puts a worker back into free pool, recycling the goroutines.
-func (p *Pool) putWorker(worker *Worker) {
+// revertWorker puts a worker back into the free pool, recycling the goroutines.
+func (p *Pool) revertWorker(worker *Worker) {
 	worker.recycleTime = time.Now()
 	p.lock.Lock()
 	p.workers = append(p.workers, worker)
-	// Notify the invoker stuck in 'getWorker()' of there is an available worker in the worker queue.
+	// Notify the invoker stuck in 'retrieveWorker()' that there is an available worker in the worker queue.
 	p.cond.Signal()
 	p.lock.Unlock()
 }
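revertWorker's cond.Signal() pairs with a cond.Wait() loop inside retrieveWorker, in a part of pool.go outside these hunks; the matching workerCache.Put presumably sits in the expiry scrubber, also not shown. The general shape of that Signal/Wait handoff, with illustrative names:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	var queue []int // stands in for the idle-worker slice p.workers

	// Waiting side, mirroring retrieveWorker: sleep until something is queued.
	got := make(chan struct{})
	go func() {
		mu.Lock()
		for len(queue) == 0 {
			cond.Wait() // atomically releases mu while asleep, re-acquires on wake-up
		}
		fmt.Println("took idle worker", queue[0])
		mu.Unlock()
		close(got)
	}()

	// Reverting side, mirroring revertWorker: append and signal under the lock.
	time.Sleep(10 * time.Millisecond)
	mu.Lock()
	queue = append(queue, 1)
	cond.Signal()
	mu.Unlock()

	<-got
}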

pool_func.go (the same changes, applied to PoolWithFunc)

@@ -51,15 +51,17 @@ type PoolWithFunc struct {
 	// lock for synchronous operation.
 	lock sync.Mutex
-	// cond for waiting to get a idle worker
+	// cond for waiting to get an idle worker.
 	cond *sync.Cond
 	// pf is the function for processing tasks.
 	poolFunc pf
 	// once makes sure releasing this pool will just be done for one time.
 	once sync.Once
-	cachePool sync.Pool
+	// workerCache speeds up the obtainment of a usable worker in retrieveWorker.
+	workerCache sync.Pool
 	// PanicHandler is used to handle panics from each worker goroutine.
 	// if nil, panics will be thrown out again from worker goroutines.
@@ -129,7 +131,7 @@ func (p *PoolWithFunc) Serve(args interface{}) error {
 	if 1 == atomic.LoadInt32(&p.release) {
 		return ErrPoolClosed
 	}
-	p.getWorker().args <- args
+	p.retrieveWorker().args <- args
 	return nil
 }
@@ -148,15 +150,15 @@ func (p *PoolWithFunc) Cap() int {
 	return int(atomic.LoadInt32(&p.capacity))
 }
 
-// ReSize change the capacity of this pool.
-func (p *PoolWithFunc) ReSize(size int) {
+// Tune changes the capacity of this pool.
+func (p *PoolWithFunc) Tune(size int) {
 	if size == p.Cap() {
 		return
 	}
 	atomic.StoreInt32(&p.capacity, int32(size))
 	diff := p.Running() - size
 	for i := 0; i < diff; i++ {
-		p.getWorker().args <- nil
+		p.retrieveWorker().args <- nil
 	}
 }
@@ -188,8 +190,8 @@ func (p *PoolWithFunc) decRunning() {
 	atomic.AddInt32(&p.running, -1)
 }
 
-// getWorker returns a available worker to run the tasks.
-func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
+// retrieveWorker returns an available worker to run the tasks.
+func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc {
 	var w *WorkerWithFunc
 	waiting := false
@@ -203,7 +205,7 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
 	if p.Running() >= p.Cap() {
 		waiting = true
 	} else {
-		if cacheWorker := p.cachePool.Get(); cacheWorker != nil {
+		if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
 			return cacheWorker.(*WorkerWithFunc)
 		}
 	}
@@ -231,17 +233,16 @@
 			args: make(chan interface{}, workerChanCap),
 		}
 		w.run()
-		p.incRunning()
 	}
 	return w
 }
 
-// putWorker puts a worker back into free pool, recycling the goroutines.
-func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) {
+// revertWorker puts a worker back into the free pool, recycling the goroutines.
+func (p *PoolWithFunc) revertWorker(worker *WorkerWithFunc) {
 	worker.recycleTime = time.Now()
 	p.lock.Lock()
 	p.workers = append(p.workers, worker)
-	// Notify the invoker stuck in 'getWorker()' of there is an available worker in the worker queue.
+	// Notify the invoker stuck in 'retrieveWorker()' that there is an available worker in the worker queue.
 	p.cond.Signal()
 	p.lock.Unlock()
 }
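PoolWithFunc mirrors Pool, except that a single function is bound to the whole pool and Serve only ships arguments to it. A usage sketch: NewPoolWithFunc and Serve are the real entry points, but the exact signature of the bound function varied across early versions (some took func(interface{}) error), so treat the closure below as illustrative:

package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants"
)

func main() {
	var wg sync.WaitGroup

	// One function is bound to the whole pool; Serve only passes arguments.
	p, err := ants.NewPoolWithFunc(10, func(arg interface{}) {
		fmt.Println("processing", arg)
		wg.Done()
	})
	if err != nil {
		panic(err)
	}
	defer p.Release()

	for i := 0; i < 100; i++ {
		wg.Add(1)
		p.Serve(i) // internally: p.retrieveWorker().args <- i
	}
	wg.Wait()
}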