opt: fix the timeout error of ReleaseTimeout() with DisablePurge=true and improve tests

This commit is contained in:
Andy Pan 2022-10-11 22:52:13 +08:00
parent 8b106abaf3
commit b604f7dc64
4 changed files with 76 additions and 88 deletions

View File

@ -563,129 +563,116 @@ func TestInfinitePool(t *testing.T) {
} }
} }
func testPoolWithDisablePurge(t *testing.T, p *Pool, numWorker int) { func testPoolWithDisablePurge(t *testing.T, p *Pool, numWorker int, waitForPurge time.Duration) {
sig := make(chan struct{}) sig := make(chan struct{})
wg := sync.WaitGroup{} var wg1, wg2 sync.WaitGroup
wg1.Add(numWorker)
wg.Add(numWorker) wg2.Add(numWorker)
for i := 0; i < numWorker; i++ { for i := 0; i < numWorker; i++ {
_ = p.Submit(func() { _ = p.Submit(func() {
wg.Done() wg1.Done()
sig <- struct{}{} <-sig
wg2.Done()
}) })
} }
wg.Wait() wg1.Wait()
runCnt := p.Running()
assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
runningCnt := p.Running()
assert.EqualValuesf(t, numWorker, runningCnt, "expect %d workers running, but got %d", numWorker, runningCnt)
freeCnt := p.Free() freeCnt := p.Free()
assert.EqualValuesf(t, 0, freeCnt, "expect % free workers, but got %d", 0, freeCnt) assert.EqualValuesf(t, 0, freeCnt, "expect %d free workers, but got %d", 0, freeCnt)
newCap := 10 // Finish all tasks and sleep for a while to wait for purging, since we've disabled purge mechanism,
// we should see that all workers are still running after the sleep.
p.Tune(newCap) close(sig)
capacity := p.Cap() wg2.Wait()
assert.EqualValuesf(t, newCap, capacity, "expect capacity: %d but got %d", newCap, capacity) time.Sleep(waitForPurge + waitForPurge/2)
<-sig
time.Sleep(time.Millisecond * 10)
wg.Add(1)
_ = p.Submit(func() {
wg.Done()
sig <- struct{}{}
})
wg.Wait()
runCnt = p.Running()
assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
<-sig
<-sig
runningCnt = p.Running()
assert.EqualValuesf(t, numWorker, runningCnt, "expect %d workers running, but got %d", numWorker, runningCnt)
freeCnt = p.Free() freeCnt = p.Free()
assert.EqualValuesf(t, newCap-numWorker, freeCnt, "expect % free workers, but got %d", newCap-numWorker, freeCnt) assert.EqualValuesf(t, 0, freeCnt, "expect %d free workers, but got %d", 0, freeCnt)
p.Release() err := p.ReleaseTimeout(waitForPurge + waitForPurge/2)
p.Reboot() assert.NoErrorf(t, err, "release pool failed: %v", err)
runCnt = p.Running() runningCnt = p.Running()
assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt) assert.EqualValuesf(t, 0, runningCnt, "expect %d workers running, but got %d", 0, runningCnt)
freeCnt = p.Free()
assert.EqualValuesf(t, numWorker, freeCnt, "expect %d free workers, but got %d", numWorker, freeCnt)
} }
func TestWithDisablePurge(t *testing.T) { func TestWithDisablePurgePool(t *testing.T) {
numWorker := 2 numWorker := 10
p, _ := NewPool(numWorker, WithDisablePurge(true)) p, _ := NewPool(numWorker, WithDisablePurge(true))
testPoolWithDisablePurge(t, p, numWorker) testPoolWithDisablePurge(t, p, numWorker, DefaultCleanIntervalTime)
} }
func TestWithDisablePurgeAndWithExpiration(t *testing.T) { func TestWithDisablePurgeAndWithExpirationPool(t *testing.T) {
numWorker := 2 numWorker := 10
p, _ := NewPool(numWorker, WithDisablePurge(true), WithExpiryDuration(time.Millisecond*100)) expiredDuration := time.Millisecond * 100
testPoolWithDisablePurge(t, p, numWorker) p, _ := NewPool(numWorker, WithDisablePurge(true), WithExpiryDuration(expiredDuration))
testPoolWithDisablePurge(t, p, numWorker, expiredDuration)
} }
func testPoolFuncWithDisablePurge(t *testing.T, p *PoolWithFunc, numWorker int, wg *sync.WaitGroup, sig chan struct{}) { func testPoolFuncWithDisablePurge(t *testing.T, p *PoolWithFunc, numWorker int, wg1, wg2 *sync.WaitGroup, sig chan struct{}, waitForPurge time.Duration) {
wg.Add(numWorker)
for i := 0; i < numWorker; i++ { for i := 0; i < numWorker; i++ {
_ = p.Invoke(i) _ = p.Invoke(i)
} }
wg.Wait() wg1.Wait()
runCnt := p.Running()
assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
runningCnt := p.Running()
assert.EqualValuesf(t, numWorker, runningCnt, "expect %d workers running, but got %d", numWorker, runningCnt)
freeCnt := p.Free() freeCnt := p.Free()
assert.EqualValuesf(t, 0, freeCnt, "expect % free workers, but got %d", 0, freeCnt) assert.EqualValuesf(t, 0, freeCnt, "expect %d free workers, but got %d", 0, freeCnt)
newCap := 10 // Finish all tasks and sleep for a while to wait for purging, since we've disabled purge mechanism,
p.Tune(newCap) // we should see that all workers are still running after the sleep.
capacity := p.Cap() close(sig)
assert.EqualValuesf(t, newCap, capacity, "expect capacity: %d but got %d", newCap, capacity) wg2.Wait()
<-sig time.Sleep(waitForPurge + waitForPurge/2)
time.Sleep(time.Millisecond * 200)
wg.Add(1)
_ = p.Invoke(10)
wg.Wait()
runCnt = p.Running()
assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
<-sig
<-sig
runningCnt = p.Running()
assert.EqualValuesf(t, numWorker, runningCnt, "expect %d workers running, but got %d", numWorker, runningCnt)
freeCnt = p.Free() freeCnt = p.Free()
assert.EqualValuesf(t, newCap-numWorker, freeCnt, "expect % free workers, but got %d", newCap-numWorker, freeCnt) assert.EqualValuesf(t, 0, freeCnt, "expect %d free workers, but got %d", 0, freeCnt)
p.Release() err := p.ReleaseTimeout(waitForPurge + waitForPurge/2)
p.Reboot() assert.NoErrorf(t, err, "release pool failed: %v", err)
runCnt = p.Running() runningCnt = p.Running()
assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt) assert.EqualValuesf(t, 0, runningCnt, "expect %d workers running, but got %d", 0, runningCnt)
freeCnt = p.Free()
assert.EqualValuesf(t, numWorker, freeCnt, "expect %d free workers, but got %d", numWorker, freeCnt)
} }
func TestPoolFuncWithDisablePurge(t *testing.T) { func TestWithDisablePurgePoolFunc(t *testing.T) {
numWorker := 2 numWorker := 10
sig := make(chan struct{}) sig := make(chan struct{})
wg := sync.WaitGroup{} var wg1, wg2 sync.WaitGroup
wg1.Add(numWorker)
wg2.Add(numWorker)
p, _ := NewPoolWithFunc(numWorker, func(i interface{}) { p, _ := NewPoolWithFunc(numWorker, func(i interface{}) {
wg.Done() wg1.Done()
sig <- struct{}{} <-sig
wg2.Done()
}, WithDisablePurge(true)) }, WithDisablePurge(true))
testPoolFuncWithDisablePurge(t, p, numWorker, &wg, sig) testPoolFuncWithDisablePurge(t, p, numWorker, &wg1, &wg2, sig, DefaultCleanIntervalTime)
} }
func TestPoolFuncWithDisablePurgeAndWithExpiration(t *testing.T) { func TestWithDisablePurgeAndWithExpirationPoolFunc(t *testing.T) {
numWorker := 2 numWorker := 2
sig := make(chan struct{}) sig := make(chan struct{})
wg := sync.WaitGroup{} var wg1, wg2 sync.WaitGroup
wg1.Add(numWorker)
wg2.Add(numWorker)
expiredDuration := time.Millisecond * 100
p, _ := NewPoolWithFunc(numWorker, func(i interface{}) { p, _ := NewPoolWithFunc(numWorker, func(i interface{}) {
wg.Done() wg1.Done()
sig <- struct{}{} <-sig
}, WithDisablePurge(true), WithExpiryDuration(time.Millisecond*100)) wg2.Done()
testPoolFuncWithDisablePurge(t, p, numWorker, &wg, sig) }, WithDisablePurge(true), WithExpiryDuration(expiredDuration))
testPoolFuncWithDisablePurge(t, p, numWorker, &wg1, &wg2, sig, expiredDuration)
} }
func TestInfinitePoolWithFunc(t *testing.T) { func TestInfinitePoolWithFunc(t *testing.T) {

View File

@ -84,6 +84,7 @@ func (p *Pool) purgePeriodically(ctx context.Context) {
if p.IsClosed() { if p.IsClosed() {
break break
} }
p.lock.Lock() p.lock.Lock()
expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration) expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
p.lock.Unlock() p.lock.Unlock()
@ -248,7 +249,7 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
endTime := time.Now().Add(timeout) endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) { for time.Now().Before(endTime) {
if p.Running() == 0 && atomic.LoadInt32(&p.heartbeatDone) == 1 { if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) {
return nil return nil
} }
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)

View File

@ -86,6 +86,7 @@ func (p *PoolWithFunc) purgePeriodically(ctx context.Context) {
if p.IsClosed() { if p.IsClosed() {
break break
} }
currentTime := time.Now() currentTime := time.Now()
p.lock.Lock() p.lock.Lock()
idleWorkers := p.workers idleWorkers := p.workers
@ -269,7 +270,7 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
endTime := time.Now().Add(timeout) endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) { for time.Now().Before(endTime) {
if p.Running() == 0 && atomic.LoadInt32(&p.heartbeatDone) == 1 { if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) {
return nil return nil
} }
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)

View File

@ -29,8 +29,7 @@ func TestLoopQueue(t *testing.T) {
} }
} }
assert.EqualValues(t, 5, q.len(), "Len error") assert.EqualValues(t, 5, q.len(), "Len error")
v := q.detach() _ = q.detach()
t.Log(v)
assert.EqualValues(t, 4, q.len(), "Len error") assert.EqualValues(t, 4, q.len(), "Len error")
time.Sleep(time.Second) time.Sleep(time.Second)