diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 3e225c6..a234372 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -9,6 +9,8 @@ on: branches: - master - dev + paths-ignore: + - '**.md' schedule: # ┌───────────── minute (0 - 59) # │ ┌───────────── hour (0 - 23) diff --git a/README.md b/README.md index e20a890..3f0a02b 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ A goroutine pool for Go

- + diff --git a/README_ZH.md b/README_ZH.md index 99d3dde..bbce7be 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -2,7 +2,7 @@ Go 语言的 goroutine 池

- + diff --git a/pool.go b/pool.go index 012bd6f..a5a8f1e 100644 --- a/pool.go +++ b/pool.go @@ -59,8 +59,11 @@ type Pool struct { // waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock waiting int32 - heartbeatDone int32 - stopHeartbeat context.CancelFunc + purgeDone int32 + stopPurge context.CancelFunc + + ticktockDone int32 + stopTicktock context.CancelFunc now atomic.Value @@ -69,18 +72,18 @@ type Pool struct { // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. func (p *Pool) purgeStaleWorkers(ctx context.Context) { - heartbeat := time.NewTicker(p.options.ExpiryDuration) + ticker := time.NewTicker(p.options.ExpiryDuration) defer func() { - heartbeat.Stop() - atomic.StoreInt32(&p.heartbeatDone, 1) + ticker.Stop() + atomic.StoreInt32(&p.purgeDone, 1) }() for { select { case <-ctx.Done(): return - case <-heartbeat.C: + case <-ticker.C: } if p.IsClosed() { @@ -111,15 +114,46 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) { } // ticktock is a goroutine that updates the current time in the pool regularly. -func (p *Pool) ticktock() { +func (p *Pool) ticktock(ctx context.Context) { ticker := time.NewTicker(nowTimeUpdateInterval) - defer ticker.Stop() + defer func() { + ticker.Stop() + atomic.StoreInt32(&p.ticktockDone, 1) + }() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + + if p.IsClosed() { + break + } - for range ticker.C { p.now.Store(time.Now()) } } +func (p *Pool) goPurge() { + if p.options.DisablePurge { + return + } + + // Start a goroutine to clean up expired workers periodically. + var ctx context.Context + ctx, p.stopPurge = context.WithCancel(context.Background()) + go p.purgeStaleWorkers(ctx) +} + +func (p *Pool) goTicktock() { + p.now.Store(time.Now()) + var ctx context.Context + ctx, p.stopTicktock = context.WithCancel(context.Background()) + go p.ticktock(ctx) +} + func (p *Pool) nowTime() time.Time { return p.now.Load().(time.Time) } @@ -166,15 +200,8 @@ func NewPool(size int, options ...Option) (*Pool, error) { p.cond = sync.NewCond(p.lock) - // Start a goroutine to clean up expired workers periodically. - var ctx context.Context - ctx, p.stopHeartbeat = context.WithCancel(context.Background()) - if !p.options.DisablePurge { - go p.purgeStaleWorkers(ctx) - } - - p.now.Store(time.Now()) - go p.ticktock() + p.goPurge() + p.goTicktock() return p, nil } @@ -259,17 +286,23 @@ func (p *Pool) Release() { // ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. func (p *Pool) ReleaseTimeout(timeout time.Duration) error { - if p.IsClosed() || p.stopHeartbeat == nil { + if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil { return ErrPoolClosed } - p.stopHeartbeat() - p.stopHeartbeat = nil + if p.stopPurge != nil { + p.stopPurge() + p.stopPurge = nil + } + p.stopTicktock() + p.stopTicktock = nil p.Release() endTime := time.Now().Add(timeout) for time.Now().Before(endTime) { - if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) { + if p.Running() == 0 && + (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && + atomic.LoadInt32(&p.ticktockDone) == 1 { return nil } time.Sleep(10 * time.Millisecond) @@ -280,12 +313,10 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error { // Reboot reboots a closed pool. func (p *Pool) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { - atomic.StoreInt32(&p.heartbeatDone, 0) - var ctx context.Context - ctx, p.stopHeartbeat = context.WithCancel(context.Background()) - if !p.options.DisablePurge { - go p.purgeStaleWorkers(ctx) - } + atomic.StoreInt32(&p.purgeDone, 0) + p.goPurge() + atomic.StoreInt32(&p.ticktockDone, 0) + p.goTicktock() } } diff --git a/pool_func.go b/pool_func.go index d466f1c..bcd90ee 100644 --- a/pool_func.go +++ b/pool_func.go @@ -61,8 +61,11 @@ type PoolWithFunc struct { // waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock waiting int32 - heartbeatDone int32 - stopHeartbeat context.CancelFunc + purgeDone int32 + stopPurge context.CancelFunc + + ticktockDone int32 + stopTicktock context.CancelFunc now atomic.Value @@ -71,10 +74,10 @@ type PoolWithFunc struct { // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { - heartbeat := time.NewTicker(p.options.ExpiryDuration) + ticker := time.NewTicker(p.options.ExpiryDuration) defer func() { - heartbeat.Stop() - atomic.StoreInt32(&p.heartbeatDone, 1) + ticker.Stop() + atomic.StoreInt32(&p.purgeDone, 1) }() var expiredWorkers []*goWorkerWithFunc @@ -82,7 +85,7 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { select { case <-ctx.Done(): return - case <-heartbeat.C: + case <-ticker.C: } if p.IsClosed() { @@ -134,15 +137,46 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { } // ticktock is a goroutine that updates the current time in the pool regularly. -func (p *PoolWithFunc) ticktock() { +func (p *PoolWithFunc) ticktock(ctx context.Context) { ticker := time.NewTicker(nowTimeUpdateInterval) - defer ticker.Stop() + defer func() { + ticker.Stop() + atomic.StoreInt32(&p.ticktockDone, 1) + }() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + + if p.IsClosed() { + break + } - for range ticker.C { p.now.Store(time.Now()) } } +func (p *PoolWithFunc) goPurge() { + if p.options.DisablePurge { + return + } + + // Start a goroutine to clean up expired workers periodically. + var ctx context.Context + ctx, p.stopPurge = context.WithCancel(context.Background()) + go p.purgeStaleWorkers(ctx) +} + +func (p *PoolWithFunc) goTicktock() { + p.now.Store(time.Now()) + var ctx context.Context + ctx, p.stopTicktock = context.WithCancel(context.Background()) + go p.ticktock(ctx) +} + func (p *PoolWithFunc) nowTime() time.Time { return p.now.Load().(time.Time) } @@ -191,15 +225,8 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi } p.cond = sync.NewCond(p.lock) - // Start a goroutine to clean up expired workers periodically. - var ctx context.Context - ctx, p.stopHeartbeat = context.WithCancel(context.Background()) - if !p.options.DisablePurge { - go p.purgeStaleWorkers(ctx) - } - - p.now.Store(time.Now()) - go p.ticktock() + p.goPurge() + p.goTicktock() return p, nil } @@ -288,17 +315,23 @@ func (p *PoolWithFunc) Release() { // ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { - if p.IsClosed() || p.stopHeartbeat == nil { + if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil { return ErrPoolClosed } - p.stopHeartbeat() - p.stopHeartbeat = nil + if p.stopPurge != nil { + p.stopPurge() + p.stopPurge = nil + } + p.stopTicktock() + p.stopTicktock = nil p.Release() endTime := time.Now().Add(timeout) for time.Now().Before(endTime) { - if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) { + if p.Running() == 0 && + (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && + atomic.LoadInt32(&p.ticktockDone) == 1 { return nil } time.Sleep(10 * time.Millisecond) @@ -309,12 +342,10 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { // Reboot reboots a closed pool. func (p *PoolWithFunc) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { - atomic.StoreInt32(&p.heartbeatDone, 0) - var ctx context.Context - ctx, p.stopHeartbeat = context.WithCancel(context.Background()) - if !p.options.DisablePurge { - go p.purgeStaleWorkers(ctx) - } + atomic.StoreInt32(&p.purgeDone, 0) + p.goPurge() + atomic.StoreInt32(&p.ticktockDone, 0) + p.goTicktock() } }