forked from mirror/ants
🐯 Refactor variable "release" with atomic operation
This commit is contained in:
parent
b091435432
commit
f8d71bb276
17
pool.go
17
pool.go
|
@ -46,7 +46,7 @@ type Pool struct {
|
||||||
workers []*Worker
|
workers []*Worker
|
||||||
|
|
||||||
// release is used to notice the pool to closed itself.
|
// release is used to notice the pool to closed itself.
|
||||||
release bool
|
release int32
|
||||||
|
|
||||||
// lock for synchronous operation.
|
// lock for synchronous operation.
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
|
@ -70,7 +70,7 @@ func (p *Pool) periodicallyPurge() {
|
||||||
currentTime := time.Now()
|
currentTime := time.Now()
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
idleWorkers := p.workers
|
idleWorkers := p.workers
|
||||||
if len(idleWorkers) == 0 && p.Running() == 0 && p.release {
|
if len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 {
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -120,12 +120,11 @@ func NewTimingPool(size, expiry int) (*Pool, error) {
|
||||||
|
|
||||||
// Submit submits a task to this pool.
|
// Submit submits a task to this pool.
|
||||||
func (p *Pool) Submit(task f) error {
|
func (p *Pool) Submit(task f) error {
|
||||||
if worker := p.getWorker(); worker != nil {
|
if 1 == atomic.LoadInt32(&p.release) {
|
||||||
worker.task <- task
|
|
||||||
return nil
|
|
||||||
} else {
|
|
||||||
return ErrPoolClosed
|
return ErrPoolClosed
|
||||||
}
|
}
|
||||||
|
p.getWorker().task <- task
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Running returns the number of the currently running goroutines.
|
// Running returns the number of the currently running goroutines.
|
||||||
|
@ -158,8 +157,8 @@ func (p *Pool) ReSize(size int) {
|
||||||
// Release Closes this pool.
|
// Release Closes this pool.
|
||||||
func (p *Pool) Release() error {
|
func (p *Pool) Release() error {
|
||||||
p.once.Do(func() {
|
p.once.Do(func() {
|
||||||
|
atomic.StoreInt32(&p.release, 1)
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
p.release = true
|
|
||||||
idleWorkers := p.workers
|
idleWorkers := p.workers
|
||||||
for i, w := range idleWorkers {
|
for i, w := range idleWorkers {
|
||||||
w.task <- nil
|
w.task <- nil
|
||||||
|
@ -191,10 +190,6 @@ func (p *Pool) getWorker() *Worker {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
|
|
||||||
if p.release {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
idleWorkers := p.workers
|
idleWorkers := p.workers
|
||||||
n := len(idleWorkers) - 1
|
n := len(idleWorkers) - 1
|
||||||
if n < 0 {
|
if n < 0 {
|
||||||
|
|
17
pool_func.go
17
pool_func.go
|
@ -46,7 +46,7 @@ type PoolWithFunc struct {
|
||||||
workers []*WorkerWithFunc
|
workers []*WorkerWithFunc
|
||||||
|
|
||||||
// release is used to notice the pool to closed itself.
|
// release is used to notice the pool to closed itself.
|
||||||
release bool
|
release int32
|
||||||
|
|
||||||
// lock for synchronous operation.
|
// lock for synchronous operation.
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
|
@ -73,7 +73,7 @@ func (p *PoolWithFunc) periodicallyPurge() {
|
||||||
currentTime := time.Now()
|
currentTime := time.Now()
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
idleWorkers := p.workers
|
idleWorkers := p.workers
|
||||||
if len(idleWorkers) == 0 && p.Running() == 0 && p.release {
|
if len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 {
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -124,12 +124,11 @@ func NewTimingPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) {
|
||||||
|
|
||||||
// Serve submits a task to pool.
|
// Serve submits a task to pool.
|
||||||
func (p *PoolWithFunc) Serve(args interface{}) error {
|
func (p *PoolWithFunc) Serve(args interface{}) error {
|
||||||
if worker := p.getWorker(); worker != nil {
|
if 1 == atomic.LoadInt32(&p.release) {
|
||||||
worker.args <- args
|
|
||||||
return nil
|
|
||||||
} else {
|
|
||||||
return ErrPoolClosed
|
return ErrPoolClosed
|
||||||
}
|
}
|
||||||
|
p.getWorker().args <- args
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Running returns the number of the currently running goroutines.
|
// Running returns the number of the currently running goroutines.
|
||||||
|
@ -162,8 +161,8 @@ func (p *PoolWithFunc) ReSize(size int) {
|
||||||
// Release Closed this pool.
|
// Release Closed this pool.
|
||||||
func (p *PoolWithFunc) Release() error {
|
func (p *PoolWithFunc) Release() error {
|
||||||
p.once.Do(func() {
|
p.once.Do(func() {
|
||||||
|
atomic.StoreInt32(&p.release, 1)
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
p.release = true
|
|
||||||
idleWorkers := p.workers
|
idleWorkers := p.workers
|
||||||
for i, w := range idleWorkers {
|
for i, w := range idleWorkers {
|
||||||
w.args <- nil
|
w.args <- nil
|
||||||
|
@ -195,10 +194,6 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
defer p.lock.Unlock()
|
defer p.lock.Unlock()
|
||||||
|
|
||||||
if p.release {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
idleWorkers := p.workers
|
idleWorkers := p.workers
|
||||||
n := len(idleWorkers) - 1
|
n := len(idleWorkers) - 1
|
||||||
if n < 0 {
|
if n < 0 {
|
||||||
|
|
Loading…
Reference in New Issue