add a new module of cleaning up goroutines

This commit is contained in:
Andy Pan 2018-05-20 18:57:11 +08:00
parent 2b0c0227b6
commit 1ee8144272
5 changed files with 88 additions and 18 deletions

18
ants.go
View File

@ -19,9 +19,14 @@
package ants package ants
const DEFAULT_POOL_SIZE = 50000 import "github.com/iris-contrib/errors"
var defaultPool = NewPool(DEFAULT_POOL_SIZE) const (
DEFAULT_POOL_SIZE = 50000
DEFAULT_CLEAN_INTERVAL_TIME = 30
)
var defaultPool, _ = NewPool(DEFAULT_POOL_SIZE)
func Push(task f) error { func Push(task f) error {
return defaultPool.Push(task) return defaultPool.Push(task)
@ -38,3 +43,12 @@ func Cap() int {
func Free() int { func Free() int {
return defaultPool.Free() return defaultPool.Free()
} }
func Release() {
}
var (
PoolSizeInvalidError = errors.New("invalid size for pool")
PoolClosedError = errors.New("this pool has been closed")
)

View File

@ -6,7 +6,7 @@ import (
"sync" "sync"
) )
const RunTimes = 1000000 const RunTimes = 10000000
func BenchmarkGoroutine(b *testing.B) { func BenchmarkGoroutine(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
@ -43,4 +43,3 @@ func BenchmarkPoolGroutine(b *testing.B) {
// p.Push(demoFunc) // p.Push(demoFunc)
// } // }
//} //}

View File

@ -4,8 +4,8 @@ import (
"testing" "testing"
"github.com/panjf2000/ants" "github.com/panjf2000/ants"
"sync" "sync"
"time"
"runtime" "runtime"
"time"
) )
var n = 1000000 var n = 1000000
@ -27,6 +27,8 @@ var n = 1000000
func forSleep() { func forSleep() {
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
//for i := 0; i < 10000; i++ {
//}
} }
func TestNoPool(t *testing.T) { func TestNoPool(t *testing.T) {
@ -68,7 +70,7 @@ func TestDefaultPool(t *testing.T) {
} }
func TestCustomPool(t *testing.T) { func TestCustomPool(t *testing.T) {
p := ants.NewPool(30000) p, _ := ants.NewPool(30000)
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
wg.Add(1) wg.Add(1)

31
examples/main.go Normal file
View File

@ -0,0 +1,31 @@
package main
import (
"fmt"
"github.com/panjf2000/ants"
"sync"
)
func myFunc() {
fmt.Println("Hello World!")
}
func main() {
//
runTimes := 10000
// set 100 the size of goroutine pool
p, _ := ants.NewPool(100)
var wg sync.WaitGroup
// submit
for i := 0; i < runTimes; i++ {
wg.Add(1)
p.Push(func() {
myFunc()
wg.Done()
})
}
wg.Wait()
fmt.Println("finish all tasks!")
}

48
pool.go
View File

@ -19,10 +19,10 @@
package ants package ants
import ( import (
"runtime"
"sync/atomic" "sync/atomic"
"sync" "sync"
"math" "math"
"time"
) )
type sig struct{} type sig struct{}
@ -35,25 +35,46 @@ type Pool struct {
freeSignal chan sig freeSignal chan sig
workers []*Worker workers []*Worker
workerPool sync.Pool workerPool sync.Pool
destroy chan sig release chan sig
lock sync.Mutex lock sync.Mutex
closed int32
} }
func NewPool(size int) *Pool { func NewPool(size int) (*Pool, error) {
if size <= 0 {
return nil, PoolSizeInvalidError
}
p := &Pool{ p := &Pool{
capacity: int32(size), capacity: int32(size),
freeSignal: make(chan sig, math.MaxInt32), freeSignal: make(chan sig, math.MaxInt32),
destroy: make(chan sig, runtime.GOMAXPROCS(-1)), release: make(chan sig),
closed: 0,
} }
return p return p, nil
} }
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
func (p *Pool) scanAndClean() {
ticker := time.NewTicker(DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
go func() {
ticker.Stop()
for range ticker.C {
if atomic.LoadInt32(&p.closed) == 1 {
p.lock.Lock()
for _, w := range p.workers {
w.stop()
}
p.lock.Unlock()
}
}
}()
}
func (p *Pool) Push(task f) error { func (p *Pool) Push(task f) error {
if len(p.destroy) > 0 { if atomic.LoadInt32(&p.closed) == 1 {
return nil return PoolClosedError
} }
w := p.getWorker() w := p.getWorker()
w.sendTask(task) w.sendTask(task)
@ -72,15 +93,18 @@ func (p *Pool) Cap() int {
return int(atomic.LoadInt32(&p.capacity)) return int(atomic.LoadInt32(&p.capacity))
} }
func (p *Pool) Destroy() error { func (p *Pool) Release() error {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() atomic.StoreInt32(&p.closed, 1)
for i := 0; i < runtime.GOMAXPROCS(-1)+1; i++ { close(p.release)
p.destroy <- sig{} p.lock.Unlock()
}
return nil return nil
} }
func (p *Pool) ReSize(size int) {
atomic.StoreInt32(&p.capacity, int32(size))
}
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
func (p *Pool) getWorker() *Worker { func (p *Pool) getWorker() *Worker {