diff --git a/pool.go b/pool.go index 113027f..25c347e 100644 --- a/pool.go +++ b/pool.go @@ -26,7 +26,6 @@ import ( "math" "sync" "sync/atomic" - "time" ) type sig struct{} @@ -57,8 +56,7 @@ type Pool struct { lock sync.Mutex - // closed is used to confirm whether this pool has been closed. - closed int32 + once sync.Once } // NewPool generates a instance of ants pool @@ -69,8 +67,7 @@ func NewPool(size int) (*Pool, error) { p := &Pool{ capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), - release: make(chan sig), - closed: 0, + release: make(chan sig, 1), } return p, nil @@ -78,27 +75,9 @@ func NewPool(size int) (*Pool, error) { //------------------------------------------------------------------------- -// scanAndClean is a goroutine who will periodically clean up -// after it is noticed that this pool is closed. -func (p *Pool) scanAndClean() { - ticker := time.NewTicker(DefaultCleanIntervalTime * time.Second) - go func() { - ticker.Stop() - for range ticker.C { - if atomic.LoadInt32(&p.closed) == 1 { - p.lock.Lock() - for _, w := range p.workers { - w.stop() - } - p.lock.Unlock() - } - } - }() -} - // Push submit a task to pool func (p *Pool) Push(task f) error { - if atomic.LoadInt32(&p.closed) == 1 { + if len(p.release) > 0 { return ErrPoolClosed } w := p.getWorker() @@ -123,10 +102,9 @@ func (p *Pool) Cap() int { // Release Closed this pool func (p *Pool) Release() error { - p.lock.Lock() - atomic.StoreInt32(&p.closed, 1) - close(p.release) - p.lock.Unlock() + p.once.Do(func() { + p.release <- sig{} + }) return nil } diff --git a/worker.go b/worker.go index 601836c..34b3004 100644 --- a/worker.go +++ b/worker.go @@ -43,7 +43,7 @@ func (w *Worker) run() { atomic.AddInt32(&w.pool.running, 1) go func() { for f := range w.task { - if f == nil { + if f == nil || len(w.pool.release) > 0 { atomic.AddInt32(&w.pool.running, -1) return }