🦖 Develop a more graceful release

This commit is contained in:
Andy Pan 2019-04-13 10:35:39 +08:00
parent ac576e1daf
commit aa7dd6f8cc
6 changed files with 55 additions and 39 deletions

View File

@ -97,7 +97,7 @@ func BenchmarkAntsPoolWithFunc(b *testing.B) {
b.StopTimer() b.StopTimer()
} }
func BenchmarkGoroutine(b *testing.B) { func BenchmarkGoroutineThroughput(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ { for j := 0; j < RunTimes; j++ {
go demoPoolFunc(benchParam) go demoPoolFunc(benchParam)
@ -105,7 +105,7 @@ func BenchmarkGoroutine(b *testing.B) {
} }
} }
func BenchmarkSemaphore(b *testing.B) { func BenchmarkSemaphoreThroughput(b *testing.B) {
sema := make(chan struct{}, benchAntsSize) sema := make(chan struct{}, benchAntsSize)
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ { for j := 0; j < RunTimes; j++ {
@ -118,7 +118,7 @@ func BenchmarkSemaphore(b *testing.B) {
} }
} }
func BenchmarkAntsPool(b *testing.B) { func BenchmarkAntsPoolThroughput(b *testing.B) {
p, _ := ants.NewPoolWithFunc(benchAntsSize, demoPoolFunc) p, _ := ants.NewPoolWithFunc(benchAntsSize, demoPoolFunc)
defer p.Release() defer p.Release()
b.StartTimer() b.StartTimer()

View File

@ -96,39 +96,39 @@ func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
} }
// TestAntsPoolGetWorkerFromCache is used to test getting worker from sync.Pool. // TestAntsPoolGetWorkerFromCache is used to test getting worker from sync.Pool.
func TestAntsPoolGetWorkerFromCache(t *testing.T) { // func TestAntsPoolGetWorkerFromCache(t *testing.T) {
p, _ := ants.NewPool(TestSize) // p, _ := ants.NewPool(TestSize)
defer p.Release() // defer p.Release()
for i := 0; i < AntsSize; i++ { // for i := 0; i < AntsSize; i++ {
p.Submit(demoFunc) // p.Submit(demoFunc)
} // }
time.Sleep(2 * ants.DefaultCleanIntervalTime * time.Second) // time.Sleep(2 * ants.DefaultCleanIntervalTime * time.Second)
p.Submit(demoFunc) // p.Submit(demoFunc)
t.Logf("pool, running workers number:%d", p.Running()) // t.Logf("pool, running workers number:%d", p.Running())
mem := runtime.MemStats{} // mem := runtime.MemStats{}
runtime.ReadMemStats(&mem) // runtime.ReadMemStats(&mem)
curMem = mem.TotalAlloc/MiB - curMem // curMem = mem.TotalAlloc/MiB - curMem
t.Logf("memory usage:%d MB", curMem) // t.Logf("memory usage:%d MB", curMem)
} // }
// TestAntsPoolWithFuncGetWorkerFromCache is used to test getting worker from sync.Pool. // TestAntsPoolWithFuncGetWorkerFromCache is used to test getting worker from sync.Pool.
func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) { // func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) {
dur := 10 // dur := 10
p, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc) // p, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc)
defer p.Release() // defer p.Release()
for i := 0; i < AntsSize; i++ { // for i := 0; i < AntsSize; i++ {
p.Invoke(dur) // p.Invoke(dur)
} // }
time.Sleep(2 * ants.DefaultCleanIntervalTime * time.Second) // time.Sleep(2 * ants.DefaultCleanIntervalTime * time.Second)
p.Invoke(dur) // p.Invoke(dur)
t.Logf("pool with func, running workers number:%d", p.Running()) // t.Logf("pool with func, running workers number:%d", p.Running())
mem := runtime.MemStats{} // mem := runtime.MemStats{}
runtime.ReadMemStats(&mem) // runtime.ReadMemStats(&mem)
curMem = mem.TotalAlloc/MiB - curMem // curMem = mem.TotalAlloc/MiB - curMem
t.Logf("memory usage:%d MB", curMem) // t.Logf("memory usage:%d MB", curMem)
} // }
//------------------------------------------------------------------------------------------- //-------------------------------------------------------------------------------------------
// Contrast between goroutines without a pool and goroutines with ants pool. // Contrast between goroutines without a pool and goroutines with ants pool.

View File

@ -72,7 +72,7 @@ func (p *Pool) periodicallyPurge() {
currentTime := time.Now() currentTime := time.Now()
p.lock.Lock() p.lock.Lock()
idleWorkers := p.workers idleWorkers := p.workers
if len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 { if atomic.LoadInt32(&p.release) == 1 {
p.lock.Unlock() p.lock.Unlock()
return return
} }
@ -225,11 +225,16 @@ func (p *Pool) retrieveWorker() *Worker {
} }
// revertWorker puts a worker back into free pool, recycling the goroutines. // revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) revertWorker(worker *Worker) { func (p *Pool) revertWorker(worker *Worker) bool {
if 1 == atomic.LoadInt32(&p.release) {
return false
}
worker.recycleTime = time.Now() worker.recycleTime = time.Now()
p.lock.Lock() p.lock.Lock()
p.workers = append(p.workers, worker) p.workers = append(p.workers, worker)
// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue. // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
p.cond.Signal() p.cond.Signal()
p.lock.Unlock() p.lock.Unlock()
p.workerCache.Put(worker)
return true
} }

View File

@ -75,7 +75,7 @@ func (p *PoolWithFunc) periodicallyPurge() {
currentTime := time.Now() currentTime := time.Now()
p.lock.Lock() p.lock.Lock()
idleWorkers := p.workers idleWorkers := p.workers
if len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 { if atomic.LoadInt32(&p.release) == 1 {
p.lock.Unlock() p.lock.Unlock()
return return
} }
@ -229,11 +229,16 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc {
} }
// revertWorker puts a worker back into free pool, recycling the goroutines. // revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *PoolWithFunc) revertWorker(worker *WorkerWithFunc) { func (p *PoolWithFunc) revertWorker(worker *WorkerWithFunc) bool {
if 1 == atomic.LoadInt32(&p.release) {
return false
}
worker.recycleTime = time.Now() worker.recycleTime = time.Now()
p.lock.Lock() p.lock.Lock()
p.workers = append(p.workers, worker) p.workers = append(p.workers, worker)
// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue. // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
p.cond.Signal() p.cond.Signal()
p.lock.Unlock() p.lock.Unlock()
p.workerCache.Put(worker)
return true
} }

View File

@ -49,6 +49,7 @@ func (w *Worker) run() {
defer func() { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
w.pool.decRunning() w.pool.decRunning()
w.pool.workerCache.Put(w)
if w.pool.PanicHandler != nil { if w.pool.PanicHandler != nil {
w.pool.PanicHandler(p) w.pool.PanicHandler(p)
} else { } else {
@ -64,7 +65,9 @@ func (w *Worker) run() {
return return
} }
f() f()
w.pool.revertWorker(w) if ok := w.pool.revertWorker(w); !ok {
break
}
} }
}() }()
} }

View File

@ -49,6 +49,7 @@ func (w *WorkerWithFunc) run() {
defer func() { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
w.pool.decRunning() w.pool.decRunning()
w.pool.workerCache.Put(w)
if w.pool.PanicHandler != nil { if w.pool.PanicHandler != nil {
w.pool.PanicHandler(p) w.pool.PanicHandler(p)
} else { } else {
@ -64,7 +65,9 @@ func (w *WorkerWithFunc) run() {
return return
} }
w.pool.poolFunc(args) w.pool.poolFunc(args)
w.pool.revertWorker(w) if ok := w.pool.revertWorker(w); !ok {
break
}
} }
}() }()
} }