diff --git a/ants.go b/ants.go index ba16dbe..7c51439 100644 --- a/ants.go +++ b/ants.go @@ -1,6 +1,6 @@ package ants -const DEFAULT_POOL_SIZE = 10000 +const DEFAULT_POOL_SIZE = 50000 var defaultPool = NewPool(DEFAULT_POOL_SIZE) diff --git a/ants_test.go b/ants_test.go index b460152..21fb2f7 100644 --- a/ants_test.go +++ b/ants_test.go @@ -29,9 +29,9 @@ func TestDefaultPool(t *testing.T) { ants.Push(demoFunc) } - t.Logf("pool capacity:%d", ants.Cap()) - t.Logf("running workers number:%d", ants.Running()) - t.Logf("free workers number:%d", ants.Free()) + //t.Logf("pool capacity:%d", ants.Cap()) + //t.Logf("running workers number:%d", ants.Running()) + //t.Logf("free workers number:%d", ants.Free()) ants.Wait() @@ -69,5 +69,4 @@ func TestNoPool(t *testing.T) { mem := runtime.MemStats{} runtime.ReadMemStats(&mem) t.Logf("memory usage:%d", mem.TotalAlloc/1024) - } diff --git a/pool.go b/pool.go index 297b400..50e4a09 100644 --- a/pool.go +++ b/pool.go @@ -4,7 +4,6 @@ import ( "runtime" "sync/atomic" "sync" - "math" ) type sig struct{} @@ -12,57 +11,39 @@ type sig struct{} type f func() type Pool struct { - capacity int32 - running int32 - tasks *ConcurrentQueue - workers *ConcurrentQueue - freeSignal chan sig - launchSignal chan sig - destroy chan sig - m *sync.Mutex - wg *sync.WaitGroup + capacity int32 + running int32 + freeSignal chan sig + workers []*Worker + workerPool sync.Pool + destroy chan sig + m sync.Mutex + wg *sync.WaitGroup } func NewPool(size int) *Pool { p := &Pool{ - capacity: int32(size), - tasks: NewConcurrentQueue(), - workers: NewConcurrentQueue(), - freeSignal: make(chan sig, math.MaxInt32), - launchSignal: make(chan sig, math.MaxInt32), - destroy: make(chan sig, runtime.GOMAXPROCS(-1)), - wg: &sync.WaitGroup{}, + capacity: int32(size), + freeSignal: make(chan sig, size), + destroy: make(chan sig, runtime.GOMAXPROCS(-1)), + wg: &sync.WaitGroup{}, } - p.loop() return p } //------------------------------------------------------------------------- -func (p *Pool) loop() { - for i := 0; i < runtime.GOMAXPROCS(-1); i++ { - go func() { - for { - select { - case <-p.launchSignal: - p.getWorker().sendTask(p.tasks.pop().(f)) - case <-p.destroy: - return - } - } - }() - } -} - func (p *Pool) Push(task f) error { if len(p.destroy) > 0 { return nil } - p.tasks.push(task) p.wg.Add(1) - p.launchSignal <- sig{} + w := p.getWorker() + w.sendTask(task) + //p.launchSignal <- sig{} return nil } + func (p *Pool) Running() int { return int(atomic.LoadInt32(&p.running)) } @@ -102,7 +83,6 @@ func (p *Pool) newWorker() *Worker { worker := &Worker{ pool: p, task: make(chan f), - exit: make(chan sig), } worker.run() atomic.AddInt32(&p.running, 1) @@ -110,15 +90,18 @@ func (p *Pool) newWorker() *Worker { } func (p *Pool) getWorker() *Worker { - if w := p.workers.pop(); w != nil { + if w := p.workerPool.Get(); w != nil { return w.(*Worker) } return p.newWorker() } func (p *Pool) putWorker(worker *Worker) { - p.workers.push(worker) + p.workerPool.Put(worker) + p.m.Lock() + p.workers = append(p.workers, worker) if p.reachLimit() { p.freeSignal <- sig{} } + p.m.Unlock() } diff --git a/worker.go b/worker.go index 21240cc..1cd5949 100644 --- a/worker.go +++ b/worker.go @@ -2,64 +2,47 @@ package ants import ( "sync/atomic" - "container/list" - "sync" ) type Worker struct { pool *Pool task chan f - exit chan sig } +//func (w *Worker) run() { +// go func() { +// for { +// select { +// case f := <-w.task: +// f() +// w.pool.putWorker(w) +// w.pool.wg.Done() +// case <-w.exit: +// atomic.AddInt32(&w.pool.running, -1) +// return +// } +// } +// }() +//} + func (w *Worker) run() { go func() { - for { - select { - case f := <-w.task: - f() - w.pool.putWorker(w) - w.pool.wg.Done() - case <-w.exit: + for f := range w.task { + if f == nil { atomic.AddInt32(&w.pool.running, -1) return } + f() + w.pool.putWorker(w) + w.pool.wg.Done() } }() } func (w *Worker) stop() { - w.exit <- sig{} + w.task <- nil } func (w *Worker) sendTask(task f) { w.task <- task } - -//-------------------------------------------------------------------------------- - -type ConcurrentQueue struct { - queue *list.List - m sync.Mutex -} - -func NewConcurrentQueue() *ConcurrentQueue { - q := new(ConcurrentQueue) - q.queue = list.New() - return q -} - -func (q *ConcurrentQueue) push(v interface{}) { - defer q.m.Unlock() - q.m.Lock() - q.queue.PushFront(v) -} - -func (q *ConcurrentQueue) pop() interface{} { - defer q.m.Unlock() - q.m.Lock() - if elem := q.queue.Back(); elem != nil { - return q.queue.Remove(elem) - } - return nil -}