diff --git a/pool.go b/pool.go index fbf3b80..b4e8f34 100644 --- a/pool.go +++ b/pool.go @@ -48,7 +48,7 @@ type Pool struct { workers []*Worker // release is used to notice the pool to closed itself. - release chan sig + release bool // lock for synchronous operation. lock sync.Mutex @@ -72,7 +72,7 @@ func (p *Pool) periodicallyPurge() { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers - if len(idleWorkers) == 0 && p.Running() == 0 && p.release != nil { + if len(idleWorkers) == 0 && p.Running() == 0 && p.release { p.lock.Unlock() return } @@ -122,11 +122,12 @@ func NewTimingPool(size, expiry int) (*Pool, error) { // Submit submits a task to this pool. func (p *Pool) Submit(task f) error { - if p.release != nil { + if worker := p.getWorker(); worker != nil { + worker.task <- task + return nil + } else { return ErrPoolClosed } - p.getWorker().task <- task - return nil } // Running returns the number of the currently running goroutines. @@ -159,8 +160,8 @@ func (p *Pool) ReSize(size int) { // Release Closes this pool. func (p *Pool) Release() error { p.once.Do(func() { - p.release = make(chan sig) p.lock.Lock() + p.release = true idleWorkers := p.workers for i, w := range idleWorkers { w.task <- nil @@ -191,6 +192,11 @@ func (p *Pool) getWorker() *Worker { p.lock.Lock() defer p.lock.Unlock() + + if p.release { + return nil + } + idleWorkers := p.workers n := len(idleWorkers) - 1 if n < 0 { diff --git a/pool_func.go b/pool_func.go index 0205ab1..ddf3034 100644 --- a/pool_func.go +++ b/pool_func.go @@ -46,7 +46,7 @@ type PoolWithFunc struct { workers []*WorkerWithFunc // release is used to notice the pool to closed itself. - release chan sig + release bool // lock for synchronous operation. lock sync.Mutex @@ -73,7 +73,7 @@ func (p *PoolWithFunc) periodicallyPurge() { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers - if len(idleWorkers) == 0 && p.Running() == 0 && p.release != nil { + if len(idleWorkers) == 0 && p.Running() == 0 && p.release { p.lock.Unlock() return } @@ -124,11 +124,12 @@ func NewTimingPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) { // Serve submits a task to pool. func (p *PoolWithFunc) Serve(args interface{}) error { - if p.release != nil { + if worker := p.getWorker(); worker != nil { + worker.args <- args + return nil + } else { return ErrPoolClosed } - p.getWorker().args <- args - return nil } // Running returns the number of the currently running goroutines. @@ -161,8 +162,8 @@ func (p *PoolWithFunc) ReSize(size int) { // Release Closed this pool. func (p *PoolWithFunc) Release() error { p.once.Do(func() { - p.release = make(chan sig) p.lock.Lock() + p.release = true idleWorkers := p.workers for i, w := range idleWorkers { w.args <- nil @@ -193,6 +194,11 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { p.lock.Lock() defer p.lock.Unlock() + + if p.release { + return nil + } + idleWorkers := p.workers n := len(idleWorkers) - 1 if n < 0 {