Add a feature of rebooting a released pool

This commit is contained in:
Andy Pan 2020-01-16 12:30:35 +08:00
parent b7fb5f33c9
commit d32d668565
4 changed files with 103 additions and 35 deletions

12
ants.go
View File

@ -35,9 +35,14 @@ const (
// DefaultCleanIntervalTime is the interval time to clean up goroutines. // DefaultCleanIntervalTime is the interval time to clean up goroutines.
DefaultCleanIntervalTime = time.Second DefaultCleanIntervalTime = time.Second
)
const (
// OPENED represents that the pool is opened.
OPENED = iota
// CLOSED represents that the pool is closed. // CLOSED represents that the pool is closed.
CLOSED = 1 CLOSED
) )
var ( var (
@ -171,3 +176,8 @@ func Free() int {
func Release() { func Release() {
defaultAntsPool.Release() defaultAntsPool.Release()
} }
// Reboot reboots the default pool.
func Reboot() {
defaultAntsPool.Reboot()
}

View File

@ -475,6 +475,60 @@ func TestMaxBlockingSubmitWithFunc(t *testing.T) {
default: default:
} }
} }
func TestRebootDefaultPool(t *testing.T) {
defer Release()
Reboot()
var wg sync.WaitGroup
wg.Add(1)
_ = Submit(func() {
demoFunc()
wg.Done()
})
wg.Wait()
Release()
assert.EqualError(t, Submit(nil), ErrPoolClosed.Error(), "pool should be closed")
Reboot()
wg.Add(1)
assert.NoError(t, Submit(func() { wg.Done() }), "pool should be rebooted")
wg.Wait()
}
func TestRebootNewPool(t *testing.T) {
var wg sync.WaitGroup
p, err := NewPool(10)
assert.NoErrorf(t, err, "create Pool failed: %v", err)
defer p.Release()
wg.Add(1)
_ = p.Submit(func() {
demoFunc()
wg.Done()
})
wg.Wait()
p.Release()
assert.EqualError(t, p.Submit(nil), ErrPoolClosed.Error(), "pool should be closed")
p.Reboot()
wg.Add(1)
assert.NoError(t, p.Submit(func() { wg.Done() }), "pool should be rebooted")
wg.Wait()
p1, err := NewPoolWithFunc(10, func(i interface{}) {
demoPoolFunc(i)
wg.Done()
})
assert.NoErrorf(t, err, "create TimingPoolWithFunc failed: %v", err)
defer p1.Release()
wg.Add(1)
_ = p1.Invoke(1)
wg.Wait()
p1.Release()
assert.EqualError(t, p1.Invoke(nil), ErrPoolClosed.Error(), "pool should be closed")
p1.Reboot()
wg.Add(1)
assert.NoError(t, p1.Invoke(1), "pool should be rebooted")
wg.Wait()
}
func TestRestCodeCoverage(t *testing.T) { func TestRestCodeCoverage(t *testing.T) {
_, err := NewPool(-1, WithExpiryDuration(-1)) _, err := NewPool(-1, WithExpiryDuration(-1))
t.Log(err) t.Log(err)

32
pool.go
View File

@ -41,8 +41,8 @@ type Pool struct {
// workers is a slice that store the available workers. // workers is a slice that store the available workers.
workers workerArray workers workerArray
// release is used to notice the pool to closed itself. // state is used to notice the pool to closed itself.
release int32 state int32
// lock for synchronous operation. // lock for synchronous operation.
lock sync.Locker lock sync.Locker
@ -50,9 +50,6 @@ type Pool struct {
// cond for waiting to get a idle worker. // cond for waiting to get a idle worker.
cond *sync.Cond cond *sync.Cond
// once makes sure releasing this pool will just be done for one time.
once sync.Once
// workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker. // workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker.
workerCache sync.Pool workerCache sync.Pool
@ -62,13 +59,13 @@ type Pool struct {
options *Options options *Options
} }
// Clear expired workers periodically. // periodicallyPurge clears expired workers periodically.
func (p *Pool) periodicallyPurge() { func (p *Pool) periodicallyPurge() {
heartbeat := time.NewTicker(p.options.ExpiryDuration) heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer heartbeat.Stop() defer heartbeat.Stop()
for range heartbeat.C { for range heartbeat.C {
if atomic.LoadInt32(&p.release) == CLOSED { if atomic.LoadInt32(&p.state) == CLOSED {
break break
} }
@ -139,7 +136,7 @@ func NewPool(size int, options ...Option) (*Pool, error) {
// Submit submits a task to this pool. // Submit submits a task to this pool.
func (p *Pool) Submit(task func()) error { func (p *Pool) Submit(task func()) error {
if atomic.LoadInt32(&p.release) == CLOSED { if atomic.LoadInt32(&p.state) == CLOSED {
return ErrPoolClosed return ErrPoolClosed
} }
var w *goWorker var w *goWorker
@ -175,12 +172,17 @@ func (p *Pool) Tune(size int) {
// Release Closes this pool. // Release Closes this pool.
func (p *Pool) Release() { func (p *Pool) Release() {
p.once.Do(func() { atomic.StoreInt32(&p.state, CLOSED)
atomic.StoreInt32(&p.release, 1) p.lock.Lock()
p.lock.Lock() p.workers.reset()
p.workers.reset() p.lock.Unlock()
p.lock.Unlock() }
})
// Reboot reboots a released pool.
func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
go p.periodicallyPurge()
}
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -242,7 +244,7 @@ func (p *Pool) retrieveWorker() *goWorker {
// revertWorker puts a worker back into free pool, recycling the goroutines. // revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) revertWorker(worker *goWorker) bool { func (p *Pool) revertWorker(worker *goWorker) bool {
if atomic.LoadInt32(&p.release) == CLOSED || p.Running() > p.Cap() { if atomic.LoadInt32(&p.state) == CLOSED || p.Running() > p.Cap() {
return false return false
} }
worker.recycleTime = time.Now() worker.recycleTime = time.Now()

View File

@ -41,8 +41,8 @@ type PoolWithFunc struct {
// workers is a slice that store the available workers. // workers is a slice that store the available workers.
workers []*goWorkerWithFunc workers []*goWorkerWithFunc
// release is used to notice the pool to closed itself. // state is used to notice the pool to closed itself.
release int32 state int32
// lock for synchronous operation. // lock for synchronous operation.
lock sync.Locker lock sync.Locker
@ -53,9 +53,6 @@ type PoolWithFunc struct {
// poolFunc is the function for processing tasks. // poolFunc is the function for processing tasks.
poolFunc func(interface{}) poolFunc func(interface{})
// once makes sure releasing this pool will just be done for one time.
once sync.Once
// workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker. // workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker.
workerCache sync.Pool workerCache sync.Pool
@ -65,14 +62,14 @@ type PoolWithFunc struct {
options *Options options *Options
} }
// Clear expired workers periodically. // periodicallyPurge clears expired workers periodically.
func (p *PoolWithFunc) periodicallyPurge() { func (p *PoolWithFunc) periodicallyPurge() {
heartbeat := time.NewTicker(p.options.ExpiryDuration) heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer heartbeat.Stop() defer heartbeat.Stop()
var expiredWorkers []*goWorkerWithFunc var expiredWorkers []*goWorkerWithFunc
for range heartbeat.C { for range heartbeat.C {
if atomic.LoadInt32(&p.release) == CLOSED { if atomic.LoadInt32(&p.state) == CLOSED {
break break
} }
currentTime := time.Now() currentTime := time.Now()
@ -158,7 +155,7 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
// Invoke submits a task to pool. // Invoke submits a task to pool.
func (p *PoolWithFunc) Invoke(args interface{}) error { func (p *PoolWithFunc) Invoke(args interface{}) error {
if atomic.LoadInt32(&p.release) == CLOSED { if atomic.LoadInt32(&p.state) == CLOSED {
return ErrPoolClosed return ErrPoolClosed
} }
var w *goWorkerWithFunc var w *goWorkerWithFunc
@ -194,16 +191,21 @@ func (p *PoolWithFunc) Tune(size int) {
// Release Closes this pool. // Release Closes this pool.
func (p *PoolWithFunc) Release() { func (p *PoolWithFunc) Release() {
p.once.Do(func() { atomic.StoreInt32(&p.state, CLOSED)
atomic.StoreInt32(&p.release, 1) p.lock.Lock()
p.lock.Lock() idleWorkers := p.workers
idleWorkers := p.workers for _, w := range idleWorkers {
for _, w := range idleWorkers { w.args <- nil
w.args <- nil }
} p.workers = nil
p.workers = nil p.lock.Unlock()
p.lock.Unlock() }
})
// Reboot reboots a released pool.
func (p *PoolWithFunc) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
go p.periodicallyPurge()
}
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
@ -269,7 +271,7 @@ func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc {
// revertWorker puts a worker back into free pool, recycling the goroutines. // revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool { func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool {
if atomic.LoadInt32(&p.release) == CLOSED || p.Running() > p.Cap() { if atomic.LoadInt32(&p.state) == CLOSED || p.Running() > p.Cap() {
return false return false
} }
worker.recycleTime = time.Now() worker.recycleTime = time.Now()