From 9158bd37025ccdd29d6346a6639a282e0060c7e2 Mon Sep 17 00:00:00 2001 From: Cholerae Hu Date: Mon, 21 Jan 2019 18:57:23 +0800 Subject: [PATCH] feature: add PanicHandler Signed-off-by: Cholerae Hu --- ants_test.go | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++ pool.go | 4 +++ pool_func.go | 4 +++ worker.go | 10 +++++++ worker_func.go | 10 +++++++ 5 files changed, 99 insertions(+) diff --git a/ants_test.go b/ants_test.go index 699ae88..1ed174a 100644 --- a/ants_test.go +++ b/ants_test.go @@ -25,6 +25,7 @@ package ants_test import ( "runtime" "sync" + "sync/atomic" "testing" "time" @@ -111,6 +112,53 @@ func TestAntsPool(t *testing.T) { t.Logf("memory usage:%d MB", curMem) } +func TestPanicHandler(t *testing.T) { + p0, err := ants.NewPool(10) + if err != nil { + t.Fatalf("create new pool failed: %s", err.Error()) + } + defer p0.Release() + var panicCounter int64 + var wg sync.WaitGroup + p0.PanicHandler = func(p interface{}) { + defer wg.Done() + atomic.AddInt64(&panicCounter, 1) + } + wg.Add(1) + p0.Submit(func() { + panic("test") + }) + wg.Wait() + c := atomic.LoadInt64(&panicCounter) + if c != 1 { + t.Errorf("panic handler didn't work, panicCounter: %d", c) + } + if p0.Running() != 0 { + t.Errorf("pool should be empty after panic") + } + p1, err := ants.NewPoolWithFunc(10, func (p interface{}) { + panic(p) + }) + if err != nil { + t.Fatalf("create new pool with func failed: %s", err.Error()) + } + defer p1.Release() + p1.PanicHandler = func(p interface{}) { + defer wg.Done() + atomic.AddInt64(&panicCounter, 1) + } + wg.Add(1) + p1.Serve("test") + wg.Wait() + c = atomic.LoadInt64(&panicCounter) + if c != 2 { + t.Errorf("panic handler didn't work, panicCounter: %d", c) + } + if p1.Running() != 0 { + t.Errorf("pool should be empty after panic") + } +} + func TestCodeCov(t *testing.T) { _, err := ants.NewTimingPool(-1, -1) t.Log(err) @@ -148,3 +196,26 @@ func TestCodeCov(t *testing.T) { p.ReSize(AntsSize) t.Logf("pool with func, after resize, capacity:%d, running:%d", p.Cap(), p.Running()) } + +func TestPurge(t *testing.T) { + p, err := ants.NewTimingPool(10, 1) + defer p.Release() + if err != nil { + t.Fatalf("create TimingPool failed: %s", err.Error()) + } + p.Submit(demoFunc) + time.Sleep(5 * time.Second) + if p.Running() != 0 { + t.Error("all p should be purged") + } + p1, err := ants.NewTimingPoolWithFunc(10, 1, demoPoolFunc) + defer p1.Release() + if err != nil { + t.Fatalf("create TimingPoolWithFunc failed: %s", err.Error()) + } + p1.Serve(1) + time.Sleep(5 * time.Second) + if p.Running() != 0 { + t.Error("all p should be purged") + } +} diff --git a/pool.go b/pool.go index d628784..cc55242 100644 --- a/pool.go +++ b/pool.go @@ -57,6 +57,10 @@ type Pool struct { cond *sync.Cond once sync.Once + + // PanicHandler is used to handle panics from each worker goroutine. + // if nil, panics will be thrown out again from worker goroutines. + PanicHandler func(interface{}) } // clear expired workers periodically. diff --git a/pool_func.go b/pool_func.go index b0dfbd6..fb998b6 100644 --- a/pool_func.go +++ b/pool_func.go @@ -58,6 +58,10 @@ type PoolWithFunc struct { poolFunc pf once sync.Once + + // PanicHandler is used to handle panics from each worker goroutine. + // if nil, panics will be thrown out again from worker goroutines. + PanicHandler func(interface{}) } // clear expired workers periodically. diff --git a/worker.go b/worker.go index a0c2faf..4875266 100644 --- a/worker.go +++ b/worker.go @@ -44,6 +44,16 @@ type Worker struct { // that performs the function calls. func (w *Worker) run() { go func() { + defer func() { + if p := recover(); p != nil { + w.pool.decRunning() + if w.pool.PanicHandler != nil { + w.pool.PanicHandler(p) + } else { + panic(p) + } + } + }() for f := range w.task { if f == nil { w.pool.decRunning() diff --git a/worker_func.go b/worker_func.go index a02264b..24b2bd3 100644 --- a/worker_func.go +++ b/worker_func.go @@ -44,6 +44,16 @@ type WorkerWithFunc struct { // that performs the function calls. func (w *WorkerWithFunc) run() { go func() { + defer func() { + if p := recover(); p != nil { + w.pool.decRunning() + if w.pool.PanicHandler != nil { + w.pool.PanicHandler(p) + } else { + panic(p) + } + } + }() for args := range w.args { if args == nil { w.pool.decRunning()