forked from mirror/ants
update
This commit is contained in:
parent
ff4b7d8a22
commit
6581f1821d
34
pool.go
34
pool.go
|
@ -26,7 +26,6 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type sig struct{}
|
type sig struct{}
|
||||||
|
@ -57,8 +56,7 @@ type Pool struct {
|
||||||
|
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
|
|
||||||
// closed is used to confirm whether this pool has been closed.
|
once sync.Once
|
||||||
closed int32
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPool generates a instance of ants pool
|
// NewPool generates a instance of ants pool
|
||||||
|
@ -69,8 +67,7 @@ func NewPool(size int) (*Pool, error) {
|
||||||
p := &Pool{
|
p := &Pool{
|
||||||
capacity: int32(size),
|
capacity: int32(size),
|
||||||
freeSignal: make(chan sig, math.MaxInt32),
|
freeSignal: make(chan sig, math.MaxInt32),
|
||||||
release: make(chan sig),
|
release: make(chan sig, 1),
|
||||||
closed: 0,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return p, nil
|
return p, nil
|
||||||
|
@ -78,27 +75,9 @@ func NewPool(size int) (*Pool, error) {
|
||||||
|
|
||||||
//-------------------------------------------------------------------------
|
//-------------------------------------------------------------------------
|
||||||
|
|
||||||
// scanAndClean is a goroutine who will periodically clean up
|
|
||||||
// after it is noticed that this pool is closed.
|
|
||||||
func (p *Pool) scanAndClean() {
|
|
||||||
ticker := time.NewTicker(DefaultCleanIntervalTime * time.Second)
|
|
||||||
go func() {
|
|
||||||
ticker.Stop()
|
|
||||||
for range ticker.C {
|
|
||||||
if atomic.LoadInt32(&p.closed) == 1 {
|
|
||||||
p.lock.Lock()
|
|
||||||
for _, w := range p.workers {
|
|
||||||
w.stop()
|
|
||||||
}
|
|
||||||
p.lock.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Push submit a task to pool
|
// Push submit a task to pool
|
||||||
func (p *Pool) Push(task f) error {
|
func (p *Pool) Push(task f) error {
|
||||||
if atomic.LoadInt32(&p.closed) == 1 {
|
if len(p.release) > 0 {
|
||||||
return ErrPoolClosed
|
return ErrPoolClosed
|
||||||
}
|
}
|
||||||
w := p.getWorker()
|
w := p.getWorker()
|
||||||
|
@ -123,10 +102,9 @@ func (p *Pool) Cap() int {
|
||||||
|
|
||||||
// Release Closed this pool
|
// Release Closed this pool
|
||||||
func (p *Pool) Release() error {
|
func (p *Pool) Release() error {
|
||||||
p.lock.Lock()
|
p.once.Do(func() {
|
||||||
atomic.StoreInt32(&p.closed, 1)
|
p.release <- sig{}
|
||||||
close(p.release)
|
})
|
||||||
p.lock.Unlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,7 +43,7 @@ func (w *Worker) run() {
|
||||||
atomic.AddInt32(&w.pool.running, 1)
|
atomic.AddInt32(&w.pool.running, 1)
|
||||||
go func() {
|
go func() {
|
||||||
for f := range w.task {
|
for f := range w.task {
|
||||||
if f == nil {
|
if f == nil || len(w.pool.release) > 0 {
|
||||||
atomic.AddInt32(&w.pool.running, -1)
|
atomic.AddInt32(&w.pool.running, -1)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue