diff --git a/ants_test.go b/ants_test.go index 230972f..5297243 100644 --- a/ants_test.go +++ b/ants_test.go @@ -503,7 +503,7 @@ func TestMaxBlockingSubmitWithFunc(t *testing.T) { func TestRebootDefaultPool(t *testing.T) { defer Release() - Reboot() + Reboot() // should do nothing inside var wg sync.WaitGroup wg.Add(1) _ = Submit(func() { @@ -511,7 +511,7 @@ func TestRebootDefaultPool(t *testing.T) { wg.Done() }) wg.Wait() - Release() + assert.NoError(t, ReleaseTimeout(time.Second)) assert.EqualError(t, Submit(nil), ErrPoolClosed.Error(), "pool should be closed") Reboot() wg.Add(1) @@ -530,7 +530,7 @@ func TestRebootNewPool(t *testing.T) { wg.Done() }) wg.Wait() - p.Release() + assert.NoError(t, p.ReleaseTimeout(time.Second)) assert.EqualError(t, p.Submit(nil), ErrPoolClosed.Error(), "pool should be closed") p.Reboot() wg.Add(1) @@ -546,7 +546,7 @@ func TestRebootNewPool(t *testing.T) { wg.Add(1) _ = p1.Invoke(1) wg.Wait() - p1.Release() + assert.NoError(t, p1.ReleaseTimeout(time.Second)) assert.EqualError(t, p1.Invoke(nil), ErrPoolClosed.Error(), "pool should be closed") p1.Reboot() wg.Add(1) @@ -975,7 +975,7 @@ func TestReleaseTimeout(t *testing.T) { } func TestDefaultPoolReleaseTimeout(t *testing.T) { - Reboot() + Reboot() // should do nothing inside for i := 0; i < 5; i++ { _ = Submit(func() { time.Sleep(time.Second) diff --git a/pool.go b/pool.go index aeab50b..9c28874 100644 --- a/pool.go +++ b/pool.go @@ -91,9 +91,10 @@ func (p *Pool) purgeStaleWorkers() { atomic.StoreInt32(&p.purgeDone, 1) }() + purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot() for { select { - case <-p.purgeCtx.Done(): + case <-purgeCtx.Done(): return case <-ticker.C: } @@ -344,7 +345,10 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error { } } -// Reboot reboots a closed pool. +// Reboot reboots a closed pool, it does nothing if the pool is not closed. +// If you intend to reboot a closed pool, use ReleaseTimeout() instead of +// Release() to ensure that all workers are stopped and resource are released +// before rebooting, otherwise you may run into data race. func (p *Pool) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { atomic.StoreInt32(&p.purgeDone, 0) diff --git a/pool_func.go b/pool_func.go index c9ab90e..a73d979 100644 --- a/pool_func.go +++ b/pool_func.go @@ -48,9 +48,10 @@ func (p *PoolWithFunc) purgeStaleWorkers() { atomic.StoreInt32(&p.purgeDone, 1) }() + purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot() for { select { - case <-p.purgeCtx.Done(): + case <-purgeCtx.Done(): return case <-ticker.C: } @@ -308,7 +309,10 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { } } -// Reboot reboots a closed pool. +// Reboot reboots a closed pool, it does nothing if the pool is not closed. +// If you intend to reboot a closed pool, use ReleaseTimeout() instead of +// Release() to ensure that all workers are stopped and resource are released +// before rebooting, otherwise you may run into data race. func (p *PoolWithFunc) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { atomic.StoreInt32(&p.purgeDone, 0)