diff --git a/ants.go b/ants.go index af76b67..c306d41 100644 --- a/ants.go +++ b/ants.go @@ -22,11 +22,15 @@ package ants -import "errors" +import ( + "errors" + "math" + "runtime" +) const ( // DefaultPoolSize is the default capacity for a default goroutine pool - DefaultPoolSize = 50000 + DefaultPoolSize = math.MaxInt32 // DefaultCleanIntervalTime is the interval time to clean up goroutines DefaultCleanIntervalTime = 30 @@ -65,3 +69,10 @@ var ( ErrPoolSizeInvalid = errors.New("invalid size for pool") ErrPoolClosed = errors.New("this pool has been closed") ) + +var workerArgsCap = func() int { + if runtime.GOMAXPROCS(0) == 1 { + return 0 + } + return 1 +}() diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 0358dfc..6485b61 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -23,20 +23,32 @@ package ants_test import ( - "github.com/panjf2000/ants" "sync" "testing" + + "github.com/panjf2000/ants" ) -const RunTimes = 10000000 +const RunTimes = 1000000 +const loop = 1000000 + +func demoPoolFunc(args interface{}) error { + m := args.(int) + var n int + for i := 0; i < m; i++ { + n += i + } + return nil +} func BenchmarkGoroutine(b *testing.B) { for i := 0; i < b.N; i++ { var wg sync.WaitGroup + b.ResetTimer() for j := 0; j < RunTimes; j++ { wg.Add(1) go func() { - forSleep() + demoPoolFunc(loop) wg.Done() }() } @@ -44,15 +56,32 @@ func BenchmarkGoroutine(b *testing.B) { } } -func BenchmarkPoolGoroutine(b *testing.B) { +//func BenchmarkAntsPool(b *testing.B) { +// for i := 0; i < b.N; i++ { +// var wg sync.WaitGroup +// for j := 0; j < RunTimes; j++ { +// wg.Add(1) +// ants.Push(func() { +// demoFunc() +// wg.Done() +// }) +// } +// wg.Wait() +// } +//} + +func BenchmarkAntsPoolWithFunc(b *testing.B) { for i := 0; i < b.N; i++ { var wg sync.WaitGroup + p, _ := ants.NewPoolWithFunc(100000, func(i interface{}) error { + demoPoolFunc(i) + wg.Done() + return nil + }) + b.ResetTimer() for j := 0; j < RunTimes; j++ { wg.Add(1) - ants.Push(func() { - forSleep() - wg.Done() - }) + p.Serve(loop) } wg.Wait() } diff --git a/ants_test.go b/ants_test.go index d1020ab..79a3a57 100644 --- a/ants_test.go +++ b/ants_test.go @@ -23,14 +23,14 @@ package ants_test import ( - "github.com/panjf2000/ants" "runtime" "sync" "testing" - "time" + + "github.com/panjf2000/ants" ) -var n = 1000000 +var n = 100000 //func demoFunc() { // var n int @@ -47,8 +47,70 @@ var n = 1000000 // fmt.Printf("finish task with result:%d\n", n) //} -func forSleep() { - time.Sleep(time.Millisecond) +func demoFunc() { + //time.Sleep(time.Millisecond) + var n int + for i := 0; i < 1000000; i++ { + n += i + } +} + +//func TestDefaultPool(t *testing.T) { +// var wg sync.WaitGroup +// for i := 0; i < n; i++ { +// wg.Add(1) +// ants.Push(func() { +// demoFunc() +// wg.Done() +// }) +// } +// wg.Wait() +// +// //t.Logf("pool capacity:%d", ants.Cap()) +// //t.Logf("free workers number:%d", ants.Free()) +// +// t.Logf("running workers number:%d", ants.Running()) +// mem := runtime.MemStats{} +// runtime.ReadMemStats(&mem) +// t.Logf("memory usage:%d", mem.TotalAlloc/1024) +//} + +//func TestNoPool(t *testing.T) { +// var wg sync.WaitGroup +// for i := 0; i < n; i++ { +// wg.Add(1) +// go func() { +// demoFunc() +// wg.Done() +// }() +// } +// +// wg.Wait() +// mem := runtime.MemStats{} +// runtime.ReadMemStats(&mem) +// t.Logf("memory usage:%d", mem.TotalAlloc/1024) +//} + +func TestAntsPoolWithFunc(t *testing.T) { + var wg sync.WaitGroup + p, _ := ants.NewPoolWithFunc(50000, func(i interface{}) error { + demoPoolFunc(i) + wg.Done() + return nil + }) + for i := 0; i < n; i++ { + wg.Add(1) + p.Serve(n) + } + wg.Wait() + + //t.Logf("pool capacity:%d", ants.Cap()) + //t.Logf("free workers number:%d", ants.Free()) + + t.Logf("running workers number:%d", p.Running()) + mem := runtime.MemStats{} + runtime.ReadMemStats(&mem) + t.Logf("memory usage:%d", mem.TotalAlloc/1024) } func TestNoPool(t *testing.T) { @@ -56,8 +118,7 @@ func TestNoPool(t *testing.T) { for i := 0; i < n; i++ { wg.Add(1) go func() { - forSleep() - //demoFunc() + demoPoolFunc(n) wg.Done() }() } @@ -68,45 +129,24 @@ func TestNoPool(t *testing.T) { t.Logf("memory usage:%d", mem.TotalAlloc/1024) } -func TestDefaultPool(t *testing.T) { - var wg sync.WaitGroup - for i := 0; i < n; i++ { - wg.Add(1) - ants.Push(func() { - forSleep() - //demoFunc() - wg.Done() - }) - } - wg.Wait() - - //t.Logf("pool capacity:%d", ants.Cap()) - //t.Logf("free workers number:%d", ants.Free()) - - t.Logf("running workers number:%d", ants.Running()) - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - t.Logf("memory usage:%d", mem.TotalAlloc/1024) -} - -func TestCustomPool(t *testing.T) { - p, _ := ants.NewPool(30000) - var wg sync.WaitGroup - for i := 0; i < n; i++ { - wg.Add(1) - p.Push(func() { - forSleep() - //demoFunc() - wg.Done() - }) - } - wg.Wait() - - //t.Logf("pool capacity:%d", p.Cap()) - //t.Logf("free workers number:%d", p.Free()) - - t.Logf("running workers number:%d", p.Running()) - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - t.Logf("memory usage:%d", mem.TotalAlloc/1024) -} +//func TestCustomPool(t *testing.T) { +// p, _ := ants.NewPool(30000) +// var wg sync.WaitGroup +// for i := 0; i < n; i++ { +// wg.Add(1) +// p.Push(func() { +// demoFunc() +// //demoFunc() +// wg.Done() +// }) +// } +// wg.Wait() +// +// //t.Logf("pool capacity:%d", p.Cap()) +// //t.Logf("free workers number:%d", p.Free()) +// +// t.Logf("running workers number:%d", p.Running()) +// mem := runtime.MemStats{} +// runtime.ReadMemStats(&mem) +// t.Logf("memory usage:%d", mem.TotalAlloc/1024) +//} diff --git a/pool.go b/pool.go index 704571c..58b2dd2 100644 --- a/pool.go +++ b/pool.go @@ -172,25 +172,24 @@ func (p *Pool) getWorker() *Worker { p.lock.Unlock() break } - } else { + } else if w == nil { wp := p.workerPool.Get() if wp == nil { w = &Worker{ pool: p, - task: make(chan f), + task: make(chan f, workerArgsCap), } - w.run() - atomic.AddInt32(&p.running, 1) } else { w = wp.(*Worker) } + w.run() + p.workerPool.Put(w) } return w } // putWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) putWorker(worker *Worker) { - p.workerPool.Put(worker) p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() diff --git a/pool_func.go b/pool_func.go new file mode 100644 index 0000000..7223bfa --- /dev/null +++ b/pool_func.go @@ -0,0 +1,198 @@ +// MIT License + +// Copyright (c) 2018 Andy Pan + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package ants + +import ( + "math" + "sync" + "sync/atomic" + "time" +) + +type pf func(interface{}) error + +// PoolWithFunc accept the tasks from client,it will limit the total +// of goroutines to a given number by recycling goroutines. +type PoolWithFunc struct { + // capacity of the pool. + capacity int32 + + // running is the number of the currently running goroutines. + running int32 + + // signal is used to notice pool there are available + // workers which can be sent to work. + freeSignal chan sig + + // workers is a slice that store the available workers. + workers []*WorkerWithFunc + + // workerPool is a pool that saves a set of temporary objects. + workerPool sync.Pool + + // release is used to notice the pool to closed itself. + release chan sig + + lock sync.Mutex + + // closed is used to confirm whether this pool has been closed. + closed int32 + + poolFunc pf +} + +// NewPoolWithFunc generates a instance of ants pool with a specific function. +func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { + if size <= 0 { + return nil, ErrPoolSizeInvalid + } + p := &PoolWithFunc{ + capacity: int32(size), + freeSignal: make(chan sig, math.MaxInt32), + release: make(chan sig), + closed: 0, + poolFunc: f, + } + + return p, nil +} + +//------------------------------------------------------------------------- + +// scanAndClean is a goroutine who will periodically clean up +// after it is noticed that this pool is closed. +func (p *PoolWithFunc) scanAndClean() { + ticker := time.NewTicker(DefaultCleanIntervalTime * time.Second) + go func() { + ticker.Stop() + for range ticker.C { + if atomic.LoadInt32(&p.closed) == 1 { + p.lock.Lock() + for _, w := range p.workers { + w.stop() + } + p.lock.Unlock() + } + } + }() +} + +// Serve submit a task to pool +func (p *PoolWithFunc) Serve(args interface{}) error { + if atomic.LoadInt32(&p.closed) == 1 { + return ErrPoolClosed + } + w := p.getWorker() + w.sendTask(args) + return nil +} + +// Running returns the number of the currently running goroutines +func (p *PoolWithFunc) Running() int { + return int(atomic.LoadInt32(&p.running)) +} + +// Free returns the available goroutines to work +func (p *PoolWithFunc) Free() int { + return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) +} + +// Cap returns the capacity of this pool +func (p *PoolWithFunc) Cap() int { + return int(atomic.LoadInt32(&p.capacity)) +} + +// Release Closed this pool +func (p *PoolWithFunc) Release() error { + p.lock.Lock() + atomic.StoreInt32(&p.closed, 1) + close(p.release) + p.lock.Unlock() + return nil +} + +// ReSize change the capacity of this pool +func (p *PoolWithFunc) ReSize(size int) { + atomic.StoreInt32(&p.capacity, int32(size)) +} + +//------------------------------------------------------------------------- + +// getWorker returns a available worker to run the tasks. +func (p *PoolWithFunc) getWorker() *WorkerWithFunc { + var w *WorkerWithFunc + waiting := false + + p.lock.Lock() + workers := p.workers + n := len(workers) - 1 + if n < 0 { + if p.running >= p.capacity { + waiting = true + } + } else { + w = workers[n] + workers[n] = nil + p.workers = workers[:n] + } + p.lock.Unlock() + + if waiting { + <-p.freeSignal + for { + p.lock.Lock() + workers = p.workers + l := len(workers) - 1 + if l < 0 { + p.lock.Unlock() + continue + } + w = workers[l] + workers[l] = nil + p.workers = workers[:l] + p.lock.Unlock() + break + } + } else if w == nil { + wp := p.workerPool.Get() + if wp == nil { + w = &WorkerWithFunc{ + pool: p, + args: make(chan interface{}, workerArgsCap), + } + } else { + w = wp.(*WorkerWithFunc) + } + w.run() + p.workerPool.Put(w) + } + return w +} + +// putWorker puts a worker back into free pool, recycling the goroutines. +func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { + p.lock.Lock() + p.workers = append(p.workers, worker) + p.lock.Unlock() + p.freeSignal <- sig{} +} diff --git a/worker.go b/worker.go index d46ffa4..601836c 100644 --- a/worker.go +++ b/worker.go @@ -40,6 +40,7 @@ type Worker struct { // run will start a goroutine to repeat the process // that perform the function calls. func (w *Worker) run() { + atomic.AddInt32(&w.pool.running, 1) go func() { for f := range w.task { if f == nil { diff --git a/worker_func.go b/worker_func.go new file mode 100644 index 0000000..87dd6a4 --- /dev/null +++ b/worker_func.go @@ -0,0 +1,64 @@ +// MIT License + +// Copyright (c) 2018 Andy Pan + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package ants + +import ( + "sync/atomic" +) + +// Worker is the actual executor who run the tasks, +// it will start a goroutine that accept tasks and +// perform function calls. +type WorkerWithFunc struct { + // pool who owns this worker. + pool *PoolWithFunc + + // args is a job should be done. + args chan interface{} +} + +// run will start a goroutine to repeat the process +// that perform the function calls. +func (w *WorkerWithFunc) run() { + atomic.AddInt32(&w.pool.running, 1) + go func() { + for args := range w.args { + if args == nil { + atomic.AddInt32(&w.pool.running, -1) + return + } + w.pool.poolFunc(args) + w.pool.putWorker(w) + } + }() +} + +// stop this worker. +func (w *WorkerWithFunc) stop() { + w.args <- nil +} + +// sendTask send a task to this worker. +func (w *WorkerWithFunc) sendTask(args interface{}) { + w.args <- args +}