Improve some comments

This commit is contained in:
Andy Pan 2021-05-23 20:23:43 +08:00
parent a5ccf7622a
commit 8ab8c9f899
2 changed files with 30 additions and 26 deletions

28
pool.go
View File

@ -40,15 +40,15 @@ type Pool struct {
// running is the number of the currently running goroutines. // running is the number of the currently running goroutines.
running int32 running int32
// lock for protecting the worker queue.
lock sync.Locker
// workers is a slice that store the available workers. // workers is a slice that store the available workers.
workers workerArray workers workerArray
// state is used to notice the pool to closed itself. // state is used to notice the pool to closed itself.
state int32 state int32
// lock for synchronous operation.
lock sync.Locker
// cond for waiting to get a idle worker. // cond for waiting to get a idle worker.
cond *sync.Cond cond *sync.Cond
@ -86,7 +86,7 @@ func (p *Pool) purgePeriodically() {
// There might be a situation that all workers have been cleaned up(no any worker is running) // There might be a situation that all workers have been cleaned up(no any worker is running)
// while some invokers still get stuck in "p.cond.Wait()", // while some invokers still get stuck in "p.cond.Wait()",
// then it ought to wakes all those invokers. // then it ought to wake all those invokers.
if p.Running() == 0 { if p.Running() == 0 {
p.cond.Broadcast() p.cond.Broadcast()
} }
@ -173,7 +173,7 @@ func (p *Pool) Cap() int {
return int(atomic.LoadInt32(&p.capacity)) return int(atomic.LoadInt32(&p.capacity))
} }
// Tune changes the capacity of this pool, this method is noneffective to the infinite pool. // Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool.
func (p *Pool) Tune(size int) { func (p *Pool) Tune(size int) {
if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
return return
@ -186,7 +186,7 @@ func (p *Pool) IsClosed() bool {
return atomic.LoadInt32(&p.state) == CLOSED return atomic.LoadInt32(&p.state) == CLOSED
} }
// Release Closes this pool. // Release closes this pool and releases the worker queue.
func (p *Pool) Release() { func (p *Pool) Release() {
atomic.StoreInt32(&p.state, CLOSED) atomic.StoreInt32(&p.state, CLOSED)
p.lock.Lock() p.lock.Lock()
@ -197,7 +197,7 @@ func (p *Pool) Release() {
p.cond.Broadcast() p.cond.Broadcast()
} }
// Reboot reboots a released pool. // Reboot reboots a closed pool.
func (p *Pool) Reboot() { func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
go p.purgePeriodically() go p.purgePeriodically()
@ -226,26 +226,28 @@ func (p *Pool) retrieveWorker() (w *goWorker) {
p.lock.Lock() p.lock.Lock()
w = p.workers.detach() w = p.workers.detach()
if w != nil { if w != nil { // first try to fetch the worker from the queue
p.lock.Unlock() p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
// if the worker queue is empty and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
p.lock.Unlock() p.lock.Unlock()
spawnWorker() spawnWorker()
} else { } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
if p.options.Nonblocking { if p.options.Nonblocking {
p.lock.Unlock() p.lock.Unlock()
return return
} }
Reentry: retry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks { if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
p.lock.Unlock() p.lock.Unlock()
return return
} }
p.blockingNum++ p.blockingNum++
p.cond.Wait() p.cond.Wait() // block and wait for an available worker
p.blockingNum-- p.blockingNum--
var nw int var nw int
if nw = p.Running(); nw == 0 { if nw = p.Running(); nw == 0 { // awakened by the scavenger
p.lock.Unlock() p.lock.Unlock()
if !p.IsClosed() { if !p.IsClosed() {
spawnWorker() spawnWorker()
@ -258,7 +260,7 @@ func (p *Pool) retrieveWorker() (w *goWorker) {
spawnWorker() spawnWorker()
return return
} }
goto Reentry goto retry
} }
p.lock.Unlock() p.lock.Unlock()

View File

@ -39,15 +39,15 @@ type PoolWithFunc struct {
// running is the number of the currently running goroutines. // running is the number of the currently running goroutines.
running int32 running int32
// lock for protecting the worker queue.
lock sync.Locker
// workers is a slice that store the available workers. // workers is a slice that store the available workers.
workers []*goWorkerWithFunc workers []*goWorkerWithFunc
// state is used to notice the pool to closed itself. // state is used to notice the pool to closed itself.
state int32 state int32
// lock for synchronous operation.
lock sync.Locker
// cond for waiting to get a idle worker. // cond for waiting to get a idle worker.
cond *sync.Cond cond *sync.Cond
@ -101,7 +101,7 @@ func (p *PoolWithFunc) purgePeriodically() {
// There might be a situation that all workers have been cleaned up(no any worker is running) // There might be a situation that all workers have been cleaned up(no any worker is running)
// while some invokers still get stuck in "p.cond.Wait()", // while some invokers still get stuck in "p.cond.Wait()",
// then it ought to wakes all those invokers. // then it ought to wake all those invokers.
if p.Running() == 0 { if p.Running() == 0 {
p.cond.Broadcast() p.cond.Broadcast()
} }
@ -190,7 +190,7 @@ func (p *PoolWithFunc) Cap() int {
return int(atomic.LoadInt32(&p.capacity)) return int(atomic.LoadInt32(&p.capacity))
} }
// Tune changes the capacity of this pool. // Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool.
func (p *PoolWithFunc) Tune(size int) { func (p *PoolWithFunc) Tune(size int) {
if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
return return
@ -203,7 +203,7 @@ func (p *PoolWithFunc) IsClosed() bool {
return atomic.LoadInt32(&p.state) == CLOSED return atomic.LoadInt32(&p.state) == CLOSED
} }
// Release Closes this pool. // Release closes this pool and releases the worker queue.
func (p *PoolWithFunc) Release() { func (p *PoolWithFunc) Release() {
atomic.StoreInt32(&p.state, CLOSED) atomic.StoreInt32(&p.state, CLOSED)
p.lock.Lock() p.lock.Lock()
@ -218,7 +218,7 @@ func (p *PoolWithFunc) Release() {
p.cond.Broadcast() p.cond.Broadcast()
} }
// Reboot reboots a released pool. // Reboot reboots a closed pool.
func (p *PoolWithFunc) Reboot() { func (p *PoolWithFunc) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
go p.purgePeriodically() go p.purgePeriodically()
@ -247,29 +247,31 @@ func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) {
p.lock.Lock() p.lock.Lock()
idleWorkers := p.workers idleWorkers := p.workers
n := len(idleWorkers) - 1 n := len(idleWorkers) - 1
if n >= 0 { if n >= 0 { // first try to fetch the worker from the queue
w = idleWorkers[n] w = idleWorkers[n]
idleWorkers[n] = nil idleWorkers[n] = nil
p.workers = idleWorkers[:n] p.workers = idleWorkers[:n]
p.lock.Unlock() p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
// if the worker queue is empty and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
p.lock.Unlock() p.lock.Unlock()
spawnWorker() spawnWorker()
} else { } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
if p.options.Nonblocking { if p.options.Nonblocking {
p.lock.Unlock() p.lock.Unlock()
return return
} }
Reentry: retry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks { if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
p.lock.Unlock() p.lock.Unlock()
return return
} }
p.blockingNum++ p.blockingNum++
p.cond.Wait() p.cond.Wait() // block and wait for an available worker
p.blockingNum-- p.blockingNum--
var nw int var nw int
if nw = p.Running(); nw == 0 { if nw = p.Running(); nw == 0 { // awakened by the scavenger
p.lock.Unlock() p.lock.Unlock()
if !p.IsClosed() { if !p.IsClosed() {
spawnWorker() spawnWorker()
@ -283,7 +285,7 @@ func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) {
spawnWorker() spawnWorker()
return return
} }
goto Reentry goto retry
} }
w = p.workers[l] w = p.workers[l]
p.workers[l] = nil p.workers[l] = nil