diff --git a/ants.go b/ants.go index c306d41..3573b1c 100644 --- a/ants.go +++ b/ants.go @@ -39,9 +39,9 @@ const ( // Init a instance pool when importing ants var defaultPool, _ = NewPool(DefaultPoolSize) -// Push submit a task to pool -func Push(task f) error { - return defaultPool.Push(task) +// Submit submit a task to pool +func Submit(task f) error { + return defaultPool.Submit(task) } // Running returns the number of the currently running goroutines diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index f04d158..06fdcd3 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -23,7 +23,6 @@ package ants_test import ( - "runtime" "sync" "testing" "time" @@ -33,68 +32,38 @@ import ( const ( _ = 1 << (10 * iota) - KiB // 1024 - MiB // 1048576 - GiB // 1073741824 - TiB // 1099511627776 (超过了int32的范围) - PiB // 1125899906842624 - EiB // 1152921504606846976 - ZiB // 1180591620717411303424 (超过了int64的范围) - YiB // 1208925819614629174706176 + KiB // 1024 + MiB // 1048576 + GiB // 1073741824 + TiB // 1099511627776 (超过了int32的范围) + PiB // 1125899906842624 + EiB // 1152921504606846976 + ZiB // 1180591620717411303424 (超过了int64的范围) + YiB // 1208925819614629174706176 ) const RunTimes = 10000000 -const loop = 5 +const loop = 10 + +func demoFunc() error { + time.Sleep(loop * time.Millisecond) + return nil +} func demoPoolFunc(args interface{}) error { - // m := args.(int) - // var n int - // for i := 0; i < m; i++ { - // n += i - // } - // return nil + //m := args.(int) + //var n int + //for i := 0; i < m; i++ { + // n += i + //} + //return nil n := args.(int) time.Sleep(time.Duration(n) * time.Millisecond) return nil } -func BenchmarkGoroutine(b *testing.B) { - for i := 0; i < b.N; i++ { - var wg sync.WaitGroup - for j := 0; j < RunTimes; j++ { - wg.Add(1) - go func() { - demoFunc() - wg.Done() - }() - } - wg.Wait() - } - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) -} - -func BenchmarkAntsPool(b *testing.B) { - for i := 0; i < b.N; i++ { - var wg sync.WaitGroup - for j := 0; j < RunTimes; j++ { - wg.Add(1) - ants.Push(func() { - demoFunc() - wg.Done() - }) - } - wg.Wait() - } - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) -} - func BenchmarkGoroutineWithFunc(b *testing.B) { for i := 0; i < b.N; i++ { var wg sync.WaitGroup - b.ResetTimer() for j := 0; j < RunTimes; j++ { wg.Add(1) go func() { @@ -104,9 +73,6 @@ func BenchmarkGoroutineWithFunc(b *testing.B) { } wg.Wait() } - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) } func BenchmarkAntsPoolWithFunc(b *testing.B) { @@ -117,14 +83,29 @@ func BenchmarkAntsPoolWithFunc(b *testing.B) { wg.Done() return nil }) - b.ResetTimer() for j := 0; j < RunTimes; j++ { wg.Add(1) p.Serve(loop) } wg.Wait() + b.Logf("running goroutines: %d", p.Running()) + } +} + +func BenchmarkGoroutine(b *testing.B) { + for i := 0; i < b.N; i++ { + for j := 0; j < RunTimes; j++ { + go demoFunc() + } + } + +} + +func BenchmarkAntsPool(b *testing.B) { + for i := 0; i < b.N; i++ { + for j := 0; j < RunTimes; j++ { + ants.Submit(demoFunc) + } + b.Logf("running goroutines: %d", ants.Running()) } - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) } diff --git a/ants_test.go b/ants_test.go index 74c95e9..71e4493 100644 --- a/ants_test.go +++ b/ants_test.go @@ -24,7 +24,6 @@ package ants_test import ( "runtime" - "time" "sync" "testing" @@ -33,36 +32,14 @@ import ( var n = 10000000 -//func demoFunc() { -// var n int -// for i := 0; i < 1000000; i++ { -// n += i -// } -//} - -//func demoFunc() { -// var n int -// for i := 0; i < 10000; i++ { -// n += i -// } -// fmt.Printf("finish task with result:%d\n", n) -//} - -func demoFunc() { - time.Sleep(10 * time.Millisecond) - // var n int - // for i := 0; i < 1000000; i++ { - // n += i - // } -} - func TestDefaultPool(t *testing.T) { var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) - ants.Push(func() { + ants.Submit(func() error { demoFunc() wg.Done() + return nil }) } wg.Wait() @@ -73,7 +50,7 @@ func TestDefaultPool(t *testing.T) { t.Logf("running workers number:%d", ants.Running()) mem := runtime.MemStats{} runtime.ReadMemStats(&mem) - t.Logf("memory usage:%d", mem.TotalAlloc/MiB) + t.Logf("memory usage:%d MB", mem.TotalAlloc/MiB) } func TestNoPool(t *testing.T) { @@ -89,30 +66,30 @@ func TestNoPool(t *testing.T) { wg.Wait() mem := runtime.MemStats{} runtime.ReadMemStats(&mem) - t.Logf("memory usage:%d", mem.TotalAlloc/MiB) + t.Logf("memory usage:%d MB", mem.TotalAlloc/MiB) } -func TestAntsPoolWithFunc(t *testing.T) { - var wg sync.WaitGroup - p, _ := ants.NewPoolWithFunc(50000, func(i interface{}) error { - demoPoolFunc(i) - wg.Done() - return nil - }) - for i := 0; i < n; i++ { - wg.Add(1) - p.Serve(n) - } - wg.Wait() +// func TestAntsPoolWithFunc(t *testing.T) { +// var wg sync.WaitGroup +// p, _ := ants.NewPoolWithFunc(50000, func(i interface{}) error { +// demoPoolFunc(i) +// wg.Done() +// return nil +// }) +// for i := 0; i < n; i++ { +// wg.Add(1) +// p.Serve(n) +// } +// wg.Wait() - //t.Logf("pool capacity:%d", ants.Cap()) - //t.Logf("free workers number:%d", ants.Free()) +// //t.Logf("pool capacity:%d", ants.Cap()) +// //t.Logf("free workers number:%d", ants.Free()) - t.Logf("running workers number:%d", p.Running()) - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - t.Logf("memory usage:%d", mem.TotalAlloc/GiB) -} +// t.Logf("running workers number:%d", p.Running()) +// mem := runtime.MemStats{} +// runtime.ReadMemStats(&mem) +// t.Logf("memory usage:%d", mem.TotalAlloc/GiB) +// } // func TestNoPool(t *testing.T) { // var wg sync.WaitGroup @@ -135,7 +112,7 @@ func TestAntsPoolWithFunc(t *testing.T) { // var wg sync.WaitGroup // for i := 0; i < n; i++ { // wg.Add(1) -// p.Push(func() { +// p.Submit(func() { // demoFunc() // //demoFunc() // wg.Done() diff --git a/examples/main.go b/examples/main.go index 9c5bb92..141029b 100644 --- a/examples/main.go +++ b/examples/main.go @@ -25,15 +25,17 @@ package main import ( "fmt" "sync" + "sync/atomic" "github.com/panjf2000/ants" ) -var str = "Hello World!" +var sum int32 func myFunc(i interface{}) error { - s := i.(string) - fmt.Println(s) + n := i.(int) + atomic.AddInt32(&sum, int32(n)) + fmt.Printf("run with %d\n", n) return nil } @@ -43,7 +45,7 @@ func myFunc(i interface{}) error { // // submit all your tasks to ants pool // for i := 0; i < runTimes; i++ { // wg.Add(1) -// ants.Push(func() { +// ants.Submit(func() { // myFunc() // wg.Done() // }) @@ -58,7 +60,7 @@ func main() { // set 100 the size of goroutine pool var wg sync.WaitGroup - p, _ := ants.NewPoolWithFunc(100, func(i interface{}) error { + p, _ := ants.NewPoolWithFunc(10, func(i interface{}) error { myFunc(i) wg.Done() return nil @@ -66,8 +68,14 @@ func main() { // submit for i := 0; i < runTimes; i++ { wg.Add(1) - p.Serve(str) + p.Serve(i) } wg.Wait() - fmt.Println("finish all tasks!") + //var m int + //var i int + //for n := range sum { + // m += n + //} + fmt.Printf("running goroutines: %d\n", p.Running()) + fmt.Printf("finish all tasks, result is %d\n", sum) } diff --git a/pool.go b/pool.go index 113027f..a080872 100644 --- a/pool.go +++ b/pool.go @@ -26,14 +26,13 @@ import ( "math" "sync" "sync/atomic" - "time" ) type sig struct{} -type f func() +type f func() error -// Pool accept the tasks from client,it will limit the total +// Pool accept the tasks from client,it limits the total // of goroutines to a given number by recycling goroutines. type Pool struct { // capacity of the pool. @@ -49,16 +48,13 @@ type Pool struct { // workers is a slice that store the available workers. workers []*Worker - // workerPool is a pool that saves a set of temporary objects. - workerPool sync.Pool - // release is used to notice the pool to closed itself. release chan sig + // lock for synchronous operation lock sync.Mutex - // closed is used to confirm whether this pool has been closed. - closed int32 + once sync.Once } // NewPool generates a instance of ants pool @@ -69,8 +65,7 @@ func NewPool(size int) (*Pool, error) { p := &Pool{ capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), - release: make(chan sig), - closed: 0, + release: make(chan sig, 1), } return p, nil @@ -78,27 +73,9 @@ func NewPool(size int) (*Pool, error) { //------------------------------------------------------------------------- -// scanAndClean is a goroutine who will periodically clean up -// after it is noticed that this pool is closed. -func (p *Pool) scanAndClean() { - ticker := time.NewTicker(DefaultCleanIntervalTime * time.Second) - go func() { - ticker.Stop() - for range ticker.C { - if atomic.LoadInt32(&p.closed) == 1 { - p.lock.Lock() - for _, w := range p.workers { - w.stop() - } - p.lock.Unlock() - } - } - }() -} - -// Push submit a task to pool -func (p *Pool) Push(task f) error { - if atomic.LoadInt32(&p.closed) == 1 { +// Submit submit a task to pool +func (p *Pool) Submit(task f) error { + if len(p.release) > 0 { return ErrPoolClosed } w := p.getWorker() @@ -123,10 +100,9 @@ func (p *Pool) Cap() int { // Release Closed this pool func (p *Pool) Release() error { - p.lock.Lock() - atomic.StoreInt32(&p.closed, 1) - close(p.release) - p.lock.Unlock() + p.once.Do(func() { + p.release <- sig{} + }) return nil } @@ -148,6 +124,8 @@ func (p *Pool) getWorker() *Worker { if n < 0 { if p.running >= p.capacity { waiting = true + } else { + p.running++ } } else { w = workers[n] @@ -173,17 +151,21 @@ func (p *Pool) getWorker() *Worker { break } } else if w == nil { - wp := p.workerPool.Get() - if wp == nil { - w = &Worker{ - pool: p, - task: make(chan f, workerArgsCap), - } - } else { - w = wp.(*Worker) + //wp := p.workerPool.Get() + //if wp == nil { + // w = &Worker{ + // pool: p, + // task: make(chan f, workerArgsCap), + // } + //} else { + // w = wp.(*Worker) + //} + w = &Worker{ + pool: p, + task: make(chan f), } w.run() - p.workerPool.Put(w) + //p.workerPool.Put(w) } return w } diff --git a/pool_func.go b/pool_func.go index 0e888e5..b236cf9 100644 --- a/pool_func.go +++ b/pool_func.go @@ -26,12 +26,11 @@ import ( "math" "sync" "sync/atomic" - "time" ) type pf func(interface{}) error -// PoolWithFunc accept the tasks from client,it will limit the total +// PoolWithFunc accept the tasks from client,it limits the total // of goroutines to a given number by recycling goroutines. type PoolWithFunc struct { // capacity of the pool. @@ -47,18 +46,16 @@ type PoolWithFunc struct { // workers is a slice that store the available workers. workers []*WorkerWithFunc - // workerPool is a pool that saves a set of temporary objects. - workerPool sync.Pool - // release is used to notice the pool to closed itself. release chan sig + // lock for synchronous operation lock sync.Mutex - // closed is used to confirm whether this pool has been closed. - closed int32 - + // pf is the function for processing tasks poolFunc pf + + once sync.Once } // NewPoolWithFunc generates a instance of ants pool with a specific function. @@ -69,8 +66,7 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { p := &PoolWithFunc{ capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), - release: make(chan sig), - closed: 0, + release: make(chan sig, 1), poolFunc: f, } @@ -79,27 +75,12 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { //------------------------------------------------------------------------- -// scanAndClean is a goroutine who will periodically clean up -// after it is noticed that this pool is closed. -func (p *PoolWithFunc) scanAndClean() { - ticker := time.NewTicker(DefaultCleanIntervalTime * time.Second) - go func() { - ticker.Stop() - for range ticker.C { - if atomic.LoadInt32(&p.closed) == 1 { - p.lock.Lock() - for _, w := range p.workers { - w.stop() - } - p.lock.Unlock() - } - } - }() -} - // Serve submit a task to pool func (p *PoolWithFunc) Serve(args interface{}) error { - if atomic.LoadInt32(&p.closed) == 1 { + //if atomic.LoadInt32(&p.closed) == 1 { + // return ErrPoolClosed + //} + if len(p.release) > 0 { return ErrPoolClosed } w := p.getWorker() @@ -124,10 +105,9 @@ func (p *PoolWithFunc) Cap() int { // Release Closed this pool func (p *PoolWithFunc) Release() error { - p.lock.Lock() - atomic.StoreInt32(&p.closed, 1) - close(p.release) - p.lock.Unlock() + p.once.Do(func() { + p.release <- sig{} + }) return nil } @@ -149,6 +129,8 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { if n < 0 { if p.running >= p.capacity { waiting = true + } else { + p.running++ } } else { w = workers[n] @@ -174,23 +156,28 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { break } } else if w == nil { - wp := p.workerPool.Get() - if wp == nil { - w = &WorkerWithFunc{ - pool: p, - args: make(chan interface{}, workerArgsCap), - } - } else { - w = wp.(*WorkerWithFunc) + //wp := p.workerPool.Get() + //if wp == nil { + // w = &WorkerWithFunc{ + // pool: p, + // args: make(chan interface{}, workerArgsCap), + // } + //} else { + // w = wp.(*WorkerWithFunc) + //} + w = &WorkerWithFunc{ + pool: p, + args: make(chan interface{}), } w.run() - p.workerPool.Put(w) + //p.workerPool.Put(w) } return w } // putWorker puts a worker back into free pool, recycling the goroutines. func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { + //p.workerPool.Put(worker) p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() diff --git a/worker.go b/worker.go index 601836c..5b8426b 100644 --- a/worker.go +++ b/worker.go @@ -26,9 +26,9 @@ import ( "sync/atomic" ) -// Worker is the actual executor who run the tasks, -// it will start a goroutine that accept tasks and -// perform function calls. +// Worker is the actual executor who runs the tasks, +// it starts a goroutine that accepts tasks and +// performs function calls. type Worker struct { // pool who owns this worker. pool *Pool @@ -37,13 +37,13 @@ type Worker struct { task chan f } -// run will start a goroutine to repeat the process -// that perform the function calls. +// run starts a goroutine to repeat the process +// that performs the function calls. func (w *Worker) run() { - atomic.AddInt32(&w.pool.running, 1) + //atomic.AddInt32(&w.pool.running, 1) go func() { for f := range w.task { - if f == nil { + if f == nil || len(w.pool.release) > 0 { atomic.AddInt32(&w.pool.running, -1) return } @@ -58,7 +58,7 @@ func (w *Worker) stop() { w.task <- nil } -// sendTask send a task to this worker. +// sendTask sends a task to this worker. func (w *Worker) sendTask(task f) { w.task <- task } diff --git a/worker_func.go b/worker_func.go index 87dd6a4..b8d59ec 100644 --- a/worker_func.go +++ b/worker_func.go @@ -26,9 +26,9 @@ import ( "sync/atomic" ) -// Worker is the actual executor who run the tasks, -// it will start a goroutine that accept tasks and -// perform function calls. +// WorkerWithFunc is the actual executor who runs the tasks, +// it starts a goroutine that accepts tasks and +// performs function calls. type WorkerWithFunc struct { // pool who owns this worker. pool *PoolWithFunc @@ -37,13 +37,13 @@ type WorkerWithFunc struct { args chan interface{} } -// run will start a goroutine to repeat the process -// that perform the function calls. +// run starts a goroutine to repeat the process +// that performs the function calls. func (w *WorkerWithFunc) run() { - atomic.AddInt32(&w.pool.running, 1) + //atomic.AddInt32(&w.pool.running, 1) go func() { for args := range w.args { - if args == nil { + if args == nil || len(w.pool.release) > 0 { atomic.AddInt32(&w.pool.running, -1) return } @@ -58,7 +58,7 @@ func (w *WorkerWithFunc) stop() { w.args <- nil } -// sendTask send a task to this worker. +// sendTask sends a task to this worker. func (w *WorkerWithFunc) sendTask(args interface{}) { w.args <- args }