From 63ebcb38ddc4d1480e38cdde694d3c291f22c26f Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 22 May 2018 11:17:19 +0800 Subject: [PATCH 1/9] update go test and benchmarks test --- ants.go | 7 +++-- ants_benchmark_test.go | 4 +-- ants_test.go | 67 +++++++++++++++++++++--------------------- 3 files changed, 41 insertions(+), 37 deletions(-) diff --git a/ants.go b/ants.go index af76b67..2f27f3b 100644 --- a/ants.go +++ b/ants.go @@ -22,11 +22,14 @@ package ants -import "errors" +import ( + "errors" + "math" +) const ( // DefaultPoolSize is the default capacity for a default goroutine pool - DefaultPoolSize = 50000 + DefaultPoolSize = math.MaxInt32 // DefaultCleanIntervalTime is the interval time to clean up goroutines DefaultCleanIntervalTime = 30 diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 0358dfc..188a926 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -36,7 +36,7 @@ func BenchmarkGoroutine(b *testing.B) { for j := 0; j < RunTimes; j++ { wg.Add(1) go func() { - forSleep() + demoFunc() wg.Done() }() } @@ -50,7 +50,7 @@ func BenchmarkPoolGoroutine(b *testing.B) { for j := 0; j < RunTimes; j++ { wg.Add(1) ants.Push(func() { - forSleep() + demoFunc() wg.Done() }) } diff --git a/ants_test.go b/ants_test.go index d1020ab..ff05806 100644 --- a/ants_test.go +++ b/ants_test.go @@ -27,10 +27,9 @@ import ( "runtime" "sync" "testing" - "time" ) -var n = 1000000 +var n = 10000000 //func demoFunc() { // var n int @@ -47,25 +46,12 @@ var n = 1000000 // fmt.Printf("finish task with result:%d\n", n) //} -func forSleep() { - time.Sleep(time.Millisecond) -} - -func TestNoPool(t *testing.T) { - var wg sync.WaitGroup - for i := 0; i < n; i++ { - wg.Add(1) - go func() { - forSleep() - //demoFunc() - wg.Done() - }() +func demoFunc() { + //time.Sleep(time.Millisecond) + var n int + for i := 0; i < 1000000; i++ { + n += i } - - wg.Wait() - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - t.Logf("memory usage:%d", mem.TotalAlloc/1024) } func TestDefaultPool(t *testing.T) { @@ -73,8 +59,7 @@ func TestDefaultPool(t *testing.T) { for i := 0; i < n; i++ { wg.Add(1) ants.Push(func() { - forSleep() - //demoFunc() + demoFunc() wg.Done() }) } @@ -89,24 +74,40 @@ func TestDefaultPool(t *testing.T) { t.Logf("memory usage:%d", mem.TotalAlloc/1024) } -func TestCustomPool(t *testing.T) { - p, _ := ants.NewPool(30000) +func TestNoPool(t *testing.T) { var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) - p.Push(func() { - forSleep() - //demoFunc() + go func() { + demoFunc() wg.Done() - }) + }() } + wg.Wait() - - //t.Logf("pool capacity:%d", p.Cap()) - //t.Logf("free workers number:%d", p.Free()) - - t.Logf("running workers number:%d", p.Running()) mem := runtime.MemStats{} runtime.ReadMemStats(&mem) t.Logf("memory usage:%d", mem.TotalAlloc/1024) } + +//func TestCustomPool(t *testing.T) { +// p, _ := ants.NewPool(30000) +// var wg sync.WaitGroup +// for i := 0; i < n; i++ { +// wg.Add(1) +// p.Push(func() { +// demoFunc() +// //demoFunc() +// wg.Done() +// }) +// } +// wg.Wait() +// +// //t.Logf("pool capacity:%d", p.Cap()) +// //t.Logf("free workers number:%d", p.Free()) +// +// t.Logf("running workers number:%d", p.Running()) +// mem := runtime.MemStats{} +// runtime.ReadMemStats(&mem) +// t.Logf("memory usage:%d", mem.TotalAlloc/1024) +//} From fad443d7d8db7637212d76aeae8b4b429759cab8 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 22 May 2018 11:26:16 +0800 Subject: [PATCH 2/9] add a new type of pool, allowing to create a pool with a function --- pool_func.go | 199 +++++++++++++++++++++++++++++++++++++++++++++++++ worker_func.go | 63 ++++++++++++++++ 2 files changed, 262 insertions(+) create mode 100644 pool_func.go create mode 100644 worker_func.go diff --git a/pool_func.go b/pool_func.go new file mode 100644 index 0000000..83b1cd4 --- /dev/null +++ b/pool_func.go @@ -0,0 +1,199 @@ +// MIT License + +// Copyright (c) 2018 Andy Pan + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package ants + +import ( + "math" + "sync" + "sync/atomic" + "time" +) + +type pf func(interface{}) error + +// PoolWithFunc accept the tasks from client,it will limit the total +// of goroutines to a given number by recycling goroutines. +type PoolWithFunc struct { + // capacity of the pool. + capacity int32 + + // running is the number of the currently running goroutines. + running int32 + + // signal is used to notice pool there are available + // workers which can be sent to work. + freeSignal chan sig + + // workers is a slice that store the available workers. + workers []*WorkerWithFunc + + // workerPool is a pool that saves a set of temporary objects. + workerPool sync.Pool + + // release is used to notice the pool to closed itself. + release chan sig + + lock sync.Mutex + + // closed is used to confirm whether this pool has been closed. + closed int32 + + poolFunc pf +} + +// NewPoolWithFunc generates a instance of ants pool with a specific function. +func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { + if size <= 0 { + return nil, ErrPoolSizeInvalid + } + p := &PoolWithFunc{ + capacity: int32(size), + freeSignal: make(chan sig, math.MaxInt32), + release: make(chan sig), + closed: 0, + poolFunc: f, + } + + return p, nil +} + +//------------------------------------------------------------------------- + +// scanAndClean is a goroutine who will periodically clean up +// after it is noticed that this pool is closed. +func (p *PoolWithFunc) scanAndCleanWithFunc() { + ticker := time.NewTicker(DefaultCleanIntervalTime * time.Second) + go func() { + ticker.Stop() + for range ticker.C { + if atomic.LoadInt32(&p.closed) == 1 { + p.lock.Lock() + for _, w := range p.workers { + w.stopWithFunc() + } + p.lock.Unlock() + } + } + }() +} + +// Push submit a task to pool +func (p *PoolWithFunc) Serve(args interface{}) error { + if atomic.LoadInt32(&p.closed) == 1 { + return ErrPoolClosed + } + w := p.getWorkerWithFunc() + w.sendTaskWithFunc(args) + return nil +} + +// Running returns the number of the currently running goroutines +func (p *PoolWithFunc) RunningWithFunc() int { + return int(atomic.LoadInt32(&p.running)) +} + +// Free returns the available goroutines to work +func (p *PoolWithFunc) FreeWithFunc() int { + return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) +} + +// Cap returns the capacity of this pool +func (p *PoolWithFunc) CapWithFunc() int { + return int(atomic.LoadInt32(&p.capacity)) +} + +// Release Closed this pool +func (p *PoolWithFunc) ReleaseWithFunc() error { + p.lock.Lock() + atomic.StoreInt32(&p.closed, 1) + close(p.release) + p.lock.Unlock() + return nil +} + +// ReSize change the capacity of this pool +func (p *PoolWithFunc) ReSizeWithFunc(size int) { + atomic.StoreInt32(&p.capacity, int32(size)) +} + +//------------------------------------------------------------------------- + +// getWorker returns a available worker to run the tasks. +func (p *PoolWithFunc) getWorkerWithFunc() *WorkerWithFunc { + var w *WorkerWithFunc + waiting := false + + p.lock.Lock() + workers := p.workers + n := len(workers) - 1 + if n < 0 { + if p.running >= p.capacity { + waiting = true + } + } else { + w = workers[n] + workers[n] = nil + p.workers = workers[:n] + } + p.lock.Unlock() + + if waiting { + <-p.freeSignal + for { + p.lock.Lock() + workers = p.workers + l := len(workers) - 1 + if l < 0 { + p.lock.Unlock() + continue + } + w = workers[l] + workers[l] = nil + p.workers = workers[:l] + p.lock.Unlock() + break + } + } else { + wp := p.workerPool.Get() + if wp == nil { + w = &WorkerWithFunc{ + pool: p, + args: make(chan interface{}), + } + w.runWithFunc() + atomic.AddInt32(&p.running, 1) + } else { + w = wp.(*WorkerWithFunc) + } + } + return w +} + +// putWorker puts a worker back into free pool, recycling the goroutines. +func (p *PoolWithFunc) putWorkerWithFunc(worker *WorkerWithFunc) { + p.workerPool.Put(worker) + p.lock.Lock() + p.workers = append(p.workers, worker) + p.lock.Unlock() + p.freeSignal <- sig{} +} diff --git a/worker_func.go b/worker_func.go new file mode 100644 index 0000000..5deab61 --- /dev/null +++ b/worker_func.go @@ -0,0 +1,63 @@ +// MIT License + +// Copyright (c) 2018 Andy Pan + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package ants + +import ( + "sync/atomic" +) + +// Worker is the actual executor who run the tasks, +// it will start a goroutine that accept tasks and +// perform function calls. +type WorkerWithFunc struct { + // pool who owns this worker. + pool *PoolWithFunc + + // args is a job should be done. + args chan interface{} +} + +// run will start a goroutine to repeat the process +// that perform the function calls. +func (w *WorkerWithFunc) runWithFunc() { + go func() { + for args := range w.args { + if args == nil { + atomic.AddInt32(&w.pool.running, -1) + return + } + w.pool.poolFunc(args) + w.pool.putWorkerWithFunc(w) + } + }() +} + +// stop this worker. +func (w *WorkerWithFunc) stopWithFunc() { + w.args <- nil +} + +// sendTask send a task to this worker. +func (w *WorkerWithFunc) sendTaskWithFunc(args interface{}) { + w.args <- args +} From 967b0b04e299a478751eec2752e3ddf19e0f58ac Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 22 May 2018 12:01:00 +0800 Subject: [PATCH 3/9] optimization --- ants_benchmark_test.go | 35 +++++++++++++++++++++------ ants_test.go | 54 +++++++++++++++++++++++++++++++++++------- pool_func.go | 24 +++++++++---------- worker_func.go | 8 +++---- 4 files changed, 90 insertions(+), 31 deletions(-) diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 188a926..e22a4a1 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -28,7 +28,16 @@ import ( "testing" ) -const RunTimes = 10000000 +const RunTimes = 1000000 + +func demoPoolFunc(args interface{}) error { + m := args.(int) + var n int + for i := 0; i < m; i++ { + n += i + } + return nil +} func BenchmarkGoroutine(b *testing.B) { for i := 0; i < b.N; i++ { @@ -36,7 +45,7 @@ func BenchmarkGoroutine(b *testing.B) { for j := 0; j < RunTimes; j++ { wg.Add(1) go func() { - demoFunc() + demoPoolFunc(RunTimes) wg.Done() }() } @@ -44,15 +53,27 @@ func BenchmarkGoroutine(b *testing.B) { } } -func BenchmarkPoolGoroutine(b *testing.B) { +//func BenchmarkAntsPool(b *testing.B) { +// for i := 0; i < b.N; i++ { +// var wg sync.WaitGroup +// for j := 0; j < RunTimes; j++ { +// wg.Add(1) +// ants.Push(func() { +// demoFunc() +// wg.Done() +// }) +// } +// wg.Wait() +// } +//} + +func BenchmarkAntsPoolWithFunc(b *testing.B) { + p, _ := ants.NewPoolWithFunc(100000, demoPoolFunc) for i := 0; i < b.N; i++ { var wg sync.WaitGroup for j := 0; j < RunTimes; j++ { wg.Add(1) - ants.Push(func() { - demoFunc() - wg.Done() - }) + p.Serve(RunTimes) } wg.Wait() } diff --git a/ants_test.go b/ants_test.go index ff05806..be18314 100644 --- a/ants_test.go +++ b/ants_test.go @@ -29,7 +29,7 @@ import ( "testing" ) -var n = 10000000 +var n = 1000000 //func demoFunc() { // var n int @@ -54,21 +54,59 @@ func demoFunc() { } } -func TestDefaultPool(t *testing.T) { +//func TestDefaultPool(t *testing.T) { +// var wg sync.WaitGroup +// for i := 0; i < n; i++ { +// wg.Add(1) +// ants.Push(func() { +// demoFunc() +// wg.Done() +// }) +// } +// wg.Wait() +// +// //t.Logf("pool capacity:%d", ants.Cap()) +// //t.Logf("free workers number:%d", ants.Free()) +// +// t.Logf("running workers number:%d", ants.Running()) +// mem := runtime.MemStats{} +// runtime.ReadMemStats(&mem) +// t.Logf("memory usage:%d", mem.TotalAlloc/1024) +//} + +//func TestNoPool(t *testing.T) { +// var wg sync.WaitGroup +// for i := 0; i < n; i++ { +// wg.Add(1) +// go func() { +// demoFunc() +// wg.Done() +// }() +// } +// +// wg.Wait() +// mem := runtime.MemStats{} +// runtime.ReadMemStats(&mem) +// t.Logf("memory usage:%d", mem.TotalAlloc/1024) +//} + +func TestAntsPoolWithFunc(t *testing.T) { var wg sync.WaitGroup + p, _ := ants.NewPoolWithFunc(100000, func(i interface{}) error { + demoPoolFunc(i) + wg.Done() + return nil + }) for i := 0; i < n; i++ { wg.Add(1) - ants.Push(func() { - demoFunc() - wg.Done() - }) + p.Serve(n) } wg.Wait() //t.Logf("pool capacity:%d", ants.Cap()) //t.Logf("free workers number:%d", ants.Free()) - t.Logf("running workers number:%d", ants.Running()) + t.Logf("running workers number:%d", p.Running()) mem := runtime.MemStats{} runtime.ReadMemStats(&mem) t.Logf("memory usage:%d", mem.TotalAlloc/1024) @@ -79,7 +117,7 @@ func TestNoPool(t *testing.T) { for i := 0; i < n; i++ { wg.Add(1) go func() { - demoFunc() + demoPoolFunc(n) wg.Done() }() } diff --git a/pool_func.go b/pool_func.go index 83b1cd4..c74eec8 100644 --- a/pool_func.go +++ b/pool_func.go @@ -81,7 +81,7 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { // scanAndClean is a goroutine who will periodically clean up // after it is noticed that this pool is closed. -func (p *PoolWithFunc) scanAndCleanWithFunc() { +func (p *PoolWithFunc) scanAndClean() { ticker := time.NewTicker(DefaultCleanIntervalTime * time.Second) go func() { ticker.Stop() @@ -89,7 +89,7 @@ func (p *PoolWithFunc) scanAndCleanWithFunc() { if atomic.LoadInt32(&p.closed) == 1 { p.lock.Lock() for _, w := range p.workers { - w.stopWithFunc() + w.stop() } p.lock.Unlock() } @@ -102,28 +102,28 @@ func (p *PoolWithFunc) Serve(args interface{}) error { if atomic.LoadInt32(&p.closed) == 1 { return ErrPoolClosed } - w := p.getWorkerWithFunc() - w.sendTaskWithFunc(args) + w := p.getWorker() + w.sendTask(args) return nil } // Running returns the number of the currently running goroutines -func (p *PoolWithFunc) RunningWithFunc() int { +func (p *PoolWithFunc) Running() int { return int(atomic.LoadInt32(&p.running)) } // Free returns the available goroutines to work -func (p *PoolWithFunc) FreeWithFunc() int { +func (p *PoolWithFunc) Free() int { return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) } // Cap returns the capacity of this pool -func (p *PoolWithFunc) CapWithFunc() int { +func (p *PoolWithFunc) Cap() int { return int(atomic.LoadInt32(&p.capacity)) } // Release Closed this pool -func (p *PoolWithFunc) ReleaseWithFunc() error { +func (p *PoolWithFunc) Release() error { p.lock.Lock() atomic.StoreInt32(&p.closed, 1) close(p.release) @@ -132,14 +132,14 @@ func (p *PoolWithFunc) ReleaseWithFunc() error { } // ReSize change the capacity of this pool -func (p *PoolWithFunc) ReSizeWithFunc(size int) { +func (p *PoolWithFunc) ReSize(size int) { atomic.StoreInt32(&p.capacity, int32(size)) } //------------------------------------------------------------------------- // getWorker returns a available worker to run the tasks. -func (p *PoolWithFunc) getWorkerWithFunc() *WorkerWithFunc { +func (p *PoolWithFunc) getWorker() *WorkerWithFunc { var w *WorkerWithFunc waiting := false @@ -180,7 +180,7 @@ func (p *PoolWithFunc) getWorkerWithFunc() *WorkerWithFunc { pool: p, args: make(chan interface{}), } - w.runWithFunc() + w.run() atomic.AddInt32(&p.running, 1) } else { w = wp.(*WorkerWithFunc) @@ -190,7 +190,7 @@ func (p *PoolWithFunc) getWorkerWithFunc() *WorkerWithFunc { } // putWorker puts a worker back into free pool, recycling the goroutines. -func (p *PoolWithFunc) putWorkerWithFunc(worker *WorkerWithFunc) { +func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { p.workerPool.Put(worker) p.lock.Lock() p.workers = append(p.workers, worker) diff --git a/worker_func.go b/worker_func.go index 5deab61..47bccdb 100644 --- a/worker_func.go +++ b/worker_func.go @@ -39,7 +39,7 @@ type WorkerWithFunc struct { // run will start a goroutine to repeat the process // that perform the function calls. -func (w *WorkerWithFunc) runWithFunc() { +func (w *WorkerWithFunc) run() { go func() { for args := range w.args { if args == nil { @@ -47,17 +47,17 @@ func (w *WorkerWithFunc) runWithFunc() { return } w.pool.poolFunc(args) - w.pool.putWorkerWithFunc(w) + w.pool.putWorker(w) } }() } // stop this worker. -func (w *WorkerWithFunc) stopWithFunc() { +func (w *WorkerWithFunc) stop() { w.args <- nil } // sendTask send a task to this worker. -func (w *WorkerWithFunc) sendTaskWithFunc(args interface{}) { +func (w *WorkerWithFunc) sendTask(args interface{}) { w.args <- args } From f4a3fc5efd7088c049076a0a1d33deaac59500fc Mon Sep 17 00:00:00 2001 From: andy pan Date: Tue, 22 May 2018 16:10:29 +0800 Subject: [PATCH 4/9] add one more check when getting worker --- pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pool.go b/pool.go index 704571c..29de436 100644 --- a/pool.go +++ b/pool.go @@ -172,7 +172,7 @@ func (p *Pool) getWorker() *Worker { p.lock.Unlock() break } - } else { + } else if w == nil { wp := p.workerPool.Get() if wp == nil { w = &Worker{ From b2ab15fa961ee59c2757b8a097560befcf9d18f6 Mon Sep 17 00:00:00 2001 From: andy pan Date: Tue, 22 May 2018 16:20:01 +0800 Subject: [PATCH 5/9] update --- pool_func.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pool_func.go b/pool_func.go index c74eec8..7fcc9b6 100644 --- a/pool_func.go +++ b/pool_func.go @@ -173,7 +173,7 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { p.lock.Unlock() break } - } else { + } else if w == nil { wp := p.workerPool.Get() if wp == nil { w = &WorkerWithFunc{ From d606fb79f669b69cea99d1536814af02cbed45af Mon Sep 17 00:00:00 2001 From: andy pan Date: Tue, 22 May 2018 18:00:15 +0800 Subject: [PATCH 6/9] benchmarks updated --- ants_benchmark_test.go | 40 +++++++++++++++++++++++----------------- ants_test.go | 7 ++++--- pool_func.go | 2 +- 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index e22a4a1..2f44ba1 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -23,9 +23,10 @@ package ants_test import ( - "github.com/panjf2000/ants" "sync" "testing" + + "github.com/panjf2000/ants" ) const RunTimes = 1000000 @@ -53,24 +54,29 @@ func BenchmarkGoroutine(b *testing.B) { } } -//func BenchmarkAntsPool(b *testing.B) { -// for i := 0; i < b.N; i++ { -// var wg sync.WaitGroup -// for j := 0; j < RunTimes; j++ { -// wg.Add(1) -// ants.Push(func() { -// demoFunc() -// wg.Done() -// }) -// } -// wg.Wait() -// } -//} - -func BenchmarkAntsPoolWithFunc(b *testing.B) { - p, _ := ants.NewPoolWithFunc(100000, demoPoolFunc) +func BenchmarkAntsPool(b *testing.B) { for i := 0; i < b.N; i++ { var wg sync.WaitGroup + for j := 0; j < RunTimes; j++ { + wg.Add(1) + ants.Push(func() { + demoFunc() + wg.Done() + }) + } + wg.Wait() + } +} + +func BenchmarkAntsPoolWithFunc(b *testing.B) { + for i := 0; i < b.N; i++ { + var wg sync.WaitGroup + p, _ := ants.NewPoolWithFunc(100000, func(i interface{}) error { + demoPoolFunc(i) + wg.Done() + return nil + }) + b.ResetTimer() for j := 0; j < RunTimes; j++ { wg.Add(1) p.Serve(RunTimes) diff --git a/ants_test.go b/ants_test.go index be18314..79a3a57 100644 --- a/ants_test.go +++ b/ants_test.go @@ -23,13 +23,14 @@ package ants_test import ( - "github.com/panjf2000/ants" "runtime" "sync" "testing" + + "github.com/panjf2000/ants" ) -var n = 1000000 +var n = 100000 //func demoFunc() { // var n int @@ -92,7 +93,7 @@ func demoFunc() { func TestAntsPoolWithFunc(t *testing.T) { var wg sync.WaitGroup - p, _ := ants.NewPoolWithFunc(100000, func(i interface{}) error { + p, _ := ants.NewPoolWithFunc(50000, func(i interface{}) error { demoPoolFunc(i) wg.Done() return nil diff --git a/pool_func.go b/pool_func.go index 7fcc9b6..81af5a2 100644 --- a/pool_func.go +++ b/pool_func.go @@ -97,7 +97,7 @@ func (p *PoolWithFunc) scanAndClean() { }() } -// Push submit a task to pool +// Serve submit a task to pool func (p *PoolWithFunc) Serve(args interface{}) error { if atomic.LoadInt32(&p.closed) == 1 { return ErrPoolClosed From ffe56cc35d61003bb689bb7f125fc04b1a3839d1 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 22 May 2018 22:37:55 +0800 Subject: [PATCH 7/9] Auto stash before merge of "develop" and "origin/develop" --- ants_benchmark_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 2f44ba1..37234fc 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -41,8 +41,10 @@ func demoPoolFunc(args interface{}) error { } func BenchmarkGoroutine(b *testing.B) { + b.N = 1 for i := 0; i < b.N; i++ { var wg sync.WaitGroup + b.ResetTimer() for j := 0; j < RunTimes; j++ { wg.Add(1) go func() { From 004caa6636017f14c7ea19f24d1cce2b4b10df24 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Wed, 23 May 2018 00:01:10 +0800 Subject: [PATCH 8/9] update benchmark --- ants_benchmark_test.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 37234fc..6485b61 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -30,6 +30,7 @@ import ( ) const RunTimes = 1000000 +const loop = 1000000 func demoPoolFunc(args interface{}) error { m := args.(int) @@ -41,14 +42,13 @@ func demoPoolFunc(args interface{}) error { } func BenchmarkGoroutine(b *testing.B) { - b.N = 1 for i := 0; i < b.N; i++ { var wg sync.WaitGroup b.ResetTimer() for j := 0; j < RunTimes; j++ { wg.Add(1) go func() { - demoPoolFunc(RunTimes) + demoPoolFunc(loop) wg.Done() }() } @@ -56,19 +56,19 @@ func BenchmarkGoroutine(b *testing.B) { } } -func BenchmarkAntsPool(b *testing.B) { - for i := 0; i < b.N; i++ { - var wg sync.WaitGroup - for j := 0; j < RunTimes; j++ { - wg.Add(1) - ants.Push(func() { - demoFunc() - wg.Done() - }) - } - wg.Wait() - } -} +//func BenchmarkAntsPool(b *testing.B) { +// for i := 0; i < b.N; i++ { +// var wg sync.WaitGroup +// for j := 0; j < RunTimes; j++ { +// wg.Add(1) +// ants.Push(func() { +// demoFunc() +// wg.Done() +// }) +// } +// wg.Wait() +// } +//} func BenchmarkAntsPoolWithFunc(b *testing.B) { for i := 0; i < b.N; i++ { @@ -81,7 +81,7 @@ func BenchmarkAntsPoolWithFunc(b *testing.B) { b.ResetTimer() for j := 0; j < RunTimes; j++ { wg.Add(1) - p.Serve(RunTimes) + p.Serve(loop) } wg.Wait() } From 0453f88168f1f2cac8e1f0938a2f14927286b7d7 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Wed, 23 May 2018 00:46:43 +0800 Subject: [PATCH 9/9] update --- ants.go | 8 ++++++++ pool.go | 7 +++---- pool_func.go | 7 +++---- worker.go | 1 + worker_func.go | 1 + 5 files changed, 16 insertions(+), 8 deletions(-) diff --git a/ants.go b/ants.go index 2f27f3b..c306d41 100644 --- a/ants.go +++ b/ants.go @@ -25,6 +25,7 @@ package ants import ( "errors" "math" + "runtime" ) const ( @@ -68,3 +69,10 @@ var ( ErrPoolSizeInvalid = errors.New("invalid size for pool") ErrPoolClosed = errors.New("this pool has been closed") ) + +var workerArgsCap = func() int { + if runtime.GOMAXPROCS(0) == 1 { + return 0 + } + return 1 +}() diff --git a/pool.go b/pool.go index 29de436..58b2dd2 100644 --- a/pool.go +++ b/pool.go @@ -177,20 +177,19 @@ func (p *Pool) getWorker() *Worker { if wp == nil { w = &Worker{ pool: p, - task: make(chan f), + task: make(chan f, workerArgsCap), } - w.run() - atomic.AddInt32(&p.running, 1) } else { w = wp.(*Worker) } + w.run() + p.workerPool.Put(w) } return w } // putWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) putWorker(worker *Worker) { - p.workerPool.Put(worker) p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() diff --git a/pool_func.go b/pool_func.go index 81af5a2..7223bfa 100644 --- a/pool_func.go +++ b/pool_func.go @@ -178,20 +178,19 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { if wp == nil { w = &WorkerWithFunc{ pool: p, - args: make(chan interface{}), + args: make(chan interface{}, workerArgsCap), } - w.run() - atomic.AddInt32(&p.running, 1) } else { w = wp.(*WorkerWithFunc) } + w.run() + p.workerPool.Put(w) } return w } // putWorker puts a worker back into free pool, recycling the goroutines. func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { - p.workerPool.Put(worker) p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() diff --git a/worker.go b/worker.go index d46ffa4..601836c 100644 --- a/worker.go +++ b/worker.go @@ -40,6 +40,7 @@ type Worker struct { // run will start a goroutine to repeat the process // that perform the function calls. func (w *Worker) run() { + atomic.AddInt32(&w.pool.running, 1) go func() { for f := range w.task { if f == nil { diff --git a/worker_func.go b/worker_func.go index 47bccdb..87dd6a4 100644 --- a/worker_func.go +++ b/worker_func.go @@ -40,6 +40,7 @@ type WorkerWithFunc struct { // run will start a goroutine to repeat the process // that perform the function calls. func (w *WorkerWithFunc) run() { + atomic.AddInt32(&w.pool.running, 1) go func() { for args := range w.args { if args == nil {