mirror of https://github.com/panjf2000/ants.git
fix: exit ticktock goroutine when pool is closed
This commit is contained in:
parent
3fbd9567c9
commit
23c4f48d0d
65
pool.go
65
pool.go
|
@ -62,6 +62,9 @@ type Pool struct {
|
|||
heartbeatDone int32
|
||||
stopHeartbeat context.CancelFunc
|
||||
|
||||
ticktockDone int32
|
||||
stopTicktock context.CancelFunc
|
||||
|
||||
now atomic.Value
|
||||
|
||||
options *Options
|
||||
|
@ -111,15 +114,44 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) {
|
|||
}
|
||||
|
||||
// ticktock is a goroutine that updates the current time in the pool regularly.
|
||||
func (p *Pool) ticktock() {
|
||||
func (p *Pool) ticktock(ctx context.Context) {
|
||||
ticker := time.NewTicker(nowTimeUpdateInterval)
|
||||
defer ticker.Stop()
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
atomic.StoreInt32(&p.ticktockDone, 1)
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
|
||||
if p.IsClosed() {
|
||||
break
|
||||
}
|
||||
|
||||
for range ticker.C {
|
||||
p.now.Store(time.Now())
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pool) startHeartbeat() {
|
||||
// Start a goroutine to clean up expired workers periodically.
|
||||
var ctx context.Context
|
||||
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
|
||||
if !p.options.DisablePurge {
|
||||
go p.purgeStaleWorkers(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pool) startTicktock() {
|
||||
p.now.Store(time.Now())
|
||||
var ctx context.Context
|
||||
ctx, p.stopTicktock = context.WithCancel(context.Background())
|
||||
go p.ticktock(ctx)
|
||||
}
|
||||
|
||||
func (p *Pool) nowTime() time.Time {
|
||||
return p.now.Load().(time.Time)
|
||||
}
|
||||
|
@ -166,15 +198,8 @@ func NewPool(size int, options ...Option) (*Pool, error) {
|
|||
|
||||
p.cond = sync.NewCond(p.lock)
|
||||
|
||||
// Start a goroutine to clean up expired workers periodically.
|
||||
var ctx context.Context
|
||||
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
|
||||
if !p.options.DisablePurge {
|
||||
go p.purgeStaleWorkers(ctx)
|
||||
}
|
||||
|
||||
p.now.Store(time.Now())
|
||||
go p.ticktock()
|
||||
p.startHeartbeat()
|
||||
p.startTicktock()
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
@ -259,17 +284,21 @@ func (p *Pool) Release() {
|
|||
|
||||
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
|
||||
func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
|
||||
if p.IsClosed() || p.stopHeartbeat == nil {
|
||||
if p.IsClosed() || p.stopHeartbeat == nil || p.stopTicktock == nil {
|
||||
return ErrPoolClosed
|
||||
}
|
||||
|
||||
p.stopHeartbeat()
|
||||
p.stopHeartbeat = nil
|
||||
p.stopTicktock()
|
||||
p.stopTicktock = nil
|
||||
p.Release()
|
||||
|
||||
endTime := time.Now().Add(timeout)
|
||||
for time.Now().Before(endTime) {
|
||||
if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) {
|
||||
if p.Running() == 0 &&
|
||||
(p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) &&
|
||||
atomic.LoadInt32(&p.ticktockDone) == 1 {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -281,11 +310,9 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
|
|||
func (p *Pool) Reboot() {
|
||||
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
|
||||
atomic.StoreInt32(&p.heartbeatDone, 0)
|
||||
var ctx context.Context
|
||||
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
|
||||
if !p.options.DisablePurge {
|
||||
go p.purgeStaleWorkers(ctx)
|
||||
}
|
||||
p.startHeartbeat()
|
||||
atomic.StoreInt32(&p.ticktockDone, 0)
|
||||
p.startTicktock()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
65
pool_func.go
65
pool_func.go
|
@ -64,6 +64,9 @@ type PoolWithFunc struct {
|
|||
heartbeatDone int32
|
||||
stopHeartbeat context.CancelFunc
|
||||
|
||||
ticktockDone int32
|
||||
stopTicktock context.CancelFunc
|
||||
|
||||
now atomic.Value
|
||||
|
||||
options *Options
|
||||
|
@ -134,15 +137,44 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
|
|||
}
|
||||
|
||||
// ticktock is a goroutine that updates the current time in the pool regularly.
|
||||
func (p *PoolWithFunc) ticktock() {
|
||||
func (p *PoolWithFunc) ticktock(ctx context.Context) {
|
||||
ticker := time.NewTicker(nowTimeUpdateInterval)
|
||||
defer ticker.Stop()
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
atomic.StoreInt32(&p.ticktockDone, 1)
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
|
||||
if p.IsClosed() {
|
||||
break
|
||||
}
|
||||
|
||||
for range ticker.C {
|
||||
p.now.Store(time.Now())
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PoolWithFunc) startHeartbeat() {
|
||||
// Start a goroutine to clean up expired workers periodically.
|
||||
var ctx context.Context
|
||||
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
|
||||
if !p.options.DisablePurge {
|
||||
go p.purgeStaleWorkers(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PoolWithFunc) startTicktock() {
|
||||
p.now.Store(time.Now())
|
||||
var ctx context.Context
|
||||
ctx, p.stopTicktock = context.WithCancel(context.Background())
|
||||
go p.ticktock(ctx)
|
||||
}
|
||||
|
||||
func (p *PoolWithFunc) nowTime() time.Time {
|
||||
return p.now.Load().(time.Time)
|
||||
}
|
||||
|
@ -191,15 +223,8 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
|
|||
}
|
||||
p.cond = sync.NewCond(p.lock)
|
||||
|
||||
// Start a goroutine to clean up expired workers periodically.
|
||||
var ctx context.Context
|
||||
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
|
||||
if !p.options.DisablePurge {
|
||||
go p.purgeStaleWorkers(ctx)
|
||||
}
|
||||
|
||||
p.now.Store(time.Now())
|
||||
go p.ticktock()
|
||||
p.startHeartbeat()
|
||||
p.startTicktock()
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
@ -288,17 +313,21 @@ func (p *PoolWithFunc) Release() {
|
|||
|
||||
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
|
||||
func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
|
||||
if p.IsClosed() || p.stopHeartbeat == nil {
|
||||
if p.IsClosed() || p.stopHeartbeat == nil || p.stopTicktock == nil {
|
||||
return ErrPoolClosed
|
||||
}
|
||||
|
||||
p.stopHeartbeat()
|
||||
p.stopHeartbeat = nil
|
||||
p.stopTicktock()
|
||||
p.stopTicktock = nil
|
||||
p.Release()
|
||||
|
||||
endTime := time.Now().Add(timeout)
|
||||
for time.Now().Before(endTime) {
|
||||
if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) {
|
||||
if p.Running() == 0 &&
|
||||
(p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) &&
|
||||
atomic.LoadInt32(&p.ticktockDone) == 1 {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
@ -310,11 +339,9 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
|
|||
func (p *PoolWithFunc) Reboot() {
|
||||
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
|
||||
atomic.StoreInt32(&p.heartbeatDone, 0)
|
||||
var ctx context.Context
|
||||
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
|
||||
if !p.options.DisablePurge {
|
||||
go p.purgeStaleWorkers(ctx)
|
||||
}
|
||||
p.startHeartbeat()
|
||||
atomic.StoreInt32(&p.ticktockDone, 0)
|
||||
p.startTicktock()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue