From 8b106abaf3510108f5f1b1e6c0a7acfef1e1bd08 Mon Sep 17 00:00:00 2001
From: "zhenshan.cao"
Date: Tue, 11 Oct 2022 21:16:04 +0800
Subject: [PATCH] Add option to turn off automatically purge (#253)

Fixes #252

Signed-off-by: zhenshan.cao
---
 ants_test.go | 125 +++++++++++++++++++++++++++++++++++++++++++++++++++
 options.go   |  10 +++++
 pool.go      |  21 +++++----
 pool_func.go |  19 +++++---
 4 files changed, 160 insertions(+), 15 deletions(-)

diff --git a/ants_test.go b/ants_test.go
index 41b1566..5dcb654 100644
--- a/ants_test.go
+++ b/ants_test.go
@@ -563,6 +563,131 @@ func TestInfinitePool(t *testing.T) {
 	}
 }
 
+func testPoolWithDisablePurge(t *testing.T, p *Pool, numWorker int) {
+	sig := make(chan struct{})
+	wg := sync.WaitGroup{}
+
+	wg.Add(numWorker)
+	for i := 0; i < numWorker; i++ {
+		_ = p.Submit(func() {
+			wg.Done()
+			sig <- struct{}{}
+		})
+	}
+	wg.Wait()
+	runCnt := p.Running()
+	assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
+
+	freeCnt := p.Free()
+	assert.EqualValuesf(t, 0, freeCnt, "expect %d free workers, but got %d", 0, freeCnt)
+
+	newCap := 10
+
+	p.Tune(newCap)
+	capacity := p.Cap()
+	assert.EqualValuesf(t, newCap, capacity, "expect capacity: %d but got %d", newCap, capacity)
+	<-sig
+	time.Sleep(time.Millisecond * 10)
+
+	wg.Add(1)
+	_ = p.Submit(func() {
+		wg.Done()
+		sig <- struct{}{}
+	})
+	wg.Wait()
+
+	runCnt = p.Running()
+	assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
+
+	<-sig
+	<-sig
+
+	freeCnt = p.Free()
+	assert.EqualValuesf(t, newCap-numWorker, freeCnt, "expect %d free workers, but got %d", newCap-numWorker, freeCnt)
+
+	p.Release()
+	p.Reboot()
+
+	runCnt = p.Running()
+	assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
+}
+
+func TestWithDisablePurge(t *testing.T) {
+	numWorker := 2
+	p, _ := NewPool(numWorker, WithDisablePurge(true))
+	testPoolWithDisablePurge(t, p, numWorker)
+}
+
+func TestWithDisablePurgeAndWithExpiration(t *testing.T) {
+	numWorker := 2
+	p, _ := NewPool(numWorker, WithDisablePurge(true), WithExpiryDuration(time.Millisecond*100))
+	testPoolWithDisablePurge(t, p, numWorker)
+}
+
+func testPoolFuncWithDisablePurge(t *testing.T, p *PoolWithFunc, numWorker int, wg *sync.WaitGroup, sig chan struct{}) {
+	wg.Add(numWorker)
+	for i := 0; i < numWorker; i++ {
+		_ = p.Invoke(i)
+	}
+	wg.Wait()
+	runCnt := p.Running()
+	assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
+
+	freeCnt := p.Free()
+	assert.EqualValuesf(t, 0, freeCnt, "expect %d free workers, but got %d", 0, freeCnt)
+
+	newCap := 10
+	p.Tune(newCap)
+	capacity := p.Cap()
+	assert.EqualValuesf(t, newCap, capacity, "expect capacity: %d but got %d", newCap, capacity)
+	<-sig
+
+	time.Sleep(time.Millisecond * 200)
+
+	wg.Add(1)
+	_ = p.Invoke(10)
+	wg.Wait()
+
+	runCnt = p.Running()
+	assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
+
+	<-sig
+	<-sig
+
+	freeCnt = p.Free()
+	assert.EqualValuesf(t, newCap-numWorker, freeCnt, "expect %d free workers, but got %d", newCap-numWorker, freeCnt)
+
+	p.Release()
+	p.Reboot()
+
+	runCnt = p.Running()
+	assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
+}
+
+func TestPoolFuncWithDisablePurge(t *testing.T) {
+	numWorker := 2
+	sig := make(chan struct{})
+	wg := sync.WaitGroup{}
+
+	p, _ := NewPoolWithFunc(numWorker, func(i interface{}) {
+		wg.Done()
+		sig <- struct{}{}
+	}, WithDisablePurge(true))
+	testPoolFuncWithDisablePurge(t, p, numWorker, &wg, sig)
+}
+
+func TestPoolFuncWithDisablePurgeAndWithExpiration(t *testing.T) {
+	numWorker := 2
+	sig := make(chan struct{})
+	wg := sync.WaitGroup{}
+
+	p, _ := NewPoolWithFunc(numWorker, func(i interface{}) {
+		wg.Done()
+		sig <- struct{}{}
+	}, WithDisablePurge(true), WithExpiryDuration(time.Millisecond*100))
+	testPoolFuncWithDisablePurge(t, p, numWorker, &wg, sig)
+}
+
 func TestInfinitePoolWithFunc(t *testing.T) {
 	c := make(chan struct{})
 	p, _ := NewPoolWithFunc(-1, func(i interface{}) {
diff --git a/options.go b/options.go
index caa830b..90d1ad5 100644
--- a/options.go
+++ b/options.go
@@ -39,6 +39,9 @@ type Options struct {
 	// Logger is the customized logger for logging info, if it is not set,
 	// default standard logger from log package is used.
 	Logger Logger
+
+	// When DisablePurge is true, workers are never purged and remain resident.
+	DisablePurge bool
 }
 
 // WithOptions accepts the whole options config.
@@ -89,3 +92,10 @@ func WithLogger(logger Logger) Option {
 		opts.Logger = logger
 	}
 }
+
+// WithDisablePurge indicates whether to turn off the automatic purging of workers.
+func WithDisablePurge(disable bool) Option {
+	return func(opts *Options) {
+		opts.DisablePurge = disable
+	}
+}
diff --git a/pool.go b/pool.go
index ddd1fc3..407f9a1 100644
--- a/pool.go
+++ b/pool.go
@@ -68,6 +68,7 @@ type Pool struct {
 // purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger.
 func (p *Pool) purgePeriodically(ctx context.Context) {
 	heartbeat := time.NewTicker(p.options.ExpiryDuration)
+
 	defer func() {
 		heartbeat.Stop()
 		atomic.StoreInt32(&p.heartbeatDone, 1)
@@ -83,7 +84,6 @@
 		if p.IsClosed() {
 			break
 		}
-
 		p.lock.Lock()
 		expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
 		p.lock.Unlock()
@@ -115,10 +115,12 @@ func NewPool(size int, options ...Option) (*Pool, error) {
 		size = -1
 	}
 
-	if expiry := opts.ExpiryDuration; expiry < 0 {
-		return nil, ErrInvalidPoolExpiry
-	} else if expiry == 0 {
-		opts.ExpiryDuration = DefaultCleanIntervalTime
+	if !opts.DisablePurge {
+		if expiry := opts.ExpiryDuration; expiry < 0 {
+			return nil, ErrInvalidPoolExpiry
+		} else if expiry == 0 {
+			opts.ExpiryDuration = DefaultCleanIntervalTime
+		}
 	}
 
 	if opts.Logger == nil {
@@ -150,8 +152,9 @@ func NewPool(size int, options ...Option) (*Pool, error) {
 	// Start a goroutine to clean up expired workers periodically.
 	var ctx context.Context
 	ctx, p.stopHeartbeat = context.WithCancel(context.Background())
-	go p.purgePeriodically(ctx)
-
+	if !p.options.DisablePurge {
+		go p.purgePeriodically(ctx)
+	}
 	return p, nil
 }
 
@@ -259,7 +262,9 @@ func (p *Pool) Reboot() {
 		atomic.StoreInt32(&p.heartbeatDone, 0)
 		var ctx context.Context
 		ctx, p.stopHeartbeat = context.WithCancel(context.Background())
-		go p.purgePeriodically(ctx)
+		if !p.options.DisablePurge {
+			go p.purgePeriodically(ctx)
+		}
 	}
 }
 
diff --git a/pool_func.go b/pool_func.go
index 3b5d5a7..08c2181 100644
--- a/pool_func.go
+++ b/pool_func.go
@@ -134,10 +134,12 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
 	opts := loadOptions(options...)
 
-	if expiry := opts.ExpiryDuration; expiry < 0 {
-		return nil, ErrInvalidPoolExpiry
-	} else if expiry == 0 {
-		opts.ExpiryDuration = DefaultCleanIntervalTime
+	if !opts.DisablePurge {
+		if expiry := opts.ExpiryDuration; expiry < 0 {
+			return nil, ErrInvalidPoolExpiry
+		} else if expiry == 0 {
+			opts.ExpiryDuration = DefaultCleanIntervalTime
+		}
 	}
 
 	if opts.Logger == nil {
@@ -167,8 +169,9 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
 	// Start a goroutine to clean up expired workers periodically.
 	var ctx context.Context
 	ctx, p.stopHeartbeat = context.WithCancel(context.Background())
-	go p.purgePeriodically(ctx)
-
+	if !p.options.DisablePurge {
+		go p.purgePeriodically(ctx)
+	}
 	return p, nil
 }
 
@@ -280,7 +283,9 @@ func (p *PoolWithFunc) Reboot() {
 		atomic.StoreInt32(&p.heartbeatDone, 0)
 		var ctx context.Context
 		ctx, p.stopHeartbeat = context.WithCancel(context.Background())
-		go p.purgePeriodically(ctx)
+		if !p.options.DisablePurge {
+			go p.purgePeriodically(ctx)
+		}
 	}
 }
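-- 
A minimal usage sketch of the new option, assuming the github.com/panjf2000/ants/v2
import path; the pool size and task body below are illustrative, not taken from this
patch. With WithDisablePurge(true), NewPool never starts the purgePeriodically
goroutine, so idle workers stay resident until Release reclaims them:

	package main

	import (
		"fmt"
		"sync"

		"github.com/panjf2000/ants/v2"
	)

	func main() {
		// With DisablePurge set, no scavenger goroutine runs in the
		// background; idle workers are kept until Release is called.
		p, err := ants.NewPool(4, ants.WithDisablePurge(true))
		if err != nil {
			panic(err)
		}
		defer p.Release()

		var wg sync.WaitGroup
		for i := 0; i < 8; i++ {
			i := i // capture loop variable (pre-Go 1.22 semantics)
			wg.Add(1)
			_ = p.Submit(func() {
				defer wg.Done()
				fmt.Println("processed task", i)
			})
		}
		wg.Wait()
	}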