fixed issue #6

This commit is contained in:
Andy Pan 2018-08-03 21:15:11 +08:00
parent 1444008b67
commit 4927155de3
3 changed files with 31 additions and 40 deletions

View File

@ -89,6 +89,7 @@ func BenchmarkAntsPoolWithFunc(b *testing.B) {
}) })
defer p.Release() defer p.Release()
b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ { for j := 0; j < RunTimes; j++ {
wg.Add(1) wg.Add(1)

35
pool.go
View File

@ -23,7 +23,6 @@
package ants package ants
import ( import (
"math"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -45,10 +44,6 @@ type Pool struct {
// expiryDuration set the expired time (second) of every worker. // expiryDuration set the expired time (second) of every worker.
expiryDuration time.Duration expiryDuration time.Duration
// freeSignal is used to notice pool there are available
// workers which can be sent to work.
freeSignal chan sig
// workers is a slice that store the available workers. // workers is a slice that store the available workers.
workers []*Worker workers []*Worker
@ -77,7 +72,6 @@ func (p *Pool) periodicallyPurge() {
break break
} }
n = i n = i
<-p.freeSignal
w.task <- nil w.task <- nil
idleWorkers[i] = nil idleWorkers[i] = nil
} }
@ -104,7 +98,6 @@ func NewTimingPool(size, expiry int) (*Pool, error) {
} }
p := &Pool{ p := &Pool{
capacity: int32(size), capacity: int32(size),
freeSignal: make(chan sig, math.MaxInt32),
release: make(chan sig, 1), release: make(chan sig, 1),
expiryDuration: time.Duration(expiry) * time.Second, expiryDuration: time.Duration(expiry) * time.Second,
} }
@ -119,8 +112,7 @@ func (p *Pool) Submit(task f) error {
if len(p.release) > 0 { if len(p.release) > 0 {
return ErrPoolClosed return ErrPoolClosed
} }
w := p.getWorker() p.getWorker().task <- task
w.task <- task
return nil return nil
} }
@ -160,7 +152,6 @@ func (p *Pool) Release() error {
p.lock.Lock() p.lock.Lock()
idleWorkers := p.workers idleWorkers := p.workers
for i, w := range idleWorkers { for i, w := range idleWorkers {
<-p.freeSignal
w.task <- nil w.task <- nil
idleWorkers[i] = nil idleWorkers[i] = nil
} }
@ -193,7 +184,6 @@ func (p *Pool) getWorker() *Worker {
if n < 0 { if n < 0 {
waiting = p.Running() >= p.Cap() waiting = p.Running() >= p.Cap()
} else { } else {
<-p.freeSignal
w = idleWorkers[n] w = idleWorkers[n]
idleWorkers[n] = nil idleWorkers[n] = nil
p.workers = idleWorkers[:n] p.workers = idleWorkers[:n]
@ -201,14 +191,20 @@ func (p *Pool) getWorker() *Worker {
p.lock.Unlock() p.lock.Unlock()
if waiting { if waiting {
<-p.freeSignal for {
p.lock.Lock() p.lock.Lock()
idleWorkers = p.workers idleWorkers = p.workers
l := len(idleWorkers) - 1 l := len(idleWorkers) - 1
w = idleWorkers[l] if l < 0 {
idleWorkers[l] = nil p.lock.Unlock()
p.workers = idleWorkers[:l] continue
p.lock.Unlock() }
w = idleWorkers[l]
idleWorkers[l] = nil
p.workers = idleWorkers[:l]
p.lock.Unlock()
break
}
} else if w == nil { } else if w == nil {
w = &Worker{ w = &Worker{
pool: p, pool: p,
@ -226,5 +222,4 @@ func (p *Pool) putWorker(worker *Worker) {
p.lock.Lock() p.lock.Lock()
p.workers = append(p.workers, worker) p.workers = append(p.workers, worker)
p.lock.Unlock() p.lock.Unlock()
p.freeSignal <- sig{}
} }

View File

@ -23,7 +23,6 @@
package ants package ants
import ( import (
"math"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -43,10 +42,6 @@ type PoolWithFunc struct {
// expiryDuration set the expired time (second) of every worker. // expiryDuration set the expired time (second) of every worker.
expiryDuration time.Duration expiryDuration time.Duration
// freeSignal is used to notice pool there are available
// workers which can be sent to work.
freeSignal chan sig
// workers is a slice that store the available workers. // workers is a slice that store the available workers.
workers []*WorkerWithFunc workers []*WorkerWithFunc
@ -78,7 +73,6 @@ func (p *PoolWithFunc) periodicallyPurge() {
break break
} }
n = i n = i
<-p.freeSignal
w.args <- nil w.args <- nil
idleWorkers[i] = nil idleWorkers[i] = nil
} }
@ -105,7 +99,6 @@ func NewTimingPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) {
} }
p := &PoolWithFunc{ p := &PoolWithFunc{
capacity: int32(size), capacity: int32(size),
freeSignal: make(chan sig, math.MaxInt32),
release: make(chan sig, 1), release: make(chan sig, 1),
expiryDuration: time.Duration(expiry) * time.Second, expiryDuration: time.Duration(expiry) * time.Second,
poolFunc: f, poolFunc: f,
@ -124,8 +117,7 @@ func (p *PoolWithFunc) Serve(args interface{}) error {
if len(p.release) > 0 { if len(p.release) > 0 {
return ErrPoolClosed return ErrPoolClosed
} }
w := p.getWorker() p.getWorker().args <- args
w.args <- args
return nil return nil
} }
@ -165,7 +157,6 @@ func (p *PoolWithFunc) Release() error {
p.lock.Lock() p.lock.Lock()
idleWorkers := p.workers idleWorkers := p.workers
for i, w := range idleWorkers { for i, w := range idleWorkers {
<-p.freeSignal
w.args <- nil w.args <- nil
idleWorkers[i] = nil idleWorkers[i] = nil
} }
@ -198,7 +189,6 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
if n < 0 { if n < 0 {
waiting = p.Running() >= p.Cap() waiting = p.Running() >= p.Cap()
} else { } else {
<-p.freeSignal
w = idleWorkers[n] w = idleWorkers[n]
idleWorkers[n] = nil idleWorkers[n] = nil
p.workers = idleWorkers[:n] p.workers = idleWorkers[:n]
@ -206,14 +196,20 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
p.lock.Unlock() p.lock.Unlock()
if waiting { if waiting {
<-p.freeSignal for {
p.lock.Lock() p.lock.Lock()
idleWorkers = p.workers idleWorkers = p.workers
l := len(idleWorkers) - 1 l := len(idleWorkers) - 1
w = idleWorkers[l] if l < 0 {
idleWorkers[l] = nil p.lock.Unlock()
p.workers = idleWorkers[:l] continue
p.lock.Unlock() }
w = idleWorkers[l]
idleWorkers[l] = nil
p.workers = idleWorkers[:l]
p.lock.Unlock()
break
}
} else if w == nil { } else if w == nil {
w = &WorkerWithFunc{ w = &WorkerWithFunc{
pool: p, pool: p,
@ -231,5 +227,4 @@ func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) {
p.lock.Lock() p.lock.Lock()
p.workers = append(p.workers, worker) p.workers = append(p.workers, worker)
p.lock.Unlock() p.lock.Unlock()
p.freeSignal <- sig{}
} }