From 1ee81442725df50ef8931c8501f15ce2a18452f3 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sun, 20 May 2018 18:57:11 +0800 Subject: [PATCH] add a new module of cleaning up goroutines --- ants.go | 18 ++++++++++++++-- ants_benchmark_test.go | 3 +-- ants_test.go | 6 ++++-- examples/main.go | 31 +++++++++++++++++++++++++++ pool.go | 48 +++++++++++++++++++++++++++++++----------- 5 files changed, 88 insertions(+), 18 deletions(-) create mode 100644 examples/main.go diff --git a/ants.go b/ants.go index de5494d..8096d06 100644 --- a/ants.go +++ b/ants.go @@ -19,9 +19,14 @@ package ants -const DEFAULT_POOL_SIZE = 50000 +import "github.com/iris-contrib/errors" -var defaultPool = NewPool(DEFAULT_POOL_SIZE) +const ( + DEFAULT_POOL_SIZE = 50000 + DEFAULT_CLEAN_INTERVAL_TIME = 30 +) + +var defaultPool, _ = NewPool(DEFAULT_POOL_SIZE) func Push(task f) error { return defaultPool.Push(task) @@ -38,3 +43,12 @@ func Cap() int { func Free() int { return defaultPool.Free() } + +func Release() { + +} + +var ( + PoolSizeInvalidError = errors.New("invalid size for pool") + PoolClosedError = errors.New("this pool has been closed") +) diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 5ce9e2b..f6a1bdc 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -6,7 +6,7 @@ import ( "sync" ) -const RunTimes = 1000000 +const RunTimes = 10000000 func BenchmarkGoroutine(b *testing.B) { for i := 0; i < b.N; i++ { @@ -43,4 +43,3 @@ func BenchmarkPoolGroutine(b *testing.B) { // p.Push(demoFunc) // } //} - diff --git a/ants_test.go b/ants_test.go index e349a6d..5a1b696 100644 --- a/ants_test.go +++ b/ants_test.go @@ -4,8 +4,8 @@ import ( "testing" "github.com/panjf2000/ants" "sync" - "time" "runtime" + "time" ) var n = 1000000 @@ -27,6 +27,8 @@ var n = 1000000 func forSleep() { time.Sleep(time.Millisecond) + //for i := 0; i < 10000; i++ { + //} } func TestNoPool(t *testing.T) { @@ -68,7 +70,7 @@ func TestDefaultPool(t *testing.T) { } func TestCustomPool(t *testing.T) { - p := ants.NewPool(30000) + p, _ := ants.NewPool(30000) var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) diff --git a/examples/main.go b/examples/main.go new file mode 100644 index 0000000..61a2c67 --- /dev/null +++ b/examples/main.go @@ -0,0 +1,31 @@ +package main + +import ( + "fmt" + "github.com/panjf2000/ants" + "sync" +) + +func myFunc() { + fmt.Println("Hello World!") +} + +func main() { + // + runTimes := 10000 + + // set 100 the size of goroutine pool + p, _ := ants.NewPool(100) + + var wg sync.WaitGroup + // submit + for i := 0; i < runTimes; i++ { + wg.Add(1) + p.Push(func() { + myFunc() + wg.Done() + }) + } + wg.Wait() + fmt.Println("finish all tasks!") +} diff --git a/pool.go b/pool.go index 3e63551..eedbd23 100644 --- a/pool.go +++ b/pool.go @@ -19,10 +19,10 @@ package ants import ( - "runtime" "sync/atomic" "sync" "math" + "time" ) type sig struct{} @@ -35,25 +35,46 @@ type Pool struct { freeSignal chan sig workers []*Worker workerPool sync.Pool - destroy chan sig + release chan sig lock sync.Mutex + closed int32 } -func NewPool(size int) *Pool { +func NewPool(size int) (*Pool, error) { + if size <= 0 { + return nil, PoolSizeInvalidError + } p := &Pool{ capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), - destroy: make(chan sig, runtime.GOMAXPROCS(-1)), + release: make(chan sig), + closed: 0, } - return p + return p, nil } //------------------------------------------------------------------------- +func (p *Pool) scanAndClean() { + ticker := time.NewTicker(DEFAULT_CLEAN_INTERVAL_TIME * time.Second) + go func() { + ticker.Stop() + for range ticker.C { + if atomic.LoadInt32(&p.closed) == 1 { + p.lock.Lock() + for _, w := range p.workers { + w.stop() + } + p.lock.Unlock() + } + } + }() +} + func (p *Pool) Push(task f) error { - if len(p.destroy) > 0 { - return nil + if atomic.LoadInt32(&p.closed) == 1 { + return PoolClosedError } w := p.getWorker() w.sendTask(task) @@ -72,15 +93,18 @@ func (p *Pool) Cap() int { return int(atomic.LoadInt32(&p.capacity)) } -func (p *Pool) Destroy() error { +func (p *Pool) Release() error { p.lock.Lock() - defer p.lock.Unlock() - for i := 0; i < runtime.GOMAXPROCS(-1)+1; i++ { - p.destroy <- sig{} - } + atomic.StoreInt32(&p.closed, 1) + close(p.release) + p.lock.Unlock() return nil } +func (p *Pool) ReSize(size int) { + atomic.StoreInt32(&p.capacity, int32(size)) +} + //------------------------------------------------------------------------- func (p *Pool) getWorker() *Worker {