Fix the bug that blocks forever when call Release() before all tasks are done

Fixes #202
This commit is contained in:
Andy Pan 2022-02-08 13:52:25 +08:00
parent 1bd4304727
commit 8d03fcf77f
3 changed files with 77 additions and 0 deletions

View File

@ -590,6 +590,81 @@ func TestInfinitePoolWithFunc(t *testing.T) {
} }
} }
func TestReleaseWhenRunningPool(t *testing.T) {
var wg sync.WaitGroup
p, _ := NewPool(1)
wg.Add(2)
go func() {
t.Log("start aaa")
defer func() {
wg.Done()
t.Log("stop aaa")
}()
for i := 0; i < 30; i++ {
j := i
p.Submit(func() {
t.Log("do task", j)
time.Sleep(1 * time.Second)
})
}
}()
go func() {
t.Log("start bbb")
defer func() {
wg.Done()
t.Log("stop bbb")
}()
for i := 100; i < 130; i++ {
j := i
p.Submit(func() {
t.Log("do task", j)
time.Sleep(1 * time.Second)
})
}
}()
time.Sleep(3 * time.Second)
p.Release()
t.Log("wait for all goroutines to exit...")
wg.Wait()
}
func TestReleaseWhenRunningPoolWithFunc(t *testing.T) {
var wg sync.WaitGroup
p, _ := NewPoolWithFunc(1, func(i interface{}) {
t.Log("do task", i)
time.Sleep(1 * time.Second)
})
wg.Add(2)
go func() {
t.Log("start aaa")
defer func() {
wg.Done()
t.Log("stop aaa")
}()
for i := 0; i < 30; i++ {
p.Invoke(i)
}
}()
go func() {
t.Log("start bbb")
defer func() {
wg.Done()
t.Log("stop bbb")
}()
for i := 100; i < 130; i++ {
p.Invoke(i)
}
}()
time.Sleep(3 * time.Second)
p.Release()
t.Log("wait for all goroutines to exit...")
wg.Wait()
}
func TestRestCodeCoverage(t *testing.T) { func TestRestCodeCoverage(t *testing.T) {
_, err := NewPool(-1, WithExpiryDuration(-1)) _, err := NewPool(-1, WithExpiryDuration(-1))
t.Log(err) t.Log(err)

View File

@ -276,6 +276,7 @@ func (p *Pool) retrieveWorker() (w *goWorker) {
// revertWorker puts a worker back into free pool, recycling the goroutines. // revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) revertWorker(worker *goWorker) bool { func (p *Pool) revertWorker(worker *goWorker) bool {
if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() { if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
p.cond.Broadcast()
return false return false
} }
worker.recycleTime = time.Now() worker.recycleTime = time.Now()

View File

@ -303,6 +303,7 @@ func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) {
// revertWorker puts a worker back into free pool, recycling the goroutines. // revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool { func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool {
if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() { if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
p.cond.Broadcast()
return false return false
} }
worker.recycleTime = time.Now() worker.recycleTime = time.Now()