start a goroutine to clear expired workers when init a pool

This commit is contained in:
andy pan 2018-07-06 15:02:18 +08:00
commit 5cc2b16895
7 changed files with 80 additions and 16 deletions

11
ants.go
View File

@ -25,7 +25,6 @@ package ants
import (
"errors"
"math"
"runtime"
)
const (
@ -33,11 +32,11 @@ const (
DefaultPoolSize = math.MaxInt32
// DefaultCleanIntervalTime is the interval time to clean up goroutines
DefaultCleanIntervalTime = 30
DefaultCleanIntervalTime = 10
)
// Init a instance pool when importing ants
var defaultPool, _ = NewPool(DefaultPoolSize)
var defaultPool, _ = NewPool(DefaultPoolSize, DefaultCleanIntervalTime)
// Submit submit a task to pool
func Submit(task f) error {
@ -70,9 +69,3 @@ var (
ErrPoolClosed = errors.New("this pool has been closed")
)
var workerArgsCap = func() int {
if runtime.GOMAXPROCS(0) == 1 {
return 0
}
return 1
}()

View File

@ -78,7 +78,7 @@ func BenchmarkGoroutineWithFunc(b *testing.B) {
func BenchmarkAntsPoolWithFunc(b *testing.B) {
var wg sync.WaitGroup
p, _ := ants.NewPoolWithFunc(50000, func(i interface{}) error {
p, _ := ants.NewPoolWithFunc(50000, 1,func(i interface{}) error {
demoPoolFunc(i)
wg.Done()
return nil
@ -104,7 +104,7 @@ func BenchmarkGoroutine(b *testing.B) {
}
func BenchmarkAntsPool(b *testing.B) {
p, _ := ants.NewPoolWithFunc(50000, demoPoolFunc)
p, _ := ants.NewPoolWithFunc(50000, 1, demoPoolFunc)
defer p.Release()
b.ResetTimer()

View File

@ -67,7 +67,7 @@ func main() {
// use the pool with a function
// set 10 the size of goroutine pool
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) error {
p, _ := ants.NewPoolWithFunc(10, 1, func(i interface{}) error {
myFunc(i)
wg.Done()
return nil

36
pool.go
View File

@ -26,6 +26,7 @@ import (
"math"
"sync"
"sync/atomic"
"time"
)
type sig struct{}
@ -41,6 +42,9 @@ type Pool struct {
// running is the number of the currently running goroutines.
running int32
// expiryDuration set the expired time (second) of every worker.
expiryDuration time.Duration
// freeSignal is used to notice pool there are available
// workers which can be sent to work.
freeSignal chan sig
@ -57,8 +61,34 @@ type Pool struct {
once sync.Once
}
func (p *Pool) monitorAndClear() {
go func() {
for {
time.Sleep(p.expiryDuration)
currentTime := time.Now()
p.lock.Lock()
idleWorkers := p.workers
n := 0
for i, w := range idleWorkers {
if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
break
}
n = i
w.stop()
idleWorkers[i] = nil
}
if n > 0 {
n += 1
p.workers = idleWorkers[n:]
}
p.lock.Unlock()
}
}()
}
// NewPool generates a instance of ants pool
func NewPool(size int) (*Pool, error) {
func NewPool(size, expiry int) (*Pool, error) {
if size <= 0 {
return nil, ErrPoolSizeInvalid
}
@ -66,8 +96,9 @@ func NewPool(size int) (*Pool, error) {
capacity: int32(size),
freeSignal: make(chan sig, math.MaxInt32),
release: make(chan sig, 1),
expiryDuration: time.Duration(expiry) * time.Second,
}
p.monitorAndClear()
return p, nil
}
@ -171,6 +202,7 @@ func (p *Pool) getWorker() *Worker {
// putWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) putWorker(worker *Worker) {
worker.recycleTime = time.Now()
p.lock.Lock()
p.workers = append(p.workers, worker)
p.lock.Unlock()

View File

@ -26,6 +26,7 @@ import (
"math"
"sync"
"sync/atomic"
"time"
)
type pf func(interface{}) error
@ -39,6 +40,9 @@ type PoolWithFunc struct {
// running is the number of the currently running goroutines.
running int32
// expiryDuration set the expired time (second) of every worker.
expiryDuration time.Duration
// freeSignal is used to notice pool there are available
// workers which can be sent to work.
freeSignal chan sig
@ -58,8 +62,33 @@ type PoolWithFunc struct {
once sync.Once
}
func (p *PoolWithFunc) MonitorAndClear() {
go func() {
for {
time.Sleep(p.expiryDuration)
currentTime := time.Now()
p.lock.Lock()
idleWorkers := p.workers
n := 0
for i, w := range idleWorkers {
if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
break
}
n = i
w.stop()
idleWorkers[i] = nil
}
if n > 0 {
n += 1
p.workers = idleWorkers[n:]
}
p.lock.Unlock()
}
}()
}
// NewPoolWithFunc generates a instance of ants pool with a specific function.
func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) {
func NewPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) {
if size <= 0 {
return nil, ErrPoolSizeInvalid
}
@ -67,9 +96,10 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) {
capacity: int32(size),
freeSignal: make(chan sig, math.MaxInt32),
release: make(chan sig, 1),
expiryDuration: time.Duration(expiry) * time.Second,
poolFunc: f,
}
p.MonitorAndClear()
return p, nil
}
@ -176,6 +206,7 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
// putWorker puts a worker back into free pool, recycling the goroutines.
func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) {
worker.recycleTime = time.Now()
p.lock.Lock()
p.workers = append(p.workers, worker)
p.lock.Unlock()

View File

@ -24,6 +24,7 @@ package ants
import (
"sync/atomic"
"time"
)
// Worker is the actual executor who runs the tasks,
@ -35,6 +36,9 @@ type Worker struct {
// task is a job should be done.
task chan f
// recycleTime will be update when putting a worker back into queue.
recycleTime time.Time
}
// run starts a goroutine to repeat the process

View File

@ -24,6 +24,7 @@ package ants
import (
"sync/atomic"
"time"
)
// WorkerWithFunc is the actual executor who runs the tasks,
@ -35,6 +36,9 @@ type WorkerWithFunc struct {
// args is a job should be done.
args chan interface{}
// recycleTime will be update when putting a worker back into queue.
recycleTime time.Time
}
// run starts a goroutine to repeat the process