feature: add PanicHandler

Signed-off-by: Cholerae Hu <choleraehyq@gmail.com>
This commit is contained in:
Cholerae Hu 2019-01-21 18:57:23 +08:00 committed by Andy Pan
parent 812dd4e010
commit 9158bd3702
5 changed files with 99 additions and 0 deletions

View File

@ -25,6 +25,7 @@ package ants_test
import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
@ -111,6 +112,53 @@ func TestAntsPool(t *testing.T) {
t.Logf("memory usage:%d MB", curMem)
}
func TestPanicHandler(t *testing.T) {
p0, err := ants.NewPool(10)
if err != nil {
t.Fatalf("create new pool failed: %s", err.Error())
}
defer p0.Release()
var panicCounter int64
var wg sync.WaitGroup
p0.PanicHandler = func(p interface{}) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
}
wg.Add(1)
p0.Submit(func() {
panic("test")
})
wg.Wait()
c := atomic.LoadInt64(&panicCounter)
if c != 1 {
t.Errorf("panic handler didn't work, panicCounter: %d", c)
}
if p0.Running() != 0 {
t.Errorf("pool should be empty after panic")
}
p1, err := ants.NewPoolWithFunc(10, func (p interface{}) {
panic(p)
})
if err != nil {
t.Fatalf("create new pool with func failed: %s", err.Error())
}
defer p1.Release()
p1.PanicHandler = func(p interface{}) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
}
wg.Add(1)
p1.Serve("test")
wg.Wait()
c = atomic.LoadInt64(&panicCounter)
if c != 2 {
t.Errorf("panic handler didn't work, panicCounter: %d", c)
}
if p1.Running() != 0 {
t.Errorf("pool should be empty after panic")
}
}
func TestCodeCov(t *testing.T) {
_, err := ants.NewTimingPool(-1, -1)
t.Log(err)
@ -148,3 +196,26 @@ func TestCodeCov(t *testing.T) {
p.ReSize(AntsSize)
t.Logf("pool with func, after resize, capacity:%d, running:%d", p.Cap(), p.Running())
}
func TestPurge(t *testing.T) {
p, err := ants.NewTimingPool(10, 1)
defer p.Release()
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
p.Submit(demoFunc)
time.Sleep(5 * time.Second)
if p.Running() != 0 {
t.Error("all p should be purged")
}
p1, err := ants.NewTimingPoolWithFunc(10, 1, demoPoolFunc)
defer p1.Release()
if err != nil {
t.Fatalf("create TimingPoolWithFunc failed: %s", err.Error())
}
p1.Serve(1)
time.Sleep(5 * time.Second)
if p.Running() != 0 {
t.Error("all p should be purged")
}
}

View File

@ -57,6 +57,10 @@ type Pool struct {
cond *sync.Cond
once sync.Once
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{})
}
// clear expired workers periodically.

View File

@ -58,6 +58,10 @@ type PoolWithFunc struct {
poolFunc pf
once sync.Once
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{})
}
// clear expired workers periodically.

View File

@ -44,6 +44,16 @@ type Worker struct {
// that performs the function calls.
func (w *Worker) run() {
go func() {
defer func() {
if p := recover(); p != nil {
w.pool.decRunning()
if w.pool.PanicHandler != nil {
w.pool.PanicHandler(p)
} else {
panic(p)
}
}
}()
for f := range w.task {
if f == nil {
w.pool.decRunning()

View File

@ -44,6 +44,16 @@ type WorkerWithFunc struct {
// that performs the function calls.
func (w *WorkerWithFunc) run() {
go func() {
defer func() {
if p := recover(); p != nil {
w.pool.decRunning()
if w.pool.PanicHandler != nil {
w.pool.PanicHandler(p)
} else {
panic(p)
}
}
}()
for args := range w.args {
if args == nil {
w.pool.decRunning()