diff --git a/pool.go b/pool.go index 751ea65..460d0fd 100644 --- a/pool.go +++ b/pool.go @@ -40,15 +40,15 @@ type Pool struct { // running is the number of the currently running goroutines. running int32 + // lock for protecting the worker queue. + lock sync.Locker + // workers is a slice that store the available workers. workers workerArray // state is used to notice the pool to closed itself. state int32 - // lock for synchronous operation. - lock sync.Locker - // cond for waiting to get a idle worker. cond *sync.Cond @@ -86,7 +86,7 @@ func (p *Pool) purgePeriodically() { // There might be a situation that all workers have been cleaned up(no any worker is running) // while some invokers still get stuck in "p.cond.Wait()", - // then it ought to wakes all those invokers. + // then it ought to wake all those invokers. if p.Running() == 0 { p.cond.Broadcast() } @@ -173,7 +173,7 @@ func (p *Pool) Cap() int { return int(atomic.LoadInt32(&p.capacity)) } -// Tune changes the capacity of this pool, this method is noneffective to the infinite pool. +// Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool. func (p *Pool) Tune(size int) { if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { return @@ -186,7 +186,7 @@ func (p *Pool) IsClosed() bool { return atomic.LoadInt32(&p.state) == CLOSED } -// Release Closes this pool. +// Release closes this pool and releases the worker queue. func (p *Pool) Release() { atomic.StoreInt32(&p.state, CLOSED) p.lock.Lock() @@ -197,7 +197,7 @@ func (p *Pool) Release() { p.cond.Broadcast() } -// Reboot reboots a released pool. +// Reboot reboots a closed pool. func (p *Pool) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { go p.purgePeriodically() @@ -226,26 +226,28 @@ func (p *Pool) retrieveWorker() (w *goWorker) { p.lock.Lock() w = p.workers.detach() - if w != nil { + if w != nil { // first try to fetch the worker from the queue p.lock.Unlock() } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { + // if the worker queue is empty and we don't run out of the pool capacity, + // then just spawn a new worker goroutine. p.lock.Unlock() spawnWorker() - } else { + } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. if p.options.Nonblocking { p.lock.Unlock() return } - Reentry: + retry: if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks { p.lock.Unlock() return } p.blockingNum++ - p.cond.Wait() + p.cond.Wait() // block and wait for an available worker p.blockingNum-- var nw int - if nw = p.Running(); nw == 0 { + if nw = p.Running(); nw == 0 { // awakened by the scavenger p.lock.Unlock() if !p.IsClosed() { spawnWorker() @@ -258,7 +260,7 @@ func (p *Pool) retrieveWorker() (w *goWorker) { spawnWorker() return } - goto Reentry + goto retry } p.lock.Unlock() diff --git a/pool_func.go b/pool_func.go index 6add14e..83d8771 100644 --- a/pool_func.go +++ b/pool_func.go @@ -39,15 +39,15 @@ type PoolWithFunc struct { // running is the number of the currently running goroutines. running int32 + // lock for protecting the worker queue. + lock sync.Locker + // workers is a slice that store the available workers. workers []*goWorkerWithFunc // state is used to notice the pool to closed itself. state int32 - // lock for synchronous operation. - lock sync.Locker - // cond for waiting to get a idle worker. cond *sync.Cond @@ -101,7 +101,7 @@ func (p *PoolWithFunc) purgePeriodically() { // There might be a situation that all workers have been cleaned up(no any worker is running) // while some invokers still get stuck in "p.cond.Wait()", - // then it ought to wakes all those invokers. + // then it ought to wake all those invokers. if p.Running() == 0 { p.cond.Broadcast() } @@ -190,7 +190,7 @@ func (p *PoolWithFunc) Cap() int { return int(atomic.LoadInt32(&p.capacity)) } -// Tune changes the capacity of this pool. +// Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool. func (p *PoolWithFunc) Tune(size int) { if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { return @@ -203,7 +203,7 @@ func (p *PoolWithFunc) IsClosed() bool { return atomic.LoadInt32(&p.state) == CLOSED } -// Release Closes this pool. +// Release closes this pool and releases the worker queue. func (p *PoolWithFunc) Release() { atomic.StoreInt32(&p.state, CLOSED) p.lock.Lock() @@ -218,7 +218,7 @@ func (p *PoolWithFunc) Release() { p.cond.Broadcast() } -// Reboot reboots a released pool. +// Reboot reboots a closed pool. func (p *PoolWithFunc) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { go p.purgePeriodically() @@ -247,29 +247,31 @@ func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) { p.lock.Lock() idleWorkers := p.workers n := len(idleWorkers) - 1 - if n >= 0 { + if n >= 0 { // first try to fetch the worker from the queue w = idleWorkers[n] idleWorkers[n] = nil p.workers = idleWorkers[:n] p.lock.Unlock() } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { + // if the worker queue is empty and we don't run out of the pool capacity, + // then just spawn a new worker goroutine. p.lock.Unlock() spawnWorker() - } else { + } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. if p.options.Nonblocking { p.lock.Unlock() return } - Reentry: + retry: if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks { p.lock.Unlock() return } p.blockingNum++ - p.cond.Wait() + p.cond.Wait() // block and wait for an available worker p.blockingNum-- var nw int - if nw = p.Running(); nw == 0 { + if nw = p.Running(); nw == 0 { // awakened by the scavenger p.lock.Unlock() if !p.IsClosed() { spawnWorker() @@ -283,7 +285,7 @@ func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) { spawnWorker() return } - goto Reentry + goto retry } w = p.workers[l] p.workers[l] = nil