opt: cache current time for workders and update it periodically

This commit is contained in:
Andy Pan 2022-12-11 18:16:45 +08:00
parent 03011bc512
commit 846d76a437
5 changed files with 135 additions and 25 deletions

View File

@ -88,6 +88,8 @@ var (
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize) defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
) )
const nowTimeUpdateInterval = 500 * time.Millisecond
// Logger is used for logging formatted messages. // Logger is used for logging formatted messages.
type Logger interface { type Logger interface {
// Printf must have the same semantics as log.Printf. // Printf must have the same semantics as log.Printf.

View File

@ -25,6 +25,7 @@ package ants
import ( import (
"runtime" "runtime"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
) )
@ -144,3 +145,44 @@ func BenchmarkAntsPoolThroughput(b *testing.B) {
} }
b.StopTimer() b.StopTimer()
} }
func BenchmarkTimeNow(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = time.Now()
}
}
func BenchmarkTimeNowCache(b *testing.B) {
var (
now atomic.Value
offset int32
)
now.Store(time.Now())
go func() {
for range time.Tick(500 * time.Millisecond) {
now.Store(time.Now())
atomic.StoreInt32(&offset, 0)
}
}()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = now.Load().(time.Time).Add(time.Duration(atomic.AddInt32(&offset, 1)))
}
}
func BenchmarkTimeNowCache1(b *testing.B) {
var now atomic.Value
now.Store(time.Now())
go func() {
for range time.Tick(500 * time.Millisecond) {
now.Store(time.Now())
}
}()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = now.Load().(time.Time)
}
}

View File

@ -322,22 +322,50 @@ func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {
_ = p1.Invoke("Oops!") _ = p1.Invoke("Oops!")
} }
func TestPurge(t *testing.T) { func TestPurgePool(t *testing.T) {
p, err := NewPool(10) size := 500
ch := make(chan struct{})
p, err := NewPool(size)
assert.NoErrorf(t, err, "create TimingPool failed: %v", err) assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
defer p.Release() defer p.Release()
_ = p.Submit(demoFunc)
time.Sleep(3 * DefaultCleanIntervalTime) for i := 0; i < size; i++ {
assert.EqualValues(t, 0, p.Running(), "all p should be purged") j := i + 1
p1, err := NewPoolWithFunc(10, demoPoolFunc) _ = p.Submit(func() {
assert.NoErrorf(t, err, "create TimingPoolWithFunc failed: %v", err) <-ch
defer p1.Release() d := j % 100
_ = p1.Invoke(1) time.Sleep(time.Duration(d) * time.Millisecond)
time.Sleep(3 * DefaultCleanIntervalTime) })
assert.EqualValues(t, 0, p.Running(), "all p should be purged") }
assert.Equalf(t, size, p.Running(), "pool should be full, expected: %d, but got: %d", size, p.Running())
close(ch)
time.Sleep(5 * DefaultCleanIntervalTime)
assert.Equalf(t, 0, p.Running(), "pool should be empty after purge, but got %d", p.Running())
ch = make(chan struct{})
f := func(i interface{}) {
<-ch
d := i.(int) % 100
time.Sleep(time.Duration(d) * time.Millisecond)
} }
func TestPurgePreMalloc(t *testing.T) { p1, err := NewPoolWithFunc(size, f)
assert.NoErrorf(t, err, "create TimingPoolWithFunc failed: %v", err)
defer p1.Release()
for i := 0; i < size; i++ {
_ = p1.Invoke(i)
}
assert.Equalf(t, size, p1.Running(), "pool should be full, expected: %d, but got: %d", size, p1.Running())
close(ch)
time.Sleep(5 * DefaultCleanIntervalTime)
assert.Equalf(t, 0, p1.Running(), "pool should be empty after purge, but got %d", p1.Running())
}
func TestPurgePreMallocPool(t *testing.T) {
p, err := NewPool(10, WithPreAlloc(true)) p, err := NewPool(10, WithPreAlloc(true))
assert.NoErrorf(t, err, "create TimingPool failed: %v", err) assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
defer p.Release() defer p.Release()
@ -547,9 +575,7 @@ func TestInfinitePool(t *testing.T) {
} }
var err error var err error
_, err = NewPool(-1, WithPreAlloc(true)) _, err = NewPool(-1, WithPreAlloc(true))
if err != ErrInvalidPreAllocSize { assert.EqualErrorf(t, err, ErrInvalidPreAllocSize.Error(), "")
t.Errorf("expect ErrInvalidPreAllocSize but got %v", err)
}
} }
func testPoolWithDisablePurge(t *testing.T, p *Pool, numWorker int, waitForPurge time.Duration) { func testPoolWithDisablePurge(t *testing.T, p *Pool, numWorker int, waitForPurge time.Duration) {

32
pool.go
View File

@ -62,11 +62,13 @@ type Pool struct {
heartbeatDone int32 heartbeatDone int32
stopHeartbeat context.CancelFunc stopHeartbeat context.CancelFunc
now atomic.Value
options *Options options *Options
} }
// purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger. // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *Pool) purgePeriodically(ctx context.Context) { func (p *Pool) purgeStaleWorkers(ctx context.Context) {
heartbeat := time.NewTicker(p.options.ExpiryDuration) heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer func() { defer func() {
@ -76,9 +78,9 @@ func (p *Pool) purgePeriodically(ctx context.Context) {
for { for {
select { select {
case <-heartbeat.C:
case <-ctx.Done(): case <-ctx.Done():
return return
case <-heartbeat.C:
} }
if p.IsClosed() { if p.IsClosed() {
@ -108,6 +110,20 @@ func (p *Pool) purgePeriodically(ctx context.Context) {
} }
} }
// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *Pool) ticktock() {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer ticker.Stop()
for range ticker.C {
p.now.Store(time.Now())
}
}
func (p *Pool) nowTime() time.Time {
return p.now.Load().(time.Time)
}
// NewPool generates an instance of ants pool. // NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) { func NewPool(size int, options ...Option) (*Pool, error) {
opts := loadOptions(options...) opts := loadOptions(options...)
@ -154,8 +170,12 @@ func NewPool(size int, options ...Option) (*Pool, error) {
var ctx context.Context var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background()) ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge { if !p.options.DisablePurge {
go p.purgePeriodically(ctx) go p.purgeStaleWorkers(ctx)
} }
p.now.Store(time.Now())
go p.ticktock()
return p, nil return p, nil
} }
@ -264,7 +284,7 @@ func (p *Pool) Reboot() {
var ctx context.Context var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background()) ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge { if !p.options.DisablePurge {
go p.purgePeriodically(ctx) go p.purgeStaleWorkers(ctx)
} }
} }
} }
@ -340,7 +360,7 @@ func (p *Pool) revertWorker(worker *goWorker) bool {
p.cond.Broadcast() p.cond.Broadcast()
return false return false
} }
worker.recycleTime = time.Now() worker.recycleTime = p.nowTime()
p.lock.Lock() p.lock.Lock()
// To avoid memory leaks, add a double check in the lock scope. // To avoid memory leaks, add a double check in the lock scope.

View File

@ -64,11 +64,13 @@ type PoolWithFunc struct {
heartbeatDone int32 heartbeatDone int32
stopHeartbeat context.CancelFunc stopHeartbeat context.CancelFunc
now atomic.Value
options *Options options *Options
} }
// purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger. // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *PoolWithFunc) purgePeriodically(ctx context.Context) { func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
heartbeat := time.NewTicker(p.options.ExpiryDuration) heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer func() { defer func() {
heartbeat.Stop() heartbeat.Stop()
@ -78,9 +80,9 @@ func (p *PoolWithFunc) purgePeriodically(ctx context.Context) {
var expiredWorkers []*goWorkerWithFunc var expiredWorkers []*goWorkerWithFunc
for { for {
select { select {
case <-heartbeat.C:
case <-ctx.Done(): case <-ctx.Done():
return return
case <-heartbeat.C:
} }
if p.IsClosed() { if p.IsClosed() {
@ -123,6 +125,20 @@ func (p *PoolWithFunc) purgePeriodically(ctx context.Context) {
} }
} }
// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *PoolWithFunc) ticktock() {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer ticker.Stop()
for range ticker.C {
p.now.Store(time.Now())
}
}
func (p *PoolWithFunc) nowTime() time.Time {
return p.now.Load().(time.Time)
}
// NewPoolWithFunc generates an instance of ants pool with a specific function. // NewPoolWithFunc generates an instance of ants pool with a specific function.
func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) { func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
if size <= 0 { if size <= 0 {
@ -171,8 +187,12 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
var ctx context.Context var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background()) ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge { if !p.options.DisablePurge {
go p.purgePeriodically(ctx) go p.purgeStaleWorkers(ctx)
} }
p.now.Store(time.Now())
go p.ticktock()
return p, nil return p, nil
} }
@ -285,7 +305,7 @@ func (p *PoolWithFunc) Reboot() {
var ctx context.Context var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background()) ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge { if !p.options.DisablePurge {
go p.purgePeriodically(ctx) go p.purgeStaleWorkers(ctx)
} }
} }
} }
@ -368,7 +388,7 @@ func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool {
p.cond.Broadcast() p.cond.Broadcast()
return false return false
} }
worker.recycleTime = time.Now() worker.recycleTime = p.nowTime()
p.lock.Lock() p.lock.Lock()
// To avoid memory leaks, add a double check in the lock scope. // To avoid memory leaks, add a double check in the lock scope.