From ea5025d8ab80d06f245c77f0f80d74ed1d75a826 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Wed, 23 May 2018 23:55:42 +0800 Subject: [PATCH 1/9] update --- ants_test.go | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/ants_test.go b/ants_test.go index 74c95e9..1092e21 100644 --- a/ants_test.go +++ b/ants_test.go @@ -73,7 +73,7 @@ func TestDefaultPool(t *testing.T) { t.Logf("running workers number:%d", ants.Running()) mem := runtime.MemStats{} runtime.ReadMemStats(&mem) - t.Logf("memory usage:%d", mem.TotalAlloc/MiB) + t.Logf("memory usage:%d MB", mem.TotalAlloc/MiB) } func TestNoPool(t *testing.T) { @@ -89,30 +89,30 @@ func TestNoPool(t *testing.T) { wg.Wait() mem := runtime.MemStats{} runtime.ReadMemStats(&mem) - t.Logf("memory usage:%d", mem.TotalAlloc/MiB) + t.Logf("memory usage:%d MB", mem.TotalAlloc/MiB) } -func TestAntsPoolWithFunc(t *testing.T) { - var wg sync.WaitGroup - p, _ := ants.NewPoolWithFunc(50000, func(i interface{}) error { - demoPoolFunc(i) - wg.Done() - return nil - }) - for i := 0; i < n; i++ { - wg.Add(1) - p.Serve(n) - } - wg.Wait() +//func TestAntsPoolWithFunc(t *testing.T) { +// var wg sync.WaitGroup +// p, _ := ants.NewPoolWithFunc(50000, func(i interface{}) error { +// demoPoolFunc(i) +// wg.Done() +// return nil +// }) +// for i := 0; i < n; i++ { +// wg.Add(1) +// p.Serve(n) +// } +// wg.Wait() //t.Logf("pool capacity:%d", ants.Cap()) //t.Logf("free workers number:%d", ants.Free()) - t.Logf("running workers number:%d", p.Running()) - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - t.Logf("memory usage:%d", mem.TotalAlloc/GiB) -} +// t.Logf("running workers number:%d", p.Running()) +// mem := runtime.MemStats{} +// runtime.ReadMemStats(&mem) +// t.Logf("memory usage:%d", mem.TotalAlloc/GiB) +//} // func TestNoPool(t *testing.T) { // var wg sync.WaitGroup From 08a652b1dcb8771f9fbc1b6ae0cf54099e2d19a2 Mon Sep 17 00:00:00 2001 From: andy pan Date: Thu, 24 May 2018 13:35:57 +0800 Subject: [PATCH 2/9] go test optimization --- ants_benchmark_test.go | 100 +++++++++++++++++------------------------ ants_test.go | 66 +++++++++------------------ examples/main.go | 2 +- 3 files changed, 64 insertions(+), 104 deletions(-) diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index f04d158..698c5d4 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -23,7 +23,6 @@ package ants_test import ( - "runtime" "sync" "testing" "time" @@ -33,68 +32,37 @@ import ( const ( _ = 1 << (10 * iota) - KiB // 1024 - MiB // 1048576 - GiB // 1073741824 - TiB // 1099511627776 (超过了int32的范围) - PiB // 1125899906842624 - EiB // 1152921504606846976 - ZiB // 1180591620717411303424 (超过了int64的范围) - YiB // 1208925819614629174706176 + KiB // 1024 + MiB // 1048576 + GiB // 1073741824 + TiB // 1099511627776 (超过了int32的范围) + PiB // 1125899906842624 + EiB // 1152921504606846976 + ZiB // 1180591620717411303424 (超过了int64的范围) + YiB // 1208925819614629174706176 ) const RunTimes = 10000000 -const loop = 5 +const loop = 10 + +func demoFunc() { + time.Sleep(loop * time.Millisecond) +} func demoPoolFunc(args interface{}) error { - // m := args.(int) - // var n int - // for i := 0; i < m; i++ { - // n += i - // } - // return nil + //m := args.(int) + //var n int + //for i := 0; i < m; i++ { + // n += i + //} + //return nil n := args.(int) time.Sleep(time.Duration(n) * time.Millisecond) return nil } -func BenchmarkGoroutine(b *testing.B) { - for i := 0; i < b.N; i++ { - var wg sync.WaitGroup - for j := 0; j < RunTimes; j++ { - wg.Add(1) - go func() { - demoFunc() - wg.Done() - }() - } - wg.Wait() - } - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) -} - -func BenchmarkAntsPool(b *testing.B) { - for i := 0; i < b.N; i++ { - var wg sync.WaitGroup - for j := 0; j < RunTimes; j++ { - wg.Add(1) - ants.Push(func() { - demoFunc() - wg.Done() - }) - } - wg.Wait() - } - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) -} - func BenchmarkGoroutineWithFunc(b *testing.B) { for i := 0; i < b.N; i++ { var wg sync.WaitGroup - b.ResetTimer() for j := 0; j < RunTimes; j++ { wg.Add(1) go func() { @@ -104,9 +72,7 @@ func BenchmarkGoroutineWithFunc(b *testing.B) { } wg.Wait() } - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) + //b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) } func BenchmarkAntsPoolWithFunc(b *testing.B) { @@ -117,14 +83,32 @@ func BenchmarkAntsPoolWithFunc(b *testing.B) { wg.Done() return nil }) - b.ResetTimer() for j := 0; j < RunTimes; j++ { wg.Add(1) p.Serve(loop) } wg.Wait() + //b.Logf("running goroutines: %d", p.Running()) } - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) + //b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) +} + +func BenchmarkGoroutine(b *testing.B) { + for i := 0; i < b.N; i++ { + for j := 0; j < RunTimes; j++ { + go demoFunc() + } + } + + //b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) +} + +func BenchmarkAntsPool(b *testing.B) { + for i := 0; i < b.N; i++ { + for j := 0; j < RunTimes; j++ { + ants.Push(demoFunc) + } + b.Logf("running goroutines: %d", ants.Running()) + } + //b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) } diff --git a/ants_test.go b/ants_test.go index 74c95e9..451b5b3 100644 --- a/ants_test.go +++ b/ants_test.go @@ -24,7 +24,6 @@ package ants_test import ( "runtime" - "time" "sync" "testing" @@ -33,29 +32,6 @@ import ( var n = 10000000 -//func demoFunc() { -// var n int -// for i := 0; i < 1000000; i++ { -// n += i -// } -//} - -//func demoFunc() { -// var n int -// for i := 0; i < 10000; i++ { -// n += i -// } -// fmt.Printf("finish task with result:%d\n", n) -//} - -func demoFunc() { - time.Sleep(10 * time.Millisecond) - // var n int - // for i := 0; i < 1000000; i++ { - // n += i - // } -} - func TestDefaultPool(t *testing.T) { var wg sync.WaitGroup for i := 0; i < n; i++ { @@ -73,7 +49,7 @@ func TestDefaultPool(t *testing.T) { t.Logf("running workers number:%d", ants.Running()) mem := runtime.MemStats{} runtime.ReadMemStats(&mem) - t.Logf("memory usage:%d", mem.TotalAlloc/MiB) + t.Logf("memory usage:%d MB", mem.TotalAlloc/MiB) } func TestNoPool(t *testing.T) { @@ -89,30 +65,30 @@ func TestNoPool(t *testing.T) { wg.Wait() mem := runtime.MemStats{} runtime.ReadMemStats(&mem) - t.Logf("memory usage:%d", mem.TotalAlloc/MiB) + t.Logf("memory usage:%d MB", mem.TotalAlloc/MiB) } -func TestAntsPoolWithFunc(t *testing.T) { - var wg sync.WaitGroup - p, _ := ants.NewPoolWithFunc(50000, func(i interface{}) error { - demoPoolFunc(i) - wg.Done() - return nil - }) - for i := 0; i < n; i++ { - wg.Add(1) - p.Serve(n) - } - wg.Wait() +// func TestAntsPoolWithFunc(t *testing.T) { +// var wg sync.WaitGroup +// p, _ := ants.NewPoolWithFunc(50000, func(i interface{}) error { +// demoPoolFunc(i) +// wg.Done() +// return nil +// }) +// for i := 0; i < n; i++ { +// wg.Add(1) +// p.Serve(n) +// } +// wg.Wait() - //t.Logf("pool capacity:%d", ants.Cap()) - //t.Logf("free workers number:%d", ants.Free()) +// //t.Logf("pool capacity:%d", ants.Cap()) +// //t.Logf("free workers number:%d", ants.Free()) - t.Logf("running workers number:%d", p.Running()) - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - t.Logf("memory usage:%d", mem.TotalAlloc/GiB) -} +// t.Logf("running workers number:%d", p.Running()) +// mem := runtime.MemStats{} +// runtime.ReadMemStats(&mem) +// t.Logf("memory usage:%d", mem.TotalAlloc/GiB) +// } // func TestNoPool(t *testing.T) { // var wg sync.WaitGroup diff --git a/examples/main.go b/examples/main.go index 9c5bb92..5f71174 100644 --- a/examples/main.go +++ b/examples/main.go @@ -58,7 +58,7 @@ func main() { // set 100 the size of goroutine pool var wg sync.WaitGroup - p, _ := ants.NewPoolWithFunc(100, func(i interface{}) error { + p, _ := ants.NewPoolWithFunc(10, func(i interface{}) error { myFunc(i) wg.Done() return nil From ff4b7d8a22966d4f44597f7cdb3a611242678d45 Mon Sep 17 00:00:00 2001 From: andy pan Date: Thu, 24 May 2018 18:30:58 +0800 Subject: [PATCH 3/9] update --- ants_benchmark_test.go | 7 ++----- examples/main.go | 1 + pool_func.go | 40 +++++++++++----------------------------- worker_func.go | 22 +++++++++++++++++++++- 4 files changed, 35 insertions(+), 35 deletions(-) diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 698c5d4..0d6e0c6 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -72,7 +72,6 @@ func BenchmarkGoroutineWithFunc(b *testing.B) { } wg.Wait() } - //b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) } func BenchmarkAntsPoolWithFunc(b *testing.B) { @@ -88,9 +87,8 @@ func BenchmarkAntsPoolWithFunc(b *testing.B) { p.Serve(loop) } wg.Wait() - //b.Logf("running goroutines: %d", p.Running()) + b.Logf("running goroutines: %d", p.Running()) } - //b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) } func BenchmarkGoroutine(b *testing.B) { @@ -100,15 +98,14 @@ func BenchmarkGoroutine(b *testing.B) { } } - //b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) } func BenchmarkAntsPool(b *testing.B) { + b.N = 3 for i := 0; i < b.N; i++ { for j := 0; j < RunTimes; j++ { ants.Push(demoFunc) } b.Logf("running goroutines: %d", ants.Running()) } - //b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) } diff --git a/examples/main.go b/examples/main.go index 5f71174..719c6b5 100644 --- a/examples/main.go +++ b/examples/main.go @@ -69,5 +69,6 @@ func main() { p.Serve(str) } wg.Wait() + fmt.Printf("running goroutines: %d\n", p.Running()) fmt.Println("finish all tasks!") } diff --git a/pool_func.go b/pool_func.go index 0e888e5..6ea84fb 100644 --- a/pool_func.go +++ b/pool_func.go @@ -26,7 +26,6 @@ import ( "math" "sync" "sync/atomic" - "time" ) type pf func(interface{}) error @@ -55,10 +54,10 @@ type PoolWithFunc struct { lock sync.Mutex - // closed is used to confirm whether this pool has been closed. - closed int32 - poolFunc pf + + once sync.Once + } // NewPoolWithFunc generates a instance of ants pool with a specific function. @@ -69,8 +68,7 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { p := &PoolWithFunc{ capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), - release: make(chan sig), - closed: 0, + release: make(chan sig, 1), poolFunc: f, } @@ -79,27 +77,12 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { //------------------------------------------------------------------------- -// scanAndClean is a goroutine who will periodically clean up -// after it is noticed that this pool is closed. -func (p *PoolWithFunc) scanAndClean() { - ticker := time.NewTicker(DefaultCleanIntervalTime * time.Second) - go func() { - ticker.Stop() - for range ticker.C { - if atomic.LoadInt32(&p.closed) == 1 { - p.lock.Lock() - for _, w := range p.workers { - w.stop() - } - p.lock.Unlock() - } - } - }() -} - // Serve submit a task to pool func (p *PoolWithFunc) Serve(args interface{}) error { - if atomic.LoadInt32(&p.closed) == 1 { + //if atomic.LoadInt32(&p.closed) == 1 { + // return ErrPoolClosed + //} + if len(p.release) > 0 { return ErrPoolClosed } w := p.getWorker() @@ -124,10 +107,9 @@ func (p *PoolWithFunc) Cap() int { // Release Closed this pool func (p *PoolWithFunc) Release() error { - p.lock.Lock() - atomic.StoreInt32(&p.closed, 1) - close(p.release) - p.lock.Unlock() + p.once.Do(func() { + p.release<- sig{} + }) return nil } diff --git a/worker_func.go b/worker_func.go index 87dd6a4..9580b79 100644 --- a/worker_func.go +++ b/worker_func.go @@ -43,7 +43,7 @@ func (w *WorkerWithFunc) run() { atomic.AddInt32(&w.pool.running, 1) go func() { for args := range w.args { - if args == nil { + if args == nil || len(w.pool.release) > 0 { atomic.AddInt32(&w.pool.running, -1) return } @@ -53,6 +53,26 @@ func (w *WorkerWithFunc) run() { }() } +//func (w *WorkerWithFunc) run() { +// atomic.AddInt32(&w.pool.running, 1) +// go func() { +// for { +// select { +// case args := <-w.args: +// if args == nil { +// atomic.AddInt32(&w.pool.running, -1) +// return +// } +// w.pool.poolFunc(args) +// w.pool.putWorker(w) +// case <-w.pool.release: +// atomic.AddInt32(&w.pool.running, -1) +// return +// } +// } +// }() +//} + // stop this worker. func (w *WorkerWithFunc) stop() { w.args <- nil From 6581f1821dd5fec9b925cdad569d59b2b146edea Mon Sep 17 00:00:00 2001 From: andy pan Date: Thu, 24 May 2018 18:35:26 +0800 Subject: [PATCH 4/9] update --- pool.go | 34 ++++++---------------------------- worker.go | 2 +- 2 files changed, 7 insertions(+), 29 deletions(-) diff --git a/pool.go b/pool.go index 113027f..25c347e 100644 --- a/pool.go +++ b/pool.go @@ -26,7 +26,6 @@ import ( "math" "sync" "sync/atomic" - "time" ) type sig struct{} @@ -57,8 +56,7 @@ type Pool struct { lock sync.Mutex - // closed is used to confirm whether this pool has been closed. - closed int32 + once sync.Once } // NewPool generates a instance of ants pool @@ -69,8 +67,7 @@ func NewPool(size int) (*Pool, error) { p := &Pool{ capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), - release: make(chan sig), - closed: 0, + release: make(chan sig, 1), } return p, nil @@ -78,27 +75,9 @@ func NewPool(size int) (*Pool, error) { //------------------------------------------------------------------------- -// scanAndClean is a goroutine who will periodically clean up -// after it is noticed that this pool is closed. -func (p *Pool) scanAndClean() { - ticker := time.NewTicker(DefaultCleanIntervalTime * time.Second) - go func() { - ticker.Stop() - for range ticker.C { - if atomic.LoadInt32(&p.closed) == 1 { - p.lock.Lock() - for _, w := range p.workers { - w.stop() - } - p.lock.Unlock() - } - } - }() -} - // Push submit a task to pool func (p *Pool) Push(task f) error { - if atomic.LoadInt32(&p.closed) == 1 { + if len(p.release) > 0 { return ErrPoolClosed } w := p.getWorker() @@ -123,10 +102,9 @@ func (p *Pool) Cap() int { // Release Closed this pool func (p *Pool) Release() error { - p.lock.Lock() - atomic.StoreInt32(&p.closed, 1) - close(p.release) - p.lock.Unlock() + p.once.Do(func() { + p.release <- sig{} + }) return nil } diff --git a/worker.go b/worker.go index 601836c..34b3004 100644 --- a/worker.go +++ b/worker.go @@ -43,7 +43,7 @@ func (w *Worker) run() { atomic.AddInt32(&w.pool.running, 1) go func() { for f := range w.task { - if f == nil { + if f == nil || len(w.pool.release) > 0 { atomic.AddInt32(&w.pool.running, -1) return } From ab6390f6d004ee5e9fb73f913d5d31449fca9e4c Mon Sep 17 00:00:00 2001 From: andy pan Date: Thu, 24 May 2018 19:27:54 +0800 Subject: [PATCH 5/9] optimization --- examples/main.go | 17 ++++++++++++----- pool.go | 22 +++++++++++++--------- pool_func.go | 30 ++++++++++++++++++------------ worker_func.go | 2 +- 4 files changed, 44 insertions(+), 27 deletions(-) diff --git a/examples/main.go b/examples/main.go index 719c6b5..b1e6602 100644 --- a/examples/main.go +++ b/examples/main.go @@ -25,15 +25,17 @@ package main import ( "fmt" "sync" + "sync/atomic" "github.com/panjf2000/ants" ) -var str = "Hello World!" +var sum int32 func myFunc(i interface{}) error { - s := i.(string) - fmt.Println(s) + n := i.(int) + atomic.AddInt32(&sum, int32(n)) + fmt.Printf("run with %d\n", n) return nil } @@ -66,9 +68,14 @@ func main() { // submit for i := 0; i < runTimes; i++ { wg.Add(1) - p.Serve(str) + p.Serve(i) } wg.Wait() + //var m int + //var i int + //for n := range sum { + // m += n + //} fmt.Printf("running goroutines: %d\n", p.Running()) - fmt.Println("finish all tasks!") + fmt.Printf("finish all tasks, result is %d\n", sum) } diff --git a/pool.go b/pool.go index 25c347e..5065e3a 100644 --- a/pool.go +++ b/pool.go @@ -151,17 +151,21 @@ func (p *Pool) getWorker() *Worker { break } } else if w == nil { - wp := p.workerPool.Get() - if wp == nil { - w = &Worker{ - pool: p, - task: make(chan f, workerArgsCap), - } - } else { - w = wp.(*Worker) + //wp := p.workerPool.Get() + //if wp == nil { + // w = &Worker{ + // pool: p, + // task: make(chan f, workerArgsCap), + // } + //} else { + // w = wp.(*Worker) + //} + w = &Worker{ + pool: p, + task: make(chan f, workerArgsCap), } w.run() - p.workerPool.Put(w) + //p.workerPool.Put(w) } return w } diff --git a/pool_func.go b/pool_func.go index 6ea84fb..a8cb357 100644 --- a/pool_func.go +++ b/pool_func.go @@ -47,7 +47,7 @@ type PoolWithFunc struct { workers []*WorkerWithFunc // workerPool is a pool that saves a set of temporary objects. - workerPool sync.Pool + //workerPool sync.Pool // release is used to notice the pool to closed itself. release chan sig @@ -57,7 +57,6 @@ type PoolWithFunc struct { poolFunc pf once sync.Once - } // NewPoolWithFunc generates a instance of ants pool with a specific function. @@ -108,7 +107,7 @@ func (p *PoolWithFunc) Cap() int { // Release Closed this pool func (p *PoolWithFunc) Release() error { p.once.Do(func() { - p.release<- sig{} + p.release <- sig{} }) return nil } @@ -131,6 +130,8 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { if n < 0 { if p.running >= p.capacity { waiting = true + } else { + p.running++ } } else { w = workers[n] @@ -156,23 +157,28 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { break } } else if w == nil { - wp := p.workerPool.Get() - if wp == nil { - w = &WorkerWithFunc{ - pool: p, - args: make(chan interface{}, workerArgsCap), - } - } else { - w = wp.(*WorkerWithFunc) + //wp := p.workerPool.Get() + //if wp == nil { + // w = &WorkerWithFunc{ + // pool: p, + // args: make(chan interface{}, workerArgsCap), + // } + //} else { + // w = wp.(*WorkerWithFunc) + //} + w = &WorkerWithFunc{ + pool: p, + args: make(chan interface{}), } w.run() - p.workerPool.Put(w) + //p.workerPool.Put(w) } return w } // putWorker puts a worker back into free pool, recycling the goroutines. func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { + //p.workerPool.Put(worker) p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() diff --git a/worker_func.go b/worker_func.go index 9580b79..4ef969e 100644 --- a/worker_func.go +++ b/worker_func.go @@ -40,7 +40,7 @@ type WorkerWithFunc struct { // run will start a goroutine to repeat the process // that perform the function calls. func (w *WorkerWithFunc) run() { - atomic.AddInt32(&w.pool.running, 1) + //atomic.AddInt32(&w.pool.running, 1) go func() { for args := range w.args { if args == nil || len(w.pool.release) > 0 { From 07f98b4ac6bc55ba7ba199947e5e911b943e3ba7 Mon Sep 17 00:00:00 2001 From: andy pan Date: Thu, 24 May 2018 19:30:18 +0800 Subject: [PATCH 6/9] remove some comments --- pool.go | 2 ++ worker.go | 2 +- worker_func.go | 20 -------------------- 3 files changed, 3 insertions(+), 21 deletions(-) diff --git a/pool.go b/pool.go index 5065e3a..05dfe6a 100644 --- a/pool.go +++ b/pool.go @@ -126,6 +126,8 @@ func (p *Pool) getWorker() *Worker { if n < 0 { if p.running >= p.capacity { waiting = true + } else { + p.running++ } } else { w = workers[n] diff --git a/worker.go b/worker.go index 34b3004..ed941b7 100644 --- a/worker.go +++ b/worker.go @@ -40,7 +40,7 @@ type Worker struct { // run will start a goroutine to repeat the process // that perform the function calls. func (w *Worker) run() { - atomic.AddInt32(&w.pool.running, 1) + //atomic.AddInt32(&w.pool.running, 1) go func() { for f := range w.task { if f == nil || len(w.pool.release) > 0 { diff --git a/worker_func.go b/worker_func.go index 4ef969e..3d07f3d 100644 --- a/worker_func.go +++ b/worker_func.go @@ -53,26 +53,6 @@ func (w *WorkerWithFunc) run() { }() } -//func (w *WorkerWithFunc) run() { -// atomic.AddInt32(&w.pool.running, 1) -// go func() { -// for { -// select { -// case args := <-w.args: -// if args == nil { -// atomic.AddInt32(&w.pool.running, -1) -// return -// } -// w.pool.poolFunc(args) -// w.pool.putWorker(w) -// case <-w.pool.release: -// atomic.AddInt32(&w.pool.running, -1) -// return -// } -// } -// }() -//} - // stop this worker. func (w *WorkerWithFunc) stop() { w.args <- nil From aaa7bad491e00ff4871828b3a6823c756657f794 Mon Sep 17 00:00:00 2001 From: andy pan Date: Thu, 24 May 2018 19:32:45 +0800 Subject: [PATCH 7/9] rearrange task channel --- pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pool.go b/pool.go index 05dfe6a..b7b58ef 100644 --- a/pool.go +++ b/pool.go @@ -164,7 +164,7 @@ func (p *Pool) getWorker() *Worker { //} w = &Worker{ pool: p, - task: make(chan f, workerArgsCap), + task: make(chan f), } w.run() //p.workerPool.Put(w) From 5326374a2255449d5b3bc4c9dcb7f541310409c2 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Thu, 24 May 2018 23:43:34 +0800 Subject: [PATCH 8/9] Auto stash before merge of "develop" and "origin/develop" --- ants.go | 6 +++--- ants_benchmark_test.go | 6 +++--- ants_test.go | 5 +++-- examples/main.go | 2 +- pool.go | 12 +++++------- pool_func.go | 7 +++---- 6 files changed, 18 insertions(+), 20 deletions(-) diff --git a/ants.go b/ants.go index c306d41..3573b1c 100644 --- a/ants.go +++ b/ants.go @@ -39,9 +39,9 @@ const ( // Init a instance pool when importing ants var defaultPool, _ = NewPool(DefaultPoolSize) -// Push submit a task to pool -func Push(task f) error { - return defaultPool.Push(task) +// Submit submit a task to pool +func Submit(task f) error { + return defaultPool.Submit(task) } // Running returns the number of the currently running goroutines diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 0d6e0c6..06fdcd3 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -44,8 +44,9 @@ const ( const RunTimes = 10000000 const loop = 10 -func demoFunc() { +func demoFunc() error { time.Sleep(loop * time.Millisecond) + return nil } func demoPoolFunc(args interface{}) error { @@ -101,10 +102,9 @@ func BenchmarkGoroutine(b *testing.B) { } func BenchmarkAntsPool(b *testing.B) { - b.N = 3 for i := 0; i < b.N; i++ { for j := 0; j < RunTimes; j++ { - ants.Push(demoFunc) + ants.Submit(demoFunc) } b.Logf("running goroutines: %d", ants.Running()) } diff --git a/ants_test.go b/ants_test.go index 451b5b3..71e4493 100644 --- a/ants_test.go +++ b/ants_test.go @@ -36,9 +36,10 @@ func TestDefaultPool(t *testing.T) { var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) - ants.Push(func() { + ants.Submit(func() error { demoFunc() wg.Done() + return nil }) } wg.Wait() @@ -111,7 +112,7 @@ func TestNoPool(t *testing.T) { // var wg sync.WaitGroup // for i := 0; i < n; i++ { // wg.Add(1) -// p.Push(func() { +// p.Submit(func() { // demoFunc() // //demoFunc() // wg.Done() diff --git a/examples/main.go b/examples/main.go index b1e6602..141029b 100644 --- a/examples/main.go +++ b/examples/main.go @@ -45,7 +45,7 @@ func myFunc(i interface{}) error { // // submit all your tasks to ants pool // for i := 0; i < runTimes; i++ { // wg.Add(1) -// ants.Push(func() { +// ants.Submit(func() { // myFunc() // wg.Done() // }) diff --git a/pool.go b/pool.go index b7b58ef..a080872 100644 --- a/pool.go +++ b/pool.go @@ -30,9 +30,9 @@ import ( type sig struct{} -type f func() +type f func() error -// Pool accept the tasks from client,it will limit the total +// Pool accept the tasks from client,it limits the total // of goroutines to a given number by recycling goroutines. type Pool struct { // capacity of the pool. @@ -48,12 +48,10 @@ type Pool struct { // workers is a slice that store the available workers. workers []*Worker - // workerPool is a pool that saves a set of temporary objects. - workerPool sync.Pool - // release is used to notice the pool to closed itself. release chan sig + // lock for synchronous operation lock sync.Mutex once sync.Once @@ -75,8 +73,8 @@ func NewPool(size int) (*Pool, error) { //------------------------------------------------------------------------- -// Push submit a task to pool -func (p *Pool) Push(task f) error { +// Submit submit a task to pool +func (p *Pool) Submit(task f) error { if len(p.release) > 0 { return ErrPoolClosed } diff --git a/pool_func.go b/pool_func.go index a8cb357..b236cf9 100644 --- a/pool_func.go +++ b/pool_func.go @@ -30,7 +30,7 @@ import ( type pf func(interface{}) error -// PoolWithFunc accept the tasks from client,it will limit the total +// PoolWithFunc accept the tasks from client,it limits the total // of goroutines to a given number by recycling goroutines. type PoolWithFunc struct { // capacity of the pool. @@ -46,14 +46,13 @@ type PoolWithFunc struct { // workers is a slice that store the available workers. workers []*WorkerWithFunc - // workerPool is a pool that saves a set of temporary objects. - //workerPool sync.Pool - // release is used to notice the pool to closed itself. release chan sig + // lock for synchronous operation lock sync.Mutex + // pf is the function for processing tasks poolFunc pf once sync.Once From 4b806f461bafb5b087201504ca34d2e0174131bd Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Fri, 25 May 2018 00:43:53 +0800 Subject: [PATCH 9/9] change some comments --- worker.go | 12 ++++++------ worker_func.go | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/worker.go b/worker.go index ed941b7..5b8426b 100644 --- a/worker.go +++ b/worker.go @@ -26,9 +26,9 @@ import ( "sync/atomic" ) -// Worker is the actual executor who run the tasks, -// it will start a goroutine that accept tasks and -// perform function calls. +// Worker is the actual executor who runs the tasks, +// it starts a goroutine that accepts tasks and +// performs function calls. type Worker struct { // pool who owns this worker. pool *Pool @@ -37,8 +37,8 @@ type Worker struct { task chan f } -// run will start a goroutine to repeat the process -// that perform the function calls. +// run starts a goroutine to repeat the process +// that performs the function calls. func (w *Worker) run() { //atomic.AddInt32(&w.pool.running, 1) go func() { @@ -58,7 +58,7 @@ func (w *Worker) stop() { w.task <- nil } -// sendTask send a task to this worker. +// sendTask sends a task to this worker. func (w *Worker) sendTask(task f) { w.task <- task } diff --git a/worker_func.go b/worker_func.go index 3d07f3d..b8d59ec 100644 --- a/worker_func.go +++ b/worker_func.go @@ -26,9 +26,9 @@ import ( "sync/atomic" ) -// Worker is the actual executor who run the tasks, -// it will start a goroutine that accept tasks and -// perform function calls. +// WorkerWithFunc is the actual executor who runs the tasks, +// it starts a goroutine that accepts tasks and +// performs function calls. type WorkerWithFunc struct { // pool who owns this worker. pool *PoolWithFunc @@ -37,8 +37,8 @@ type WorkerWithFunc struct { args chan interface{} } -// run will start a goroutine to repeat the process -// that perform the function calls. +// run starts a goroutine to repeat the process +// that performs the function calls. func (w *WorkerWithFunc) run() { //atomic.AddInt32(&w.pool.running, 1) go func() { @@ -58,7 +58,7 @@ func (w *WorkerWithFunc) stop() { w.args <- nil } -// sendTask send a task to this worker. +// sendTask sends a task to this worker. func (w *WorkerWithFunc) sendTask(args interface{}) { w.args <- args }