From d32d668565c40283b3d0066cc6d9406def693edb Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Thu, 16 Jan 2020 12:30:35 +0800 Subject: [PATCH] Add a feature of rebooting a released pool --- ants.go | 12 +++++++++++- ants_test.go | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++ pool.go | 32 ++++++++++++++++--------------- pool_func.go | 40 ++++++++++++++++++++------------------ 4 files changed, 103 insertions(+), 35 deletions(-) diff --git a/ants.go b/ants.go index 5473cb1..de24d49 100644 --- a/ants.go +++ b/ants.go @@ -35,9 +35,14 @@ const ( // DefaultCleanIntervalTime is the interval time to clean up goroutines. DefaultCleanIntervalTime = time.Second +) + +const ( + // OPENED represents that the pool is opened. + OPENED = iota // CLOSED represents that the pool is closed. - CLOSED = 1 + CLOSED ) var ( @@ -171,3 +176,8 @@ func Free() int { func Release() { defaultAntsPool.Release() } + +// Reboot reboots the default pool. +func Reboot() { + defaultAntsPool.Reboot() +} diff --git a/ants_test.go b/ants_test.go index bd04fe3..f424a0c 100644 --- a/ants_test.go +++ b/ants_test.go @@ -475,6 +475,60 @@ func TestMaxBlockingSubmitWithFunc(t *testing.T) { default: } } + +func TestRebootDefaultPool(t *testing.T) { + defer Release() + Reboot() + var wg sync.WaitGroup + wg.Add(1) + _ = Submit(func() { + demoFunc() + wg.Done() + }) + wg.Wait() + Release() + assert.EqualError(t, Submit(nil), ErrPoolClosed.Error(), "pool should be closed") + Reboot() + wg.Add(1) + assert.NoError(t, Submit(func() { wg.Done() }), "pool should be rebooted") + wg.Wait() +} + +func TestRebootNewPool(t *testing.T) { + var wg sync.WaitGroup + p, err := NewPool(10) + assert.NoErrorf(t, err, "create Pool failed: %v", err) + defer p.Release() + wg.Add(1) + _ = p.Submit(func() { + demoFunc() + wg.Done() + }) + wg.Wait() + p.Release() + assert.EqualError(t, p.Submit(nil), ErrPoolClosed.Error(), "pool should be closed") + p.Reboot() + wg.Add(1) + assert.NoError(t, p.Submit(func() { wg.Done() }), "pool should be rebooted") + wg.Wait() + + p1, err := NewPoolWithFunc(10, func(i interface{}) { + demoPoolFunc(i) + wg.Done() + }) + assert.NoErrorf(t, err, "create TimingPoolWithFunc failed: %v", err) + defer p1.Release() + wg.Add(1) + _ = p1.Invoke(1) + wg.Wait() + p1.Release() + assert.EqualError(t, p1.Invoke(nil), ErrPoolClosed.Error(), "pool should be closed") + p1.Reboot() + wg.Add(1) + assert.NoError(t, p1.Invoke(1), "pool should be rebooted") + wg.Wait() +} + func TestRestCodeCoverage(t *testing.T) { _, err := NewPool(-1, WithExpiryDuration(-1)) t.Log(err) diff --git a/pool.go b/pool.go index cb6978b..8039435 100644 --- a/pool.go +++ b/pool.go @@ -41,8 +41,8 @@ type Pool struct { // workers is a slice that store the available workers. workers workerArray - // release is used to notice the pool to closed itself. - release int32 + // state is used to notice the pool to closed itself. + state int32 // lock for synchronous operation. lock sync.Locker @@ -50,9 +50,6 @@ type Pool struct { // cond for waiting to get a idle worker. cond *sync.Cond - // once makes sure releasing this pool will just be done for one time. - once sync.Once - // workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker. workerCache sync.Pool @@ -62,13 +59,13 @@ type Pool struct { options *Options } -// Clear expired workers periodically. +// periodicallyPurge clears expired workers periodically. func (p *Pool) periodicallyPurge() { heartbeat := time.NewTicker(p.options.ExpiryDuration) defer heartbeat.Stop() for range heartbeat.C { - if atomic.LoadInt32(&p.release) == CLOSED { + if atomic.LoadInt32(&p.state) == CLOSED { break } @@ -139,7 +136,7 @@ func NewPool(size int, options ...Option) (*Pool, error) { // Submit submits a task to this pool. func (p *Pool) Submit(task func()) error { - if atomic.LoadInt32(&p.release) == CLOSED { + if atomic.LoadInt32(&p.state) == CLOSED { return ErrPoolClosed } var w *goWorker @@ -175,12 +172,17 @@ func (p *Pool) Tune(size int) { // Release Closes this pool. func (p *Pool) Release() { - p.once.Do(func() { - atomic.StoreInt32(&p.release, 1) - p.lock.Lock() - p.workers.reset() - p.lock.Unlock() - }) + atomic.StoreInt32(&p.state, CLOSED) + p.lock.Lock() + p.workers.reset() + p.lock.Unlock() +} + +// Reboot reboots a released pool. +func (p *Pool) Reboot() { + if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { + go p.periodicallyPurge() + } } // --------------------------------------------------------------------------- @@ -242,7 +244,7 @@ func (p *Pool) retrieveWorker() *goWorker { // revertWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) revertWorker(worker *goWorker) bool { - if atomic.LoadInt32(&p.release) == CLOSED || p.Running() > p.Cap() { + if atomic.LoadInt32(&p.state) == CLOSED || p.Running() > p.Cap() { return false } worker.recycleTime = time.Now() diff --git a/pool_func.go b/pool_func.go index 241753e..96a26ea 100644 --- a/pool_func.go +++ b/pool_func.go @@ -41,8 +41,8 @@ type PoolWithFunc struct { // workers is a slice that store the available workers. workers []*goWorkerWithFunc - // release is used to notice the pool to closed itself. - release int32 + // state is used to notice the pool to closed itself. + state int32 // lock for synchronous operation. lock sync.Locker @@ -53,9 +53,6 @@ type PoolWithFunc struct { // poolFunc is the function for processing tasks. poolFunc func(interface{}) - // once makes sure releasing this pool will just be done for one time. - once sync.Once - // workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker. workerCache sync.Pool @@ -65,14 +62,14 @@ type PoolWithFunc struct { options *Options } -// Clear expired workers periodically. +// periodicallyPurge clears expired workers periodically. func (p *PoolWithFunc) periodicallyPurge() { heartbeat := time.NewTicker(p.options.ExpiryDuration) defer heartbeat.Stop() var expiredWorkers []*goWorkerWithFunc for range heartbeat.C { - if atomic.LoadInt32(&p.release) == CLOSED { + if atomic.LoadInt32(&p.state) == CLOSED { break } currentTime := time.Now() @@ -158,7 +155,7 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi // Invoke submits a task to pool. func (p *PoolWithFunc) Invoke(args interface{}) error { - if atomic.LoadInt32(&p.release) == CLOSED { + if atomic.LoadInt32(&p.state) == CLOSED { return ErrPoolClosed } var w *goWorkerWithFunc @@ -194,16 +191,21 @@ func (p *PoolWithFunc) Tune(size int) { // Release Closes this pool. func (p *PoolWithFunc) Release() { - p.once.Do(func() { - atomic.StoreInt32(&p.release, 1) - p.lock.Lock() - idleWorkers := p.workers - for _, w := range idleWorkers { - w.args <- nil - } - p.workers = nil - p.lock.Unlock() - }) + atomic.StoreInt32(&p.state, CLOSED) + p.lock.Lock() + idleWorkers := p.workers + for _, w := range idleWorkers { + w.args <- nil + } + p.workers = nil + p.lock.Unlock() +} + +// Reboot reboots a released pool. +func (p *PoolWithFunc) Reboot() { + if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { + go p.periodicallyPurge() + } } //--------------------------------------------------------------------------- @@ -269,7 +271,7 @@ func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc { // revertWorker puts a worker back into free pool, recycling the goroutines. func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool { - if atomic.LoadInt32(&p.release) == CLOSED || p.Running() > p.Cap() { + if atomic.LoadInt32(&p.state) == CLOSED || p.Running() > p.Cap() { return false } worker.recycleTime = time.Now()