bug: alleviate the data race between Release() and Reboot() (#330)

This commit is contained in:
Andy Pan 2024-06-18 02:00:36 +08:00 committed by GitHub
parent 1933478e2e
commit 95dad45c7d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 17 additions and 9 deletions

View File

@ -503,7 +503,7 @@ func TestMaxBlockingSubmitWithFunc(t *testing.T) {
func TestRebootDefaultPool(t *testing.T) { func TestRebootDefaultPool(t *testing.T) {
defer Release() defer Release()
Reboot() Reboot() // should do nothing inside
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
_ = Submit(func() { _ = Submit(func() {
@ -511,7 +511,7 @@ func TestRebootDefaultPool(t *testing.T) {
wg.Done() wg.Done()
}) })
wg.Wait() wg.Wait()
Release() assert.NoError(t, ReleaseTimeout(time.Second))
assert.EqualError(t, Submit(nil), ErrPoolClosed.Error(), "pool should be closed") assert.EqualError(t, Submit(nil), ErrPoolClosed.Error(), "pool should be closed")
Reboot() Reboot()
wg.Add(1) wg.Add(1)
@ -530,7 +530,7 @@ func TestRebootNewPool(t *testing.T) {
wg.Done() wg.Done()
}) })
wg.Wait() wg.Wait()
p.Release() assert.NoError(t, p.ReleaseTimeout(time.Second))
assert.EqualError(t, p.Submit(nil), ErrPoolClosed.Error(), "pool should be closed") assert.EqualError(t, p.Submit(nil), ErrPoolClosed.Error(), "pool should be closed")
p.Reboot() p.Reboot()
wg.Add(1) wg.Add(1)
@ -546,7 +546,7 @@ func TestRebootNewPool(t *testing.T) {
wg.Add(1) wg.Add(1)
_ = p1.Invoke(1) _ = p1.Invoke(1)
wg.Wait() wg.Wait()
p1.Release() assert.NoError(t, p1.ReleaseTimeout(time.Second))
assert.EqualError(t, p1.Invoke(nil), ErrPoolClosed.Error(), "pool should be closed") assert.EqualError(t, p1.Invoke(nil), ErrPoolClosed.Error(), "pool should be closed")
p1.Reboot() p1.Reboot()
wg.Add(1) wg.Add(1)
@ -975,7 +975,7 @@ func TestReleaseTimeout(t *testing.T) {
} }
func TestDefaultPoolReleaseTimeout(t *testing.T) { func TestDefaultPoolReleaseTimeout(t *testing.T) {
Reboot() Reboot() // should do nothing inside
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
_ = Submit(func() { _ = Submit(func() {
time.Sleep(time.Second) time.Sleep(time.Second)

View File

@ -91,9 +91,10 @@ func (p *Pool) purgeStaleWorkers() {
atomic.StoreInt32(&p.purgeDone, 1) atomic.StoreInt32(&p.purgeDone, 1)
}() }()
purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot()
for { for {
select { select {
case <-p.purgeCtx.Done(): case <-purgeCtx.Done():
return return
case <-ticker.C: case <-ticker.C:
} }
@ -344,7 +345,10 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
} }
} }
// Reboot reboots a closed pool. // Reboot reboots a closed pool, it does nothing if the pool is not closed.
// If you intend to reboot a closed pool, use ReleaseTimeout() instead of
// Release() to ensure that all workers are stopped and resource are released
// before rebooting, otherwise you may run into data race.
func (p *Pool) Reboot() { func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.purgeDone, 0) atomic.StoreInt32(&p.purgeDone, 0)

View File

@ -48,9 +48,10 @@ func (p *PoolWithFunc) purgeStaleWorkers() {
atomic.StoreInt32(&p.purgeDone, 1) atomic.StoreInt32(&p.purgeDone, 1)
}() }()
purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot()
for { for {
select { select {
case <-p.purgeCtx.Done(): case <-purgeCtx.Done():
return return
case <-ticker.C: case <-ticker.C:
} }
@ -308,7 +309,10 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
} }
} }
// Reboot reboots a closed pool. // Reboot reboots a closed pool, it does nothing if the pool is not closed.
// If you intend to reboot a closed pool, use ReleaseTimeout() instead of
// Release() to ensure that all workers are stopped and resource are released
// before rebooting, otherwise you may run into data race.
func (p *PoolWithFunc) Reboot() { func (p *PoolWithFunc) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.purgeDone, 0) atomic.StoreInt32(&p.purgeDone, 0)