diff --git a/ants.go b/ants.go index b55872e..335ac87 100644 --- a/ants.go +++ b/ants.go @@ -1,6 +1,6 @@ package ants -const DEFAULT_POOL_SIZE = 5 +const DEFAULT_POOL_SIZE = 50000 var defaultPool = NewPool(DEFAULT_POOL_SIZE) diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index be54823..274ba1c 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -6,7 +6,7 @@ import ( "sync" ) -const RunTimes = 10 +const RunTimes = 10000000 func BenchmarkGoroutine(b *testing.B) { for i := 0; i < b.N; i++ { diff --git a/pool.go b/pool.go index 8eb4215..a7ee612 100644 --- a/pool.go +++ b/pool.go @@ -4,6 +4,7 @@ import ( "runtime" "sync/atomic" "sync" + "math" ) type sig struct{} @@ -17,13 +18,13 @@ type Pool struct { workers []*Worker workerPool sync.Pool destroy chan sig - m sync.Mutex + lock sync.Mutex } func NewPool(size int) *Pool { p := &Pool{ capacity: int32(size), - freeSignal: make(chan sig, size), + freeSignal: make(chan sig, math.MaxInt32), destroy: make(chan sig, runtime.GOMAXPROCS(-1)), } @@ -68,8 +69,8 @@ func (p *Pool) Cap() int { } func (p *Pool) Destroy() error { - p.m.Lock() - defer p.m.Unlock() + p.lock.Lock() + defer p.lock.Unlock() for i := 0; i < runtime.GOMAXPROCS(-1)+1; i++ { p.destroy <- sig{} } @@ -82,50 +83,126 @@ func (p *Pool) reachLimit() bool { return p.Running() >= p.Cap() } -func (p *Pool) newWorker() *Worker { - var w *Worker - if p.reachLimit() { - <-p.freeSignal - return p.getWorker() - } - wp := p.workerPool.Get() - if wp == nil { - w = &Worker{ - pool: p, - task: make(chan f), - } - } else { - w = wp.(*Worker) - } - w.run() - atomic.AddInt32(&p.running, 1) - return w -} +//func (p *Pool) newWorker() *Worker { +// var w *Worker +// if p.reachLimit() { +// <-p.freeSignal +// return p.getWorker() +// } +// wp := p.workerPool.Get() +// if wp == nil { +// w = &Worker{ +// pool: p, +// task: make(chan f), +// } +// } else { +// w = wp.(*Worker) +// } +// w.run() +// atomic.AddInt32(&p.running, 1) +// return w +//} +// +//func (p *Pool) getWorker() *Worker { +// var w *Worker +// p.lock.Lock() +// workers := p.workers +// n := len(workers) - 1 +// if n < 0 { +// p.lock.Unlock() +// return p.newWorker() +// } else { +// w = workers[n] +// workers[n] = nil +// p.workers = workers[:n] +// //atomic.AddInt32(&p.running, 1) +// } +// p.lock.Unlock() +// return w +//} + +//func (p *Pool) newWorker() *Worker { +// var w *Worker +// if p.reachLimit() { +// <-p.freeSignal +// return p.getWorker() +// } +// wp := p.workerPool.Get() +// if wp == nil { +// w = &Worker{ +// pool: p, +// task: make(chan f), +// } +// } else { +// w = wp.(*Worker) +// } +// w.run() +// atomic.AddInt32(&p.running, 1) +// return w +//} func (p *Pool) getWorker() *Worker { + //fmt.Printf("init running workers number:%d\n", p.running) var w *Worker - p.m.Lock() + waiting := false + + p.lock.Lock() workers := p.workers n := len(workers) - 1 if n < 0 { - p.m.Unlock() - return p.newWorker() + //fmt.Printf("running workers number:%d\n", p.running) + if p.running >= p.capacity { + waiting = true + } } else { w = workers[n] workers[n] = nil p.workers = workers[:n] - atomic.AddInt32(&p.running, 1) + //atomic.AddInt32(&p.running, 1) + } + p.lock.Unlock() + + if waiting { + <-p.freeSignal + //p.lock.Lock() + //fmt.Println("wait for a worker") + //fmt.Println("get for a worker") + for { + p.lock.Lock() + workers = p.workers + l := len(workers) - 1 + if l < 0 { + p.lock.Unlock() + continue + } + w = workers[l] + workers[l] = nil + p.workers = workers[:l] + p.lock.Unlock() + break + } + //p.lock.Unlock() + } else { + wp := p.workerPool.Get() + if wp == nil { + w = &Worker{ + pool: p, + task: make(chan f), + } + w.run() + atomic.AddInt32(&p.running, 1) + } else { + w = wp.(*Worker) + } } - p.m.Unlock() return w } func (p *Pool) putWorker(worker *Worker) { p.workerPool.Put(worker) - p.m.Lock() + p.lock.Lock() p.workers = append(p.workers, worker) - if p.reachLimit() { - p.freeSignal <- sig{} - } - p.m.Unlock() + p.lock.Unlock() + p.freeSignal <- sig{} + //fmt.Printf("put a worker, running worker number:%d\n", p.Running()) }