From 0fd8ba8dae156194e755aa11faf3c743b37ca2c8 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sat, 19 May 2018 19:08:31 +0800 Subject: [PATCH] use the "container/list" to keep workers --- ants.go | 6 ++--- ants_test.go | 16 ++++++++----- pool.go | 67 +++++++++++++--------------------------------------- worker.go | 39 ++++++++++++++++++++++++++---- 4 files changed, 65 insertions(+), 63 deletions(-) diff --git a/ants.go b/ants.go index 4abef68..3bd31f9 100644 --- a/ants.go +++ b/ants.go @@ -22,6 +22,6 @@ func Free() int { return defaultPool.Free() } -func Wait() { - defaultPool.Wait() -} +//func Wait() { +// defaultPool.Wait() +//} diff --git a/ants_test.go b/ants_test.go index 8f43b8a..29c4217 100644 --- a/ants_test.go +++ b/ants_test.go @@ -7,15 +7,19 @@ import ( "runtime" ) -var n = 10 +var n = 100000 func demoFunc() { - var n int - for i := 0; i < 10000; i++ { - n += i + for i := 0; i < 1000000; i++ { } - fmt.Printf("finish task with result:%d\n", n) } +//func demoFunc() { +// var n int +// for i := 0; i < 10000; i++ { +// n += i +// } +// fmt.Printf("finish task with result:%d\n", n) +//} func TestDefaultPool(t *testing.T) { for i := 0; i < n; i++ { @@ -26,7 +30,7 @@ func TestDefaultPool(t *testing.T) { t.Logf("running workers number:%d", ants.Running()) t.Logf("free workers number:%d", ants.Free()) - ants.Wait() + //ants.Wait() mem := runtime.MemStats{} runtime.ReadMemStats(&mem) diff --git a/pool.go b/pool.go index 28b165e..3f30f76 100644 --- a/pool.go +++ b/pool.go @@ -11,33 +11,27 @@ type sig struct{} type f func() -//type er interface{} - type Pool struct { - capacity int32 - running int32 - //tasks chan er - //workers chan er - tasks *sync.Pool - workers *sync.Pool + capacity int32 + running int32 + tasks *ConcurrentQueue + workers *ConcurrentQueue freeSignal chan sig launchSignal chan sig destroy chan sig m *sync.Mutex - wg *sync.WaitGroup + //wg *sync.WaitGroup } func NewPool(size int) *Pool { p := &Pool{ - capacity: int32(size), - //tasks: make(chan er, size), - //workers: make(chan er, size), - tasks: &sync.Pool{}, - workers: &sync.Pool{}, + capacity: int32(size), + tasks: NewConcurrentQueue(), + workers: NewConcurrentQueue(), freeSignal: make(chan sig, math.MaxInt32), launchSignal: make(chan sig, math.MaxInt32), destroy: make(chan sig, runtime.GOMAXPROCS(-1)), - wg: &sync.WaitGroup{}, + //wg: &sync.WaitGroup{}, } p.loop() return p @@ -51,7 +45,7 @@ func (p *Pool) loop() { for { select { case <-p.launchSignal: - p.getWorker().sendTask(p.tasks.Get().(f)) + p.getWorker().sendTask(p.tasks.pop().(f)) case <-p.destroy: return } @@ -64,10 +58,9 @@ func (p *Pool) Push(task f) error { if len(p.destroy) > 0 { return nil } - //p.tasks <- task - p.tasks.Put(task) + p.tasks.push(task) p.launchSignal <- sig{} - p.wg.Add(1) + //p.wg.Add(1) return nil } func (p *Pool) Running() int { @@ -82,9 +75,9 @@ func (p *Pool) Cap() int { return int(atomic.LoadInt32(&p.capacity)) } -func (p *Pool) Wait() { - p.wg.Wait() -} +//func (p *Pool) Wait() { +// p.wg.Wait() +//} func (p *Pool) Destroy() error { p.m.Lock() @@ -115,42 +108,16 @@ func (p *Pool) newWorker() *Worker { return worker } -//func (p *Pool) newWorker() *Worker { -// worker := &Worker{ -// pool: p, -// task: make(chan f), -// exit: make(chan sig), -// } -// worker.run() -// return worker -//} - -//func (p *Pool) getWorker() *Worker { -// defer atomic.AddInt32(&p.running, 1) -// var worker *Worker -// if p.reachLimit() { -// worker = (<-p.workers).(*Worker) -// } else { -// select { -// case w := <-p.workers: -// return w.(*Worker) -// default: -// worker = p.newWorker() -// } -// } -// return worker -//} - func (p *Pool) getWorker() *Worker { defer atomic.AddInt32(&p.running, 1) - if w := p.workers.Get(); w != nil { + if w := p.workers.pop(); w != nil { return w.(*Worker) } return p.newWorker() } func (p *Pool) PutWorker(worker *Worker) { - p.workers.Put(worker) + p.workers.push(worker) if p.reachLimit() { p.freeSignal <- sig{} } diff --git a/worker.go b/worker.go index f6b0098..42393f4 100644 --- a/worker.go +++ b/worker.go @@ -1,6 +1,10 @@ package ants -import "sync/atomic" +import ( + "sync/atomic" + "container/list" + "sync" +) type Worker struct { pool *Pool @@ -14,9 +18,8 @@ func (w *Worker) run() { select { case f := <-w.task: f() - //w.pool.workers <- w - w.pool.workers.Put(w) - w.pool.wg.Done() + w.pool.workers.push(w) + //w.pool.wg.Done() case <-w.exit: atomic.AddInt32(&w.pool.running, -1) return @@ -32,3 +35,31 @@ func (w *Worker) stop() { func (w *Worker) sendTask(task f) { w.task <- task } + +//-------------------------------------------------------------------------------- + +type ConcurrentQueue struct { + queue *list.List + m sync.Mutex +} + +func NewConcurrentQueue() *ConcurrentQueue { + q := new(ConcurrentQueue) + q.queue = list.New() + return q +} + +func (q *ConcurrentQueue) push(v interface{}) { + defer q.m.Unlock() + q.m.Lock() + q.queue.PushFront(v) +} + +func (q *ConcurrentQueue) pop() interface{} { + defer q.m.Unlock() + q.m.Lock() + if elem := q.queue.Back(); elem != nil{ + return q.queue.Remove(elem) + } + return nil +}