From 8d03fcf77f75c5c2a5b4cfc98e2ec9978af07593 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 8 Feb 2022 13:52:25 +0800 Subject: [PATCH] Fix the bug that blocks forever when call Release() before all tasks are done Fixes #202 --- ants_test.go | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++++ pool.go | 1 + pool_func.go | 1 + 3 files changed, 77 insertions(+) diff --git a/ants_test.go b/ants_test.go index 5dfe95d..31a5e02 100644 --- a/ants_test.go +++ b/ants_test.go @@ -590,6 +590,81 @@ func TestInfinitePoolWithFunc(t *testing.T) { } } +func TestReleaseWhenRunningPool(t *testing.T) { + var wg sync.WaitGroup + p, _ := NewPool(1) + wg.Add(2) + go func() { + t.Log("start aaa") + defer func() { + wg.Done() + t.Log("stop aaa") + }() + for i := 0; i < 30; i++ { + j := i + p.Submit(func() { + t.Log("do task", j) + time.Sleep(1 * time.Second) + }) + } + }() + + go func() { + t.Log("start bbb") + defer func() { + wg.Done() + t.Log("stop bbb") + }() + for i := 100; i < 130; i++ { + j := i + p.Submit(func() { + t.Log("do task", j) + time.Sleep(1 * time.Second) + }) + } + }() + + time.Sleep(3 * time.Second) + p.Release() + t.Log("wait for all goroutines to exit...") + wg.Wait() +} + +func TestReleaseWhenRunningPoolWithFunc(t *testing.T) { + var wg sync.WaitGroup + p, _ := NewPoolWithFunc(1, func(i interface{}) { + t.Log("do task", i) + time.Sleep(1 * time.Second) + }) + wg.Add(2) + go func() { + t.Log("start aaa") + defer func() { + wg.Done() + t.Log("stop aaa") + }() + for i := 0; i < 30; i++ { + p.Invoke(i) + } + }() + + go func() { + t.Log("start bbb") + defer func() { + wg.Done() + t.Log("stop bbb") + }() + for i := 100; i < 130; i++ { + p.Invoke(i) + } + }() + + time.Sleep(3 * time.Second) + p.Release() + t.Log("wait for all goroutines to exit...") + wg.Wait() +} + func TestRestCodeCoverage(t *testing.T) { _, err := NewPool(-1, WithExpiryDuration(-1)) t.Log(err) diff --git a/pool.go b/pool.go index 633de17..4676fd5 100644 --- a/pool.go +++ b/pool.go @@ -276,6 +276,7 @@ func (p *Pool) retrieveWorker() (w *goWorker) { // revertWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) revertWorker(worker *goWorker) bool { if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() { + p.cond.Broadcast() return false } worker.recycleTime = time.Now() diff --git a/pool_func.go b/pool_func.go index ef2034b..18cd320 100644 --- a/pool_func.go +++ b/pool_func.go @@ -303,6 +303,7 @@ func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) { // revertWorker puts a worker back into free pool, recycling the goroutines. func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool { if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() { + p.cond.Broadcast() return false } worker.recycleTime = time.Now()