From ab6390f6d004ee5e9fb73f913d5d31449fca9e4c Mon Sep 17 00:00:00 2001 From: andy pan Date: Thu, 24 May 2018 19:27:54 +0800 Subject: [PATCH] optimization --- examples/main.go | 17 ++++++++++++----- pool.go | 22 +++++++++++++--------- pool_func.go | 30 ++++++++++++++++++------------ worker_func.go | 2 +- 4 files changed, 44 insertions(+), 27 deletions(-) diff --git a/examples/main.go b/examples/main.go index 719c6b5..b1e6602 100644 --- a/examples/main.go +++ b/examples/main.go @@ -25,15 +25,17 @@ package main import ( "fmt" "sync" + "sync/atomic" "github.com/panjf2000/ants" ) -var str = "Hello World!" +var sum int32 func myFunc(i interface{}) error { - s := i.(string) - fmt.Println(s) + n := i.(int) + atomic.AddInt32(&sum, int32(n)) + fmt.Printf("run with %d\n", n) return nil } @@ -66,9 +68,14 @@ func main() { // submit for i := 0; i < runTimes; i++ { wg.Add(1) - p.Serve(str) + p.Serve(i) } wg.Wait() + //var m int + //var i int + //for n := range sum { + // m += n + //} fmt.Printf("running goroutines: %d\n", p.Running()) - fmt.Println("finish all tasks!") + fmt.Printf("finish all tasks, result is %d\n", sum) } diff --git a/pool.go b/pool.go index 25c347e..5065e3a 100644 --- a/pool.go +++ b/pool.go @@ -151,17 +151,21 @@ func (p *Pool) getWorker() *Worker { break } } else if w == nil { - wp := p.workerPool.Get() - if wp == nil { - w = &Worker{ - pool: p, - task: make(chan f, workerArgsCap), - } - } else { - w = wp.(*Worker) + //wp := p.workerPool.Get() + //if wp == nil { + // w = &Worker{ + // pool: p, + // task: make(chan f, workerArgsCap), + // } + //} else { + // w = wp.(*Worker) + //} + w = &Worker{ + pool: p, + task: make(chan f, workerArgsCap), } w.run() - p.workerPool.Put(w) + //p.workerPool.Put(w) } return w } diff --git a/pool_func.go b/pool_func.go index 6ea84fb..a8cb357 100644 --- a/pool_func.go +++ b/pool_func.go @@ -47,7 +47,7 @@ type PoolWithFunc struct { workers []*WorkerWithFunc // workerPool is a pool that saves a set of temporary objects. - workerPool sync.Pool + //workerPool sync.Pool // release is used to notice the pool to closed itself. release chan sig @@ -57,7 +57,6 @@ type PoolWithFunc struct { poolFunc pf once sync.Once - } // NewPoolWithFunc generates a instance of ants pool with a specific function. @@ -108,7 +107,7 @@ func (p *PoolWithFunc) Cap() int { // Release Closed this pool func (p *PoolWithFunc) Release() error { p.once.Do(func() { - p.release<- sig{} + p.release <- sig{} }) return nil } @@ -131,6 +130,8 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { if n < 0 { if p.running >= p.capacity { waiting = true + } else { + p.running++ } } else { w = workers[n] @@ -156,23 +157,28 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { break } } else if w == nil { - wp := p.workerPool.Get() - if wp == nil { - w = &WorkerWithFunc{ - pool: p, - args: make(chan interface{}, workerArgsCap), - } - } else { - w = wp.(*WorkerWithFunc) + //wp := p.workerPool.Get() + //if wp == nil { + // w = &WorkerWithFunc{ + // pool: p, + // args: make(chan interface{}, workerArgsCap), + // } + //} else { + // w = wp.(*WorkerWithFunc) + //} + w = &WorkerWithFunc{ + pool: p, + args: make(chan interface{}), } w.run() - p.workerPool.Put(w) + //p.workerPool.Put(w) } return w } // putWorker puts a worker back into free pool, recycling the goroutines. func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { + //p.workerPool.Put(worker) p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() diff --git a/worker_func.go b/worker_func.go index 9580b79..4ef969e 100644 --- a/worker_func.go +++ b/worker_func.go @@ -40,7 +40,7 @@ type WorkerWithFunc struct { // run will start a goroutine to repeat the process // that perform the function calls. func (w *WorkerWithFunc) run() { - atomic.AddInt32(&w.pool.running, 1) + //atomic.AddInt32(&w.pool.running, 1) go func() { for args := range w.args { if args == nil || len(w.pool.release) > 0 {