Add an option to turn off automatic purging (#253)

Fixes #252 

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
zhenshan.cao 2022-10-11 21:16:04 +08:00 committed by GitHub
parent 06e6934c35
commit 8b106abaf3
4 changed files with 160 additions and 15 deletions

ants_test.go

@@ -563,6 +563,131 @@ func TestInfinitePool(t *testing.T) {
 	}
 }
 
+func testPoolWithDisablePurge(t *testing.T, p *Pool, numWorker int) {
+	sig := make(chan struct{})
+	wg := sync.WaitGroup{}
+	wg.Add(numWorker)
+	for i := 0; i < numWorker; i++ {
+		_ = p.Submit(func() {
+			wg.Done()
+			sig <- struct{}{}
+		})
+	}
+	wg.Wait()
+
+	runCnt := p.Running()
+	assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
+	freeCnt := p.Free()
+	assert.EqualValuesf(t, 0, freeCnt, "expect %d free workers, but got %d", 0, freeCnt)
+
+	newCap := 10
+	p.Tune(newCap)
+	capacity := p.Cap()
+	assert.EqualValuesf(t, newCap, capacity, "expect capacity: %d but got %d", newCap, capacity)
+
+	<-sig
+	time.Sleep(time.Millisecond * 10)
+
+	wg.Add(1)
+	_ = p.Submit(func() {
+		wg.Done()
+		sig <- struct{}{}
+	})
+	wg.Wait()
+	runCnt = p.Running()
+	assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
+
+	<-sig
+	<-sig
+	freeCnt = p.Free()
+	assert.EqualValuesf(t, newCap-numWorker, freeCnt, "expect %d free workers, but got %d", newCap-numWorker, freeCnt)
+
+	p.Release()
+	p.Reboot()
+	runCnt = p.Running()
+	assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
+}
+
+func TestWithDisablePurge(t *testing.T) {
+	numWorker := 2
+	p, _ := NewPool(numWorker, WithDisablePurge(true))
+	testPoolWithDisablePurge(t, p, numWorker)
+}
+
+func TestWithDisablePurgeAndWithExpiration(t *testing.T) {
+	numWorker := 2
+	p, _ := NewPool(numWorker, WithDisablePurge(true), WithExpiryDuration(time.Millisecond*100))
+	testPoolWithDisablePurge(t, p, numWorker)
+}
+
+func testPoolFuncWithDisablePurge(t *testing.T, p *PoolWithFunc, numWorker int, wg *sync.WaitGroup, sig chan struct{}) {
+	wg.Add(numWorker)
+	for i := 0; i < numWorker; i++ {
+		_ = p.Invoke(i)
+	}
+	wg.Wait()
+
+	runCnt := p.Running()
+	assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
+	freeCnt := p.Free()
+	assert.EqualValuesf(t, 0, freeCnt, "expect %d free workers, but got %d", 0, freeCnt)
+
+	newCap := 10
+	p.Tune(newCap)
+	capacity := p.Cap()
+	assert.EqualValuesf(t, newCap, capacity, "expect capacity: %d but got %d", newCap, capacity)
+
+	<-sig
+	time.Sleep(time.Millisecond * 200)
+
+	wg.Add(1)
+	_ = p.Invoke(10)
+	wg.Wait()
+	runCnt = p.Running()
+	assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
+
+	<-sig
+	<-sig
+	freeCnt = p.Free()
+	assert.EqualValuesf(t, newCap-numWorker, freeCnt, "expect %d free workers, but got %d", newCap-numWorker, freeCnt)
+
+	p.Release()
+	p.Reboot()
+	runCnt = p.Running()
+	assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt)
+}
+
+func TestPoolFuncWithDisablePurge(t *testing.T) {
+	numWorker := 2
+	sig := make(chan struct{})
+	wg := sync.WaitGroup{}
+	p, _ := NewPoolWithFunc(numWorker, func(i interface{}) {
+		wg.Done()
+		sig <- struct{}{}
+	}, WithDisablePurge(true))
+	testPoolFuncWithDisablePurge(t, p, numWorker, &wg, sig)
+}
+
+func TestPoolFuncWithDisablePurgeAndWithExpiration(t *testing.T) {
+	numWorker := 2
+	sig := make(chan struct{})
+	wg := sync.WaitGroup{}
+	p, _ := NewPoolWithFunc(numWorker, func(i interface{}) {
+		wg.Done()
+		sig <- struct{}{}
+	}, WithDisablePurge(true), WithExpiryDuration(time.Millisecond*100))
+	testPoolFuncWithDisablePurge(t, p, numWorker, &wg, sig)
+}
+
 func TestInfinitePoolWithFunc(t *testing.T) {
 	c := make(chan struct{})
 	p, _ := NewPoolWithFunc(-1, func(i interface{}) {

options.go

@@ -39,6 +39,9 @@ type Options struct {
 	// Logger is the customized logger for logging info, if it is not set,
 	// default standard logger from log package is used.
 	Logger Logger
+
+	// When DisablePurge is true, workers are not purged and are resident.
+	DisablePurge bool
 }
 
 // WithOptions accepts the whole options config.

@@ -89,3 +92,10 @@ func WithLogger(logger Logger) Option {
 		opts.Logger = logger
 	}
 }
+
+// WithDisablePurge indicates whether we turn off automatic purging of workers.
+func WithDisablePurge(disable bool) Option {
+	return func(opts *Options) {
+		opts.DisablePurge = disable
+	}
+}
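For callers, the new option composes with the existing functional options. A minimal usage sketch, not part of this commit, assuming the ants v2 module path (github.com/panjf2000/ants/v2):

	package main

	import (
		"fmt"
		"sync"

		"github.com/panjf2000/ants/v2"
	)

	func main() {
		// Workers in this pool are never purged; they stay resident
		// until the pool is released.
		p, err := ants.NewPool(8, ants.WithDisablePurge(true))
		if err != nil {
			panic(err)
		}
		defer p.Release()

		var wg sync.WaitGroup
		wg.Add(1)
		_ = p.Submit(func() {
			defer wg.Done()
			fmt.Println("task executed by a resident worker")
		})
		wg.Wait()
	}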

pool.go

@@ -68,6 +68,7 @@ type Pool struct {
 // purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger.
 func (p *Pool) purgePeriodically(ctx context.Context) {
 	heartbeat := time.NewTicker(p.options.ExpiryDuration)
+
 	defer func() {
 		heartbeat.Stop()
 		atomic.StoreInt32(&p.heartbeatDone, 1)
@@ -83,7 +84,6 @@ func (p *Pool) purgePeriodically(ctx context.Context) {
 		if p.IsClosed() {
 			break
 		}
-
 		p.lock.Lock()
 		expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
 		p.lock.Unlock()
@@ -115,10 +115,12 @@ func NewPool(size int, options ...Option) (*Pool, error) {
 		size = -1
 	}
 
-	if expiry := opts.ExpiryDuration; expiry < 0 {
-		return nil, ErrInvalidPoolExpiry
-	} else if expiry == 0 {
-		opts.ExpiryDuration = DefaultCleanIntervalTime
+	if !opts.DisablePurge {
+		if expiry := opts.ExpiryDuration; expiry < 0 {
+			return nil, ErrInvalidPoolExpiry
+		} else if expiry == 0 {
+			opts.ExpiryDuration = DefaultCleanIntervalTime
+		}
 	}
 
 	if opts.Logger == nil {
@@ -150,8 +152,9 @@ func NewPool(size int, options ...Option) (*Pool, error) {
 	// Start a goroutine to clean up expired workers periodically.
 	var ctx context.Context
 	ctx, p.stopHeartbeat = context.WithCancel(context.Background())
-	go p.purgePeriodically(ctx)
+	if !p.options.DisablePurge {
+		go p.purgePeriodically(ctx)
+	}
 
 	return p, nil
 }
@@ -259,7 +262,9 @@ func (p *Pool) Reboot() {
 		atomic.StoreInt32(&p.heartbeatDone, 0)
 		var ctx context.Context
 		ctx, p.stopHeartbeat = context.WithCancel(context.Background())
-		go p.purgePeriodically(ctx)
+		if !p.options.DisablePurge {
+			go p.purgePeriodically(ctx)
+		}
 	}
 }
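To make the effect of these guards concrete, here is a hedged sketch of the observable behavior, mirroring the assertions in the tests above: with DisablePurge set, the scavenger goroutine never starts, so WithExpiryDuration is effectively inert and idle workers keep counting toward Running(). This is an illustration, not code from this commit:

	package main

	import (
		"fmt"
		"sync"
		"time"

		"github.com/panjf2000/ants/v2"
	)

	func main() {
		// ExpiryDuration is set but ignored: DisablePurge means no scavenger runs.
		p, _ := ants.NewPool(4,
			ants.WithDisablePurge(true),
			ants.WithExpiryDuration(100*time.Millisecond))
		defer p.Release()

		var wg sync.WaitGroup
		wg.Add(4)
		for i := 0; i < 4; i++ {
			_ = p.Submit(func() {
				defer wg.Done()
				time.Sleep(10 * time.Millisecond)
			})
		}
		wg.Wait()

		time.Sleep(300 * time.Millisecond) // well past the expiry window
		fmt.Println(p.Running())           // expected to print 4: idle workers were never reclaimed
	}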

pool_func.go

@@ -134,10 +134,12 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
 	opts := loadOptions(options...)
 
-	if expiry := opts.ExpiryDuration; expiry < 0 {
-		return nil, ErrInvalidPoolExpiry
-	} else if expiry == 0 {
-		opts.ExpiryDuration = DefaultCleanIntervalTime
+	if !opts.DisablePurge {
+		if expiry := opts.ExpiryDuration; expiry < 0 {
+			return nil, ErrInvalidPoolExpiry
+		} else if expiry == 0 {
+			opts.ExpiryDuration = DefaultCleanIntervalTime
+		}
 	}
 
 	if opts.Logger == nil {
@@ -167,8 +169,9 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
 	// Start a goroutine to clean up expired workers periodically.
 	var ctx context.Context
 	ctx, p.stopHeartbeat = context.WithCancel(context.Background())
-	go p.purgePeriodically(ctx)
+	if !p.options.DisablePurge {
+		go p.purgePeriodically(ctx)
+	}
 
 	return p, nil
 }
@@ -280,7 +283,9 @@ func (p *PoolWithFunc) Reboot() {
 		atomic.StoreInt32(&p.heartbeatDone, 0)
 		var ctx context.Context
 		ctx, p.stopHeartbeat = context.WithCancel(context.Background())
-		go p.purgePeriodically(ctx)
+		if !p.options.DisablePurge {
+			go p.purgePeriodically(ctx)
+		}
 	}
 }
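The same option applies to the function-flavored pool. A brief sketch, again an illustration rather than code from this commit:

	package main

	import (
		"fmt"
		"sync"

		"github.com/panjf2000/ants/v2"
	)

	func main() {
		var wg sync.WaitGroup
		// A fixed set of resident workers, all running the same function.
		p, _ := ants.NewPoolWithFunc(2, func(i interface{}) {
			defer wg.Done()
			fmt.Printf("processing %v\n", i)
		}, ants.WithDisablePurge(true))
		defer p.Release()

		wg.Add(4)
		for i := 0; i < 4; i++ {
			_ = p.Invoke(i)
		}
		wg.Wait()
	}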