chore: code cleanup

This commit is contained in:
Andy Pan 2022-12-20 21:55:28 +08:00
parent 4e0cb8cd03
commit 858f91f48b
2 changed files with 36 additions and 36 deletions

36
pool.go
View File

@ -59,8 +59,8 @@ type Pool struct {
// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock // waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
waiting int32 waiting int32
heartbeatDone int32 purgeDone int32
stopHeartbeat context.CancelFunc stopPurge context.CancelFunc
ticktockDone int32 ticktockDone int32
stopTicktock context.CancelFunc stopTicktock context.CancelFunc
@ -72,18 +72,18 @@ type Pool struct {
// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *Pool) purgeStaleWorkers(ctx context.Context) { func (p *Pool) purgeStaleWorkers(ctx context.Context) {
heartbeat := time.NewTicker(p.options.ExpiryDuration) ticker := time.NewTicker(p.options.ExpiryDuration)
defer func() { defer func() {
heartbeat.Stop() ticker.Stop()
atomic.StoreInt32(&p.heartbeatDone, 1) atomic.StoreInt32(&p.purgeDone, 1)
}() }()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-heartbeat.C: case <-ticker.C:
} }
if p.IsClosed() { if p.IsClosed() {
@ -136,16 +136,16 @@ func (p *Pool) ticktock(ctx context.Context) {
} }
} }
func (p *Pool) startHeartbeat() { func (p *Pool) goPurge() {
// Start a goroutine to clean up expired workers periodically. // Start a goroutine to clean up expired workers periodically.
var ctx context.Context var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background()) ctx, p.stopPurge = context.WithCancel(context.Background())
if !p.options.DisablePurge { if !p.options.DisablePurge {
go p.purgeStaleWorkers(ctx) go p.purgeStaleWorkers(ctx)
} }
} }
func (p *Pool) startTicktock() { func (p *Pool) goTicktock() {
p.now.Store(time.Now()) p.now.Store(time.Now())
var ctx context.Context var ctx context.Context
ctx, p.stopTicktock = context.WithCancel(context.Background()) ctx, p.stopTicktock = context.WithCancel(context.Background())
@ -198,8 +198,8 @@ func NewPool(size int, options ...Option) (*Pool, error) {
p.cond = sync.NewCond(p.lock) p.cond = sync.NewCond(p.lock)
p.startHeartbeat() p.goPurge()
p.startTicktock() p.goTicktock()
return p, nil return p, nil
} }
@ -284,12 +284,12 @@ func (p *Pool) Release() {
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. // ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func (p *Pool) ReleaseTimeout(timeout time.Duration) error { func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() || p.stopHeartbeat == nil || p.stopTicktock == nil { if p.IsClosed() || p.stopPurge == nil || p.stopTicktock == nil {
return ErrPoolClosed return ErrPoolClosed
} }
p.stopHeartbeat() p.stopPurge()
p.stopHeartbeat = nil p.stopPurge = nil
p.stopTicktock() p.stopTicktock()
p.stopTicktock = nil p.stopTicktock = nil
p.Release() p.Release()
@ -297,7 +297,7 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
endTime := time.Now().Add(timeout) endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) { for time.Now().Before(endTime) {
if p.Running() == 0 && if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) && (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 { atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil return nil
} }
@ -309,10 +309,10 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
// Reboot reboots a closed pool. // Reboot reboots a closed pool.
func (p *Pool) Reboot() { func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.heartbeatDone, 0) atomic.StoreInt32(&p.purgeDone, 0)
p.startHeartbeat() p.goPurge()
atomic.StoreInt32(&p.ticktockDone, 0) atomic.StoreInt32(&p.ticktockDone, 0)
p.startTicktock() p.goTicktock()
} }
} }

View File

@ -61,8 +61,8 @@ type PoolWithFunc struct {
// waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock // waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock
waiting int32 waiting int32
heartbeatDone int32 purgeDone int32
stopHeartbeat context.CancelFunc stopPurge context.CancelFunc
ticktockDone int32 ticktockDone int32
stopTicktock context.CancelFunc stopTicktock context.CancelFunc
@ -74,10 +74,10 @@ type PoolWithFunc struct {
// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
heartbeat := time.NewTicker(p.options.ExpiryDuration) ticker := time.NewTicker(p.options.ExpiryDuration)
defer func() { defer func() {
heartbeat.Stop() ticker.Stop()
atomic.StoreInt32(&p.heartbeatDone, 1) atomic.StoreInt32(&p.purgeDone, 1)
}() }()
var expiredWorkers []*goWorkerWithFunc var expiredWorkers []*goWorkerWithFunc
@ -85,7 +85,7 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-heartbeat.C: case <-ticker.C:
} }
if p.IsClosed() { if p.IsClosed() {
@ -159,16 +159,16 @@ func (p *PoolWithFunc) ticktock(ctx context.Context) {
} }
} }
func (p *PoolWithFunc) startHeartbeat() { func (p *PoolWithFunc) goPurge() {
// Start a goroutine to clean up expired workers periodically. // Start a goroutine to clean up expired workers periodically.
var ctx context.Context var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background()) ctx, p.stopPurge = context.WithCancel(context.Background())
if !p.options.DisablePurge { if !p.options.DisablePurge {
go p.purgeStaleWorkers(ctx) go p.purgeStaleWorkers(ctx)
} }
} }
func (p *PoolWithFunc) startTicktock() { func (p *PoolWithFunc) goTicktock() {
p.now.Store(time.Now()) p.now.Store(time.Now())
var ctx context.Context var ctx context.Context
ctx, p.stopTicktock = context.WithCancel(context.Background()) ctx, p.stopTicktock = context.WithCancel(context.Background())
@ -223,8 +223,8 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
} }
p.cond = sync.NewCond(p.lock) p.cond = sync.NewCond(p.lock)
p.startHeartbeat() p.goPurge()
p.startTicktock() p.goTicktock()
return p, nil return p, nil
} }
@ -313,12 +313,12 @@ func (p *PoolWithFunc) Release() {
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. // ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() || p.stopHeartbeat == nil || p.stopTicktock == nil { if p.IsClosed() || p.stopPurge == nil || p.stopTicktock == nil {
return ErrPoolClosed return ErrPoolClosed
} }
p.stopHeartbeat() p.stopPurge()
p.stopHeartbeat = nil p.stopPurge = nil
p.stopTicktock() p.stopTicktock()
p.stopTicktock = nil p.stopTicktock = nil
p.Release() p.Release()
@ -326,7 +326,7 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
endTime := time.Now().Add(timeout) endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) { for time.Now().Before(endTime) {
if p.Running() == 0 && if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) && (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 { atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil return nil
} }
@ -338,10 +338,10 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
// Reboot reboots a closed pool. // Reboot reboots a closed pool.
func (p *PoolWithFunc) Reboot() { func (p *PoolWithFunc) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.heartbeatDone, 0) atomic.StoreInt32(&p.purgeDone, 0)
p.startHeartbeat() p.goPurge()
atomic.StoreInt32(&p.ticktockDone, 0) atomic.StoreInt32(&p.ticktockDone, 0)
p.startTicktock() p.goTicktock()
} }
} }