diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 188a926..e22a4a1 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -28,7 +28,16 @@ import ( "testing" ) -const RunTimes = 10000000 +const RunTimes = 1000000 + +func demoPoolFunc(args interface{}) error { + m := args.(int) + var n int + for i := 0; i < m; i++ { + n += i + } + return nil +} func BenchmarkGoroutine(b *testing.B) { for i := 0; i < b.N; i++ { @@ -36,7 +45,7 @@ func BenchmarkGoroutine(b *testing.B) { for j := 0; j < RunTimes; j++ { wg.Add(1) go func() { - demoFunc() + demoPoolFunc(RunTimes) wg.Done() }() } @@ -44,15 +53,27 @@ func BenchmarkGoroutine(b *testing.B) { } } -func BenchmarkPoolGoroutine(b *testing.B) { +//func BenchmarkAntsPool(b *testing.B) { +// for i := 0; i < b.N; i++ { +// var wg sync.WaitGroup +// for j := 0; j < RunTimes; j++ { +// wg.Add(1) +// ants.Push(func() { +// demoFunc() +// wg.Done() +// }) +// } +// wg.Wait() +// } +//} + +func BenchmarkAntsPoolWithFunc(b *testing.B) { + p, _ := ants.NewPoolWithFunc(100000, demoPoolFunc) for i := 0; i < b.N; i++ { var wg sync.WaitGroup for j := 0; j < RunTimes; j++ { wg.Add(1) - ants.Push(func() { - demoFunc() - wg.Done() - }) + p.Serve(RunTimes) } wg.Wait() } diff --git a/ants_test.go b/ants_test.go index ff05806..be18314 100644 --- a/ants_test.go +++ b/ants_test.go @@ -29,7 +29,7 @@ import ( "testing" ) -var n = 10000000 +var n = 1000000 //func demoFunc() { // var n int @@ -54,21 +54,59 @@ func demoFunc() { } } -func TestDefaultPool(t *testing.T) { +//func TestDefaultPool(t *testing.T) { +// var wg sync.WaitGroup +// for i := 0; i < n; i++ { +// wg.Add(1) +// ants.Push(func() { +// demoFunc() +// wg.Done() +// }) +// } +// wg.Wait() +// +// //t.Logf("pool capacity:%d", ants.Cap()) +// //t.Logf("free workers number:%d", ants.Free()) +// +// t.Logf("running workers number:%d", ants.Running()) +// mem := runtime.MemStats{} +// runtime.ReadMemStats(&mem) +// t.Logf("memory usage:%d", mem.TotalAlloc/1024) +//} + +//func TestNoPool(t *testing.T) { +// var wg sync.WaitGroup +// for i := 0; i < n; i++ { +// wg.Add(1) +// go func() { +// demoFunc() +// wg.Done() +// }() +// } +// +// wg.Wait() +// mem := runtime.MemStats{} +// runtime.ReadMemStats(&mem) +// t.Logf("memory usage:%d", mem.TotalAlloc/1024) +//} + +func TestAntsPoolWithFunc(t *testing.T) { var wg sync.WaitGroup + p, _ := ants.NewPoolWithFunc(100000, func(i interface{}) error { + demoPoolFunc(i) + wg.Done() + return nil + }) for i := 0; i < n; i++ { wg.Add(1) - ants.Push(func() { - demoFunc() - wg.Done() - }) + p.Serve(n) } wg.Wait() //t.Logf("pool capacity:%d", ants.Cap()) //t.Logf("free workers number:%d", ants.Free()) - t.Logf("running workers number:%d", ants.Running()) + t.Logf("running workers number:%d", p.Running()) mem := runtime.MemStats{} runtime.ReadMemStats(&mem) t.Logf("memory usage:%d", mem.TotalAlloc/1024) @@ -79,7 +117,7 @@ func TestNoPool(t *testing.T) { for i := 0; i < n; i++ { wg.Add(1) go func() { - demoFunc() + demoPoolFunc(n) wg.Done() }() } diff --git a/pool_func.go b/pool_func.go index 83b1cd4..c74eec8 100644 --- a/pool_func.go +++ b/pool_func.go @@ -81,7 +81,7 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { // scanAndClean is a goroutine who will periodically clean up // after it is noticed that this pool is closed. -func (p *PoolWithFunc) scanAndCleanWithFunc() { +func (p *PoolWithFunc) scanAndClean() { ticker := time.NewTicker(DefaultCleanIntervalTime * time.Second) go func() { ticker.Stop() @@ -89,7 +89,7 @@ func (p *PoolWithFunc) scanAndCleanWithFunc() { if atomic.LoadInt32(&p.closed) == 1 { p.lock.Lock() for _, w := range p.workers { - w.stopWithFunc() + w.stop() } p.lock.Unlock() } @@ -102,28 +102,28 @@ func (p *PoolWithFunc) Serve(args interface{}) error { if atomic.LoadInt32(&p.closed) == 1 { return ErrPoolClosed } - w := p.getWorkerWithFunc() - w.sendTaskWithFunc(args) + w := p.getWorker() + w.sendTask(args) return nil } // Running returns the number of the currently running goroutines -func (p *PoolWithFunc) RunningWithFunc() int { +func (p *PoolWithFunc) Running() int { return int(atomic.LoadInt32(&p.running)) } // Free returns the available goroutines to work -func (p *PoolWithFunc) FreeWithFunc() int { +func (p *PoolWithFunc) Free() int { return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) } // Cap returns the capacity of this pool -func (p *PoolWithFunc) CapWithFunc() int { +func (p *PoolWithFunc) Cap() int { return int(atomic.LoadInt32(&p.capacity)) } // Release Closed this pool -func (p *PoolWithFunc) ReleaseWithFunc() error { +func (p *PoolWithFunc) Release() error { p.lock.Lock() atomic.StoreInt32(&p.closed, 1) close(p.release) @@ -132,14 +132,14 @@ func (p *PoolWithFunc) ReleaseWithFunc() error { } // ReSize change the capacity of this pool -func (p *PoolWithFunc) ReSizeWithFunc(size int) { +func (p *PoolWithFunc) ReSize(size int) { atomic.StoreInt32(&p.capacity, int32(size)) } //------------------------------------------------------------------------- // getWorker returns a available worker to run the tasks. -func (p *PoolWithFunc) getWorkerWithFunc() *WorkerWithFunc { +func (p *PoolWithFunc) getWorker() *WorkerWithFunc { var w *WorkerWithFunc waiting := false @@ -180,7 +180,7 @@ func (p *PoolWithFunc) getWorkerWithFunc() *WorkerWithFunc { pool: p, args: make(chan interface{}), } - w.runWithFunc() + w.run() atomic.AddInt32(&p.running, 1) } else { w = wp.(*WorkerWithFunc) @@ -190,7 +190,7 @@ func (p *PoolWithFunc) getWorkerWithFunc() *WorkerWithFunc { } // putWorker puts a worker back into free pool, recycling the goroutines. -func (p *PoolWithFunc) putWorkerWithFunc(worker *WorkerWithFunc) { +func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { p.workerPool.Put(worker) p.lock.Lock() p.workers = append(p.workers, worker) diff --git a/worker_func.go b/worker_func.go index 5deab61..47bccdb 100644 --- a/worker_func.go +++ b/worker_func.go @@ -39,7 +39,7 @@ type WorkerWithFunc struct { // run will start a goroutine to repeat the process // that perform the function calls. -func (w *WorkerWithFunc) runWithFunc() { +func (w *WorkerWithFunc) run() { go func() { for args := range w.args { if args == nil { @@ -47,17 +47,17 @@ func (w *WorkerWithFunc) runWithFunc() { return } w.pool.poolFunc(args) - w.pool.putWorkerWithFunc(w) + w.pool.putWorker(w) } }() } // stop this worker. -func (w *WorkerWithFunc) stopWithFunc() { +func (w *WorkerWithFunc) stop() { w.args <- nil } // sendTask send a task to this worker. -func (w *WorkerWithFunc) sendTaskWithFunc(args interface{}) { +func (w *WorkerWithFunc) sendTask(args interface{}) { w.args <- args }