optimization

Andy Pan 2018-05-22 12:01:00 +08:00
parent fad443d7d8
commit 967b0b04e2
4 changed files with 90 additions and 31 deletions

View File

@@ -28,7 +28,16 @@ import (
 	"testing"
 )
 
-const RunTimes = 10000000
+const RunTimes = 1000000
+
+func demoPoolFunc(args interface{}) error {
+	m := args.(int)
+	var n int
+	for i := 0; i < m; i++ {
+		n += i
+	}
+	return nil
+}
 
 func BenchmarkGoroutine(b *testing.B) {
 	for i := 0; i < b.N; i++ {
@@ -36,7 +45,7 @@ func BenchmarkGoroutine(b *testing.B) {
 		for j := 0; j < RunTimes; j++ {
 			wg.Add(1)
 			go func() {
-				demoFunc()
+				demoPoolFunc(RunTimes)
 				wg.Done()
 			}()
 		}
@@ -44,15 +53,27 @@ func BenchmarkGoroutine(b *testing.B) {
 	}
 }
 
-func BenchmarkPoolGoroutine(b *testing.B) {
+//func BenchmarkAntsPool(b *testing.B) {
+//	for i := 0; i < b.N; i++ {
+//		var wg sync.WaitGroup
+//		for j := 0; j < RunTimes; j++ {
+//			wg.Add(1)
+//			ants.Push(func() {
+//				demoFunc()
+//				wg.Done()
+//			})
+//		}
+//		wg.Wait()
+//	}
+//}
+
+func BenchmarkAntsPoolWithFunc(b *testing.B) {
+	p, _ := ants.NewPoolWithFunc(100000, demoPoolFunc)
 	for i := 0; i < b.N; i++ {
 		var wg sync.WaitGroup
 		for j := 0; j < RunTimes; j++ {
 			wg.Add(1)
-			ants.Push(func() {
-				demoFunc()
-				wg.Done()
-			})
+			p.Serve(RunTimes)
 		}
 		wg.Wait()
 	}
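For readers following along, here is a minimal, self-contained sketch of the PoolWithFunc API this benchmark exercises. The import path, the pool size, and the workload counts are assumptions for illustration, not part of this diff; the signatures (NewPoolWithFunc, Serve, Running) are the ones shown above.

package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants" // assumed import path for this 2018-era API
)

func main() {
	var wg sync.WaitGroup
	// NewPoolWithFunc binds a single task function to the whole pool,
	// just as the benchmark above does with demoPoolFunc.
	p, _ := ants.NewPoolWithFunc(1000, func(args interface{}) error {
		defer wg.Done()
		var n int
		for i := 0; i < args.(int); i++ { // same busy-loop shape as demoPoolFunc
			n += i
		}
		return nil
	})
	for i := 0; i < 100; i++ {
		wg.Add(1)
		p.Serve(10000) // each Serve call dispatches one task to a pooled worker
	}
	wg.Wait()
	fmt.Println("running workers after drain:", p.Running())
}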

View File

@@ -29,7 +29,7 @@ import (
 	"testing"
 )
 
-var n = 10000000
+var n = 1000000
 
 //func demoFunc() {
 //	var n int
@@ -54,21 +54,59 @@ func demoFunc() {
 	}
 }
 
-func TestDefaultPool(t *testing.T) {
+//func TestDefaultPool(t *testing.T) {
+//	var wg sync.WaitGroup
+//	for i := 0; i < n; i++ {
+//		wg.Add(1)
+//		ants.Push(func() {
+//			demoFunc()
+//			wg.Done()
+//		})
+//	}
+//	wg.Wait()
+//
+//	//t.Logf("pool capacity:%d", ants.Cap())
+//	//t.Logf("free workers number:%d", ants.Free())
+//
+//	t.Logf("running workers number:%d", ants.Running())
+//	mem := runtime.MemStats{}
+//	runtime.ReadMemStats(&mem)
+//	t.Logf("memory usage:%d", mem.TotalAlloc/1024)
+//}
+
+//func TestNoPool(t *testing.T) {
+//	var wg sync.WaitGroup
+//	for i := 0; i < n; i++ {
+//		wg.Add(1)
+//		go func() {
+//			demoFunc()
+//			wg.Done()
+//		}()
+//	}
+//
+//	wg.Wait()
+//	mem := runtime.MemStats{}
+//	runtime.ReadMemStats(&mem)
+//	t.Logf("memory usage:%d", mem.TotalAlloc/1024)
+//}
+
+func TestAntsPoolWithFunc(t *testing.T) {
 	var wg sync.WaitGroup
+	p, _ := ants.NewPoolWithFunc(100000, func(i interface{}) error {
+		demoPoolFunc(i)
+		wg.Done()
+		return nil
+	})
 	for i := 0; i < n; i++ {
 		wg.Add(1)
-		ants.Push(func() {
-			demoFunc()
-			wg.Done()
-		})
+		p.Serve(n)
 	}
 	wg.Wait()
 
 	//t.Logf("pool capacity:%d", ants.Cap())
 	//t.Logf("free workers number:%d", ants.Free())
 
-	t.Logf("running workers number:%d", ants.Running())
+	t.Logf("running workers number:%d", p.Running())
 	mem := runtime.MemStats{}
 	runtime.ReadMemStats(&mem)
 	t.Logf("memory usage:%d", mem.TotalAlloc/1024)
@@ -79,7 +117,7 @@ func TestNoPool(t *testing.T) {
 	for i := 0; i < n; i++ {
 		wg.Add(1)
 		go func() {
-			demoFunc()
+			demoPoolFunc(n)
 			wg.Done()
 		}()
 	}
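The memory figures these tests log come straight from runtime.MemStats. A self-contained sketch of the same measurement pattern, with an illustrative stand-in workload (the tests run demoPoolFunc(n) instead):

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done() // stand-in workload
		}()
	}
	wg.Wait()

	// TotalAlloc is cumulative bytes allocated over the process lifetime;
	// dividing by 1024 yields the KiB figure the tests above log.
	var mem runtime.MemStats
	runtime.ReadMemStats(&mem)
	fmt.Printf("memory usage:%d KiB\n", mem.TotalAlloc/1024)
}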

View File

@@ -81,7 +81,7 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) {
 
 // scanAndClean is a goroutine that will periodically clean up
 // once it notices that this pool is closed.
-func (p *PoolWithFunc) scanAndCleanWithFunc() {
+func (p *PoolWithFunc) scanAndClean() {
 	ticker := time.NewTicker(DefaultCleanIntervalTime * time.Second)
 	go func() {
 		ticker.Stop()
@@ -89,7 +89,7 @@ func (p *PoolWithFunc) scanAndCleanWithFunc() {
 			if atomic.LoadInt32(&p.closed) == 1 {
 				p.lock.Lock()
 				for _, w := range p.workers {
-					w.stopWithFunc()
+					w.stop()
 				}
 				p.lock.Unlock()
 			}
@@ -102,28 +102,28 @@ func (p *PoolWithFunc) Serve(args interface{}) error {
 	if atomic.LoadInt32(&p.closed) == 1 {
 		return ErrPoolClosed
 	}
-	w := p.getWorkerWithFunc()
-	w.sendTaskWithFunc(args)
+	w := p.getWorker()
+	w.sendTask(args)
 	return nil
 }
 
 // Running returns the number of currently running goroutines
-func (p *PoolWithFunc) RunningWithFunc() int {
+func (p *PoolWithFunc) Running() int {
 	return int(atomic.LoadInt32(&p.running))
 }
 
 // Free returns the number of goroutines available to work
-func (p *PoolWithFunc) FreeWithFunc() int {
+func (p *PoolWithFunc) Free() int {
 	return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running))
 }
 
 // Cap returns the capacity of this pool
-func (p *PoolWithFunc) CapWithFunc() int {
+func (p *PoolWithFunc) Cap() int {
 	return int(atomic.LoadInt32(&p.capacity))
 }
 
 // Release closes this pool
-func (p *PoolWithFunc) ReleaseWithFunc() error {
+func (p *PoolWithFunc) Release() error {
 	p.lock.Lock()
 	atomic.StoreInt32(&p.closed, 1)
 	close(p.release)
@@ -132,14 +132,14 @@ func (p *PoolWithFunc) ReleaseWithFunc() error {
 }
 
 // ReSize changes the capacity of this pool
-func (p *PoolWithFunc) ReSizeWithFunc(size int) {
+func (p *PoolWithFunc) ReSize(size int) {
 	atomic.StoreInt32(&p.capacity, int32(size))
 }
 
 //-------------------------------------------------------------------------
 
 // getWorker returns an available worker to run the tasks.
-func (p *PoolWithFunc) getWorkerWithFunc() *WorkerWithFunc {
+func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
 	var w *WorkerWithFunc
 	waiting := false
@@ -180,7 +180,7 @@ func (p *PoolWithFunc) getWorkerWithFunc() *WorkerWithFunc {
 			pool: p,
 			args: make(chan interface{}),
 		}
-		w.runWithFunc()
+		w.run()
 		atomic.AddInt32(&p.running, 1)
 	} else {
 		w = wp.(*WorkerWithFunc)
@@ -190,7 +190,7 @@ func (p *PoolWithFunc) getWorkerWithFunc() *WorkerWithFunc {
 }
 
 // putWorker puts a worker back into the free pool, recycling the goroutines.
-func (p *PoolWithFunc) putWorkerWithFunc(worker *WorkerWithFunc) {
+func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) {
 	p.workerPool.Put(worker)
 	p.lock.Lock()
 	p.workers = append(p.workers, worker)
View File

@@ -39,7 +39,7 @@ type WorkerWithFunc struct {
 
 // run will start a goroutine to repeat the process
 // that performs the function calls.
-func (w *WorkerWithFunc) runWithFunc() {
+func (w *WorkerWithFunc) run() {
 	go func() {
 		for args := range w.args {
 			if args == nil {
@@ -47,17 +47,17 @@ func (w *WorkerWithFunc) runWithFunc() {
 				return
 			}
 			w.pool.poolFunc(args)
-			w.pool.putWorkerWithFunc(w)
+			w.pool.putWorker(w)
 		}
 	}()
 }
 
 // stop stops this worker.
-func (w *WorkerWithFunc) stopWithFunc() {
+func (w *WorkerWithFunc) stop() {
 	w.args <- nil
 }
 
 // sendTask sends a task to this worker.
-func (w *WorkerWithFunc) sendTaskWithFunc(args interface{}) {
+func (w *WorkerWithFunc) sendTask(args interface{}) {
 	w.args <- args
}
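The worker here is a channel-fed loop that treats a nil argument as the stop sentinel. A self-contained sketch of the same pattern, independent of ants (all names below are illustrative):

package main

import "fmt"

// worker mirrors WorkerWithFunc: one goroutine draining an args channel.
type worker struct {
	args chan interface{}
	task func(interface{})
}

// run starts the goroutine; a nil arg is the stop sentinel,
// exactly as in WorkerWithFunc.run above.
func (w *worker) run() {
	go func() {
		for a := range w.args {
			if a == nil {
				return
			}
			w.task(a)
		}
	}()
}

func (w *worker) sendTask(a interface{}) { w.args <- a }
func (w *worker) stop()                  { w.args <- nil }

func main() {
	done := make(chan struct{})
	w := &worker{
		args: make(chan interface{}),
		task: func(a interface{}) { fmt.Println("ran:", a); done <- struct{}{} },
	}
	w.run()
	w.sendTask(42) // blocks until the worker goroutine picks the task up
	<-done
	w.stop() // send the nil sentinel; the goroutine returns
}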