use sync.Pool

This commit is contained in:
Andy Pan 2018-05-19 18:24:36 +08:00
parent 0c7ba6d3ac
commit 2929cede54
4 changed files with 89 additions and 27 deletions

View File

@ -1,6 +1,8 @@
package ants package ants
const DEFAULT_POOL_SIZE = 1000 import "math"
const DEFAULT_POOL_SIZE = math.MaxInt32
var defaultPool = NewPool(DEFAULT_POOL_SIZE) var defaultPool = NewPool(DEFAULT_POOL_SIZE)
@ -19,3 +21,7 @@ func Cap() int {
func Free() int { func Free() int {
return defaultPool.Free() return defaultPool.Free()
} }
func Wait() {
defaultPool.Wait()
}

View File

@ -7,10 +7,14 @@ import (
"runtime" "runtime"
) )
var n = 100000 var n = 10
func demoFunc() { func demoFunc() {
for i := 0; i < 1000000; i++ {} var n int
for i := 0; i < 10000; i++ {
n += i
}
fmt.Printf("finish task with result:%d\n", n)
} }
func TestDefaultPool(t *testing.T) { func TestDefaultPool(t *testing.T) {
@ -22,6 +26,8 @@ func TestDefaultPool(t *testing.T) {
t.Logf("running workers number:%d", ants.Running()) t.Logf("running workers number:%d", ants.Running())
t.Logf("free workers number:%d", ants.Free()) t.Logf("free workers number:%d", ants.Free())
ants.Wait()
mem := runtime.MemStats{} mem := runtime.MemStats{}
runtime.ReadMemStats(&mem) runtime.ReadMemStats(&mem)
fmt.Println("memory usage:", mem.TotalAlloc/1024) fmt.Println("memory usage:", mem.TotalAlloc/1024)

86
pool.go
View File

@ -4,28 +4,40 @@ import (
"runtime" "runtime"
"sync/atomic" "sync/atomic"
"sync" "sync"
"math"
) )
type sig struct{} type sig struct{}
type f func() type f func()
//type er interface{}
type Pool struct { type Pool struct {
capacity int32 capacity int32
running int32 running int32
tasks chan f //tasks chan er
workers chan *Worker //workers chan er
tasks *sync.Pool
workers *sync.Pool
freeSignal chan sig
launchSignal chan sig
destroy chan sig destroy chan sig
m sync.Mutex m *sync.Mutex
wg *sync.WaitGroup
} }
func NewPool(size int) *Pool { func NewPool(size int) *Pool {
p := &Pool{ p := &Pool{
capacity: int32(size), capacity: int32(size),
tasks: make(chan f, size), //tasks: make(chan er, size),
//workers: &sync.Pool{New: func() interface{} { return &Worker{} }}, //workers: make(chan er, size),
workers: make(chan *Worker, size), tasks: &sync.Pool{},
workers: &sync.Pool{},
freeSignal: make(chan sig, math.MaxInt32),
launchSignal: make(chan sig, math.MaxInt32),
destroy: make(chan sig, runtime.GOMAXPROCS(-1)), destroy: make(chan sig, runtime.GOMAXPROCS(-1)),
wg: &sync.WaitGroup{},
} }
p.loop() p.loop()
return p return p
@ -38,8 +50,8 @@ func (p *Pool) loop() {
go func() { go func() {
for { for {
select { select {
case task := <-p.tasks: case <-p.launchSignal:
p.getWorker().sendTask(task) p.getWorker().sendTask(p.tasks.Get().(f))
case <-p.destroy: case <-p.destroy:
return return
} }
@ -52,7 +64,10 @@ func (p *Pool) Push(task f) error {
if len(p.destroy) > 0 { if len(p.destroy) > 0 {
return nil return nil
} }
p.tasks <- task //p.tasks <- task
p.tasks.Put(task)
p.launchSignal <- sig{}
p.wg.Add(1)
return nil return nil
} }
func (p *Pool) Running() int { func (p *Pool) Running() int {
@ -67,6 +82,10 @@ func (p *Pool) Cap() int {
return int(atomic.LoadInt32(&p.capacity)) return int(atomic.LoadInt32(&p.capacity))
} }
func (p *Pool) Wait() {
p.wg.Wait()
}
func (p *Pool) Destroy() error { func (p *Pool) Destroy() error {
p.m.Lock() p.m.Lock()
defer p.m.Unlock() defer p.m.Unlock()
@ -83,6 +102,10 @@ func (p *Pool) reachLimit() bool {
} }
func (p *Pool) newWorker() *Worker { func (p *Pool) newWorker() *Worker {
if p.reachLimit() {
<-p.freeSignal
return p.getWorker()
}
worker := &Worker{ worker := &Worker{
pool: p, pool: p,
task: make(chan f), task: make(chan f),
@ -92,18 +115,43 @@ func (p *Pool) newWorker() *Worker {
return worker return worker
} }
//func (p *Pool) newWorker() *Worker {
// worker := &Worker{
// pool: p,
// task: make(chan f),
// exit: make(chan sig),
// }
// worker.run()
// return worker
//}
//func (p *Pool) getWorker() *Worker {
// defer atomic.AddInt32(&p.running, 1)
// var worker *Worker
// if p.reachLimit() {
// worker = (<-p.workers).(*Worker)
// } else {
// select {
// case w := <-p.workers:
// return w.(*Worker)
// default:
// worker = p.newWorker()
// }
// }
// return worker
//}
func (p *Pool) getWorker() *Worker { func (p *Pool) getWorker() *Worker {
defer atomic.AddInt32(&p.running, 1) defer atomic.AddInt32(&p.running, 1)
var worker *Worker if w := p.workers.Get(); w != nil {
return w.(*Worker)
}
return p.newWorker()
}
func (p *Pool) PutWorker(worker *Worker) {
p.workers.Put(worker)
if p.reachLimit() { if p.reachLimit() {
worker = <-p.workers p.freeSignal <- sig{}
} else {
select {
case worker = <-p.workers:
return worker
default:
worker = p.newWorker()
} }
} }
return worker
}

View File

@ -14,9 +14,11 @@ func (w *Worker) run() {
select { select {
case f := <-w.task: case f := <-w.task:
f() f()
w.pool.workers <- w //w.pool.workers <- w
atomic.AddInt32(&w.pool.running, -1) w.pool.workers.Put(w)
w.pool.wg.Done()
case <-w.exit: case <-w.exit:
atomic.AddInt32(&w.pool.running, -1)
return return
} }
} }