mirror of https://github.com/panjf2000/ants.git
Merge branch 'develop'
This commit is contained in:
commit
d44d7e0349
|
@ -89,6 +89,7 @@ func BenchmarkAntsPoolWithFunc(b *testing.B) {
|
||||||
})
|
})
|
||||||
defer p.Release()
|
defer p.Release()
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
for j := 0; j < RunTimes; j++ {
|
for j := 0; j < RunTimes; j++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
45
pool.go
45
pool.go
|
@ -23,7 +23,6 @@
|
||||||
package ants
|
package ants
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
@ -45,10 +44,6 @@ type Pool struct {
|
||||||
// expiryDuration set the expired time (second) of every worker.
|
// expiryDuration set the expired time (second) of every worker.
|
||||||
expiryDuration time.Duration
|
expiryDuration time.Duration
|
||||||
|
|
||||||
// freeSignal is used to notice pool there are available
|
|
||||||
// workers which can be sent to work.
|
|
||||||
freeSignal chan sig
|
|
||||||
|
|
||||||
// workers is a slice that store the available workers.
|
// workers is a slice that store the available workers.
|
||||||
workers []*Worker
|
workers []*Worker
|
||||||
|
|
||||||
|
@ -77,7 +72,6 @@ func (p *Pool) periodicallyPurge() {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
n = i
|
n = i
|
||||||
<-p.freeSignal
|
|
||||||
w.task <- nil
|
w.task <- nil
|
||||||
idleWorkers[i] = nil
|
idleWorkers[i] = nil
|
||||||
}
|
}
|
||||||
|
@ -104,7 +98,6 @@ func NewTimingPool(size, expiry int) (*Pool, error) {
|
||||||
}
|
}
|
||||||
p := &Pool{
|
p := &Pool{
|
||||||
capacity: int32(size),
|
capacity: int32(size),
|
||||||
freeSignal: make(chan sig, math.MaxInt32),
|
|
||||||
release: make(chan sig, 1),
|
release: make(chan sig, 1),
|
||||||
expiryDuration: time.Duration(expiry) * time.Second,
|
expiryDuration: time.Duration(expiry) * time.Second,
|
||||||
}
|
}
|
||||||
|
@ -119,8 +112,7 @@ func (p *Pool) Submit(task f) error {
|
||||||
if len(p.release) > 0 {
|
if len(p.release) > 0 {
|
||||||
return ErrPoolClosed
|
return ErrPoolClosed
|
||||||
}
|
}
|
||||||
w := p.getWorker()
|
p.getWorker().task <- task
|
||||||
w.task <- task
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,7 +152,6 @@ func (p *Pool) Release() error {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
idleWorkers := p.workers
|
idleWorkers := p.workers
|
||||||
for i, w := range idleWorkers {
|
for i, w := range idleWorkers {
|
||||||
<-p.freeSignal
|
|
||||||
w.task <- nil
|
w.task <- nil
|
||||||
idleWorkers[i] = nil
|
idleWorkers[i] = nil
|
||||||
}
|
}
|
||||||
|
@ -172,13 +163,13 @@ func (p *Pool) Release() error {
|
||||||
|
|
||||||
//-------------------------------------------------------------------------
|
//-------------------------------------------------------------------------
|
||||||
|
|
||||||
// incrRunning increases the number of the currently running goroutines
|
// incRunning increases the number of the currently running goroutines
|
||||||
func (p *Pool) incrRunning() {
|
func (p *Pool) incRunning() {
|
||||||
atomic.AddInt32(&p.running, 1)
|
atomic.AddInt32(&p.running, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// decrRunning decreases the number of the currently running goroutines
|
// decRunning decreases the number of the currently running goroutines
|
||||||
func (p *Pool) decrRunning() {
|
func (p *Pool) decRunning() {
|
||||||
atomic.AddInt32(&p.running, -1)
|
atomic.AddInt32(&p.running, -1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -193,7 +184,6 @@ func (p *Pool) getWorker() *Worker {
|
||||||
if n < 0 {
|
if n < 0 {
|
||||||
waiting = p.Running() >= p.Cap()
|
waiting = p.Running() >= p.Cap()
|
||||||
} else {
|
} else {
|
||||||
<-p.freeSignal
|
|
||||||
w = idleWorkers[n]
|
w = idleWorkers[n]
|
||||||
idleWorkers[n] = nil
|
idleWorkers[n] = nil
|
||||||
p.workers = idleWorkers[:n]
|
p.workers = idleWorkers[:n]
|
||||||
|
@ -201,21 +191,27 @@ func (p *Pool) getWorker() *Worker {
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
|
|
||||||
if waiting {
|
if waiting {
|
||||||
<-p.freeSignal
|
for {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
idleWorkers = p.workers
|
idleWorkers = p.workers
|
||||||
l := len(idleWorkers) - 1
|
l := len(idleWorkers) - 1
|
||||||
w = idleWorkers[l]
|
if l < 0 {
|
||||||
idleWorkers[l] = nil
|
p.lock.Unlock()
|
||||||
p.workers = idleWorkers[:l]
|
continue
|
||||||
p.lock.Unlock()
|
}
|
||||||
|
w = idleWorkers[l]
|
||||||
|
idleWorkers[l] = nil
|
||||||
|
p.workers = idleWorkers[:l]
|
||||||
|
p.lock.Unlock()
|
||||||
|
break
|
||||||
|
}
|
||||||
} else if w == nil {
|
} else if w == nil {
|
||||||
w = &Worker{
|
w = &Worker{
|
||||||
pool: p,
|
pool: p,
|
||||||
task: make(chan f, 1),
|
task: make(chan f, 1),
|
||||||
}
|
}
|
||||||
w.run()
|
w.run()
|
||||||
p.incrRunning()
|
p.incRunning()
|
||||||
}
|
}
|
||||||
return w
|
return w
|
||||||
}
|
}
|
||||||
|
@ -226,5 +222,4 @@ func (p *Pool) putWorker(worker *Worker) {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
p.workers = append(p.workers, worker)
|
p.workers = append(p.workers, worker)
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
p.freeSignal <- sig{}
|
|
||||||
}
|
}
|
||||||
|
|
45
pool_func.go
45
pool_func.go
|
@ -23,7 +23,6 @@
|
||||||
package ants
|
package ants
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
@ -43,10 +42,6 @@ type PoolWithFunc struct {
|
||||||
// expiryDuration set the expired time (second) of every worker.
|
// expiryDuration set the expired time (second) of every worker.
|
||||||
expiryDuration time.Duration
|
expiryDuration time.Duration
|
||||||
|
|
||||||
// freeSignal is used to notice pool there are available
|
|
||||||
// workers which can be sent to work.
|
|
||||||
freeSignal chan sig
|
|
||||||
|
|
||||||
// workers is a slice that store the available workers.
|
// workers is a slice that store the available workers.
|
||||||
workers []*WorkerWithFunc
|
workers []*WorkerWithFunc
|
||||||
|
|
||||||
|
@ -78,7 +73,6 @@ func (p *PoolWithFunc) periodicallyPurge() {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
n = i
|
n = i
|
||||||
<-p.freeSignal
|
|
||||||
w.args <- nil
|
w.args <- nil
|
||||||
idleWorkers[i] = nil
|
idleWorkers[i] = nil
|
||||||
}
|
}
|
||||||
|
@ -105,7 +99,6 @@ func NewTimingPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) {
|
||||||
}
|
}
|
||||||
p := &PoolWithFunc{
|
p := &PoolWithFunc{
|
||||||
capacity: int32(size),
|
capacity: int32(size),
|
||||||
freeSignal: make(chan sig, math.MaxInt32),
|
|
||||||
release: make(chan sig, 1),
|
release: make(chan sig, 1),
|
||||||
expiryDuration: time.Duration(expiry) * time.Second,
|
expiryDuration: time.Duration(expiry) * time.Second,
|
||||||
poolFunc: f,
|
poolFunc: f,
|
||||||
|
@ -124,8 +117,7 @@ func (p *PoolWithFunc) Serve(args interface{}) error {
|
||||||
if len(p.release) > 0 {
|
if len(p.release) > 0 {
|
||||||
return ErrPoolClosed
|
return ErrPoolClosed
|
||||||
}
|
}
|
||||||
w := p.getWorker()
|
p.getWorker().args <- args
|
||||||
w.args <- args
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,7 +157,6 @@ func (p *PoolWithFunc) Release() error {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
idleWorkers := p.workers
|
idleWorkers := p.workers
|
||||||
for i, w := range idleWorkers {
|
for i, w := range idleWorkers {
|
||||||
<-p.freeSignal
|
|
||||||
w.args <- nil
|
w.args <- nil
|
||||||
idleWorkers[i] = nil
|
idleWorkers[i] = nil
|
||||||
}
|
}
|
||||||
|
@ -177,13 +168,13 @@ func (p *PoolWithFunc) Release() error {
|
||||||
|
|
||||||
//-------------------------------------------------------------------------
|
//-------------------------------------------------------------------------
|
||||||
|
|
||||||
// incrRunning increases the number of the currently running goroutines
|
// incRunning increases the number of the currently running goroutines
|
||||||
func (p *PoolWithFunc) incrRunning() {
|
func (p *PoolWithFunc) incRunning() {
|
||||||
atomic.AddInt32(&p.running, 1)
|
atomic.AddInt32(&p.running, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// decrRunning decreases the number of the currently running goroutines
|
// decRunning decreases the number of the currently running goroutines
|
||||||
func (p *PoolWithFunc) decrRunning() {
|
func (p *PoolWithFunc) decRunning() {
|
||||||
atomic.AddInt32(&p.running, -1)
|
atomic.AddInt32(&p.running, -1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -198,7 +189,6 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
|
||||||
if n < 0 {
|
if n < 0 {
|
||||||
waiting = p.Running() >= p.Cap()
|
waiting = p.Running() >= p.Cap()
|
||||||
} else {
|
} else {
|
||||||
<-p.freeSignal
|
|
||||||
w = idleWorkers[n]
|
w = idleWorkers[n]
|
||||||
idleWorkers[n] = nil
|
idleWorkers[n] = nil
|
||||||
p.workers = idleWorkers[:n]
|
p.workers = idleWorkers[:n]
|
||||||
|
@ -206,21 +196,27 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
|
|
||||||
if waiting {
|
if waiting {
|
||||||
<-p.freeSignal
|
for {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
idleWorkers = p.workers
|
idleWorkers = p.workers
|
||||||
l := len(idleWorkers) - 1
|
l := len(idleWorkers) - 1
|
||||||
w = idleWorkers[l]
|
if l < 0 {
|
||||||
idleWorkers[l] = nil
|
p.lock.Unlock()
|
||||||
p.workers = idleWorkers[:l]
|
continue
|
||||||
p.lock.Unlock()
|
}
|
||||||
|
w = idleWorkers[l]
|
||||||
|
idleWorkers[l] = nil
|
||||||
|
p.workers = idleWorkers[:l]
|
||||||
|
p.lock.Unlock()
|
||||||
|
break
|
||||||
|
}
|
||||||
} else if w == nil {
|
} else if w == nil {
|
||||||
w = &WorkerWithFunc{
|
w = &WorkerWithFunc{
|
||||||
pool: p,
|
pool: p,
|
||||||
args: make(chan interface{}, 1),
|
args: make(chan interface{}, 1),
|
||||||
}
|
}
|
||||||
w.run()
|
w.run()
|
||||||
p.incrRunning()
|
p.incRunning()
|
||||||
}
|
}
|
||||||
return w
|
return w
|
||||||
}
|
}
|
||||||
|
@ -231,5 +227,4 @@ func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
p.workers = append(p.workers, worker)
|
p.workers = append(p.workers, worker)
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
p.freeSignal <- sig{}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,7 @@ func (w *Worker) run() {
|
||||||
go func() {
|
go func() {
|
||||||
for f := range w.task {
|
for f := range w.task {
|
||||||
if f == nil {
|
if f == nil {
|
||||||
w.pool.decrRunning()
|
w.pool.decRunning()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
f()
|
f()
|
||||||
|
|
|
@ -46,7 +46,7 @@ func (w *WorkerWithFunc) run() {
|
||||||
go func() {
|
go func() {
|
||||||
for args := range w.args {
|
for args := range w.args {
|
||||||
if args == nil {
|
if args == nil {
|
||||||
w.pool.decrRunning()
|
w.pool.decRunning()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
w.pool.poolFunc(args)
|
w.pool.poolFunc(args)
|
||||||
|
|
Loading…
Reference in New Issue