forked from mirror/ants

Make optimizations to the Pool and PoolWithFunc structs

parent 5697095a46
commit 5ecbdf4bf2
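The whole commit is a single consolidation: instead of copying each option into its own struct field at construction time, Pool and PoolWithFunc now hold one options *Options pointer and read settings at the point of use. A minimal before/after sketch of the shape change (poolBefore and poolAfter are illustrative names only; the fields are the ones visible in the diff):

package ants

import "time"

// Before this commit: option values were duplicated into dedicated fields,
// so every new option meant another field plus another constructor line.
type poolBefore struct {
	capacity         int32
	expiryDuration   time.Duration     // copy of Options.ExpiryDuration
	maxBlockingTasks int32             // copy of Options.MaxBlockingTasks
	nonblocking      bool              // copy of Options.Nonblocking
	panicHandler     func(interface{}) // copy of Options.PanicHandler
}

// After: one pointer back to the Options the pool was built with;
// call sites read p.options.ExpiryDuration, p.options.Nonblocking, etc.
type poolAfter struct {
	capacity int32
	options  *Options
}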
ants.go (2 changed lines)

@@ -85,7 +85,7 @@ type Option func(opts *Options)
 // Options contains all options which will be applied when instantiating a ants pool.
 type Options struct {
-	// ExpiryDuration set the expired time (second) of every worker.
+	// ExpiryDuration set the expired time of every worker.
 	ExpiryDuration time.Duration
 
 	// PreAlloc indicate whether to make memory pre-allocation when initializing Pool.
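The one-word comment fix is worth the churn: ExpiryDuration is a time.Duration, not a count of seconds, so any unit is valid. A hedged usage sketch, assuming the ants v2 option helper WithExpiryDuration:

// Idle workers are purged once they have been unused for 30 seconds.
p, err := ants.NewPool(100, ants.WithExpiryDuration(30*time.Second))
if err != nil {
	log.Fatal(err)
}
defer p.Release()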
ants_test.go

@@ -486,7 +486,11 @@ func TestMaxBlockingSubmit(t *testing.T) {
 
 func TestNonblockingSubmitWithFunc(t *testing.T) {
 	poolSize := 10
-	p, err := NewPoolWithFunc(poolSize, longRunningPoolFunc, WithNonblocking(true))
+	ch1 := make(chan struct{})
+	p, err := NewPoolWithFunc(poolSize, func(i interface{}) {
+		longRunningPoolFunc(i)
+		close(ch1)
+	}, WithNonblocking(true))
 	if err != nil {
 		t.Fatalf("create TimingPool failed: %s", err.Error())
 	}
@@ -506,7 +510,7 @@ func TestNonblockingSubmitWithFunc(t *testing.T) {
 	}
 	// interrupt f to get an available worker
 	close(ch)
-	time.Sleep(1 * time.Second)
+	<-ch1
 	if err := p.Invoke(nil); err != nil {
 		t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
 	}
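The test rewrite replaces a fixed one-second sleep with explicit synchronization: the submitted function closes ch1 when longRunningPoolFunc returns, and the test then blocks on <-ch1 until a worker is genuinely free. That removes both the flakiness (one second may be too short on a loaded CI machine) and the wasted wall-clock time. A self-contained sketch of the pattern, with illustrative names:

package main

import "fmt"

func main() {
	done := make(chan struct{})
	go func() {
		// ... the long-running work goes here ...
		close(done) // a closed channel never blocks its receivers
	}()
	<-done // waits exactly as long as the work takes, and no longer
	fmt.Println("worker finished")
}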
pool.go (42 changed lines)

@@ -38,9 +38,6 @@ type Pool struct {
 	// running is the number of the currently running goroutines.
 	running int32
 
-	// expiryDuration set the expired time (second) of every worker.
-	expiryDuration time.Duration
-
 	// workers is a slice that store the available workers.
 	workers workerArray
 
@@ -59,27 +56,15 @@ type Pool struct {
 	// workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker.
 	workerCache sync.Pool
 
-	// panicHandler is used to handle panics from each worker goroutine.
-	// if nil, panics will be thrown out again from worker goroutines.
-	panicHandler func(interface{})
-
-	// Max number of goroutine blocking on pool.Submit.
-	// 0 (default value) means no such limit.
-	maxBlockingTasks int32
-
-	// goroutine already been blocked on pool.Submit
-	// protected by pool.lock
-	blockingNum int32
-
-	// When nonblocking is true, Pool.Submit will never be blocked.
-	// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
-	// When nonblocking is true, MaxBlockingTasks is inoperative.
-	nonblocking bool
+	// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
+	blockingNum int
+
+	options *Options
 }
 
 // Clear expired workers periodically.
 func (p *Pool) periodicallyPurge() {
-	heartbeat := time.NewTicker(p.expiryDuration)
+	heartbeat := time.NewTicker(p.options.ExpiryDuration)
 	defer heartbeat.Stop()
 
 	for range heartbeat.C {
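Two details in this struct diff are easy to miss. First, all option-derived fields collapse into options *Options. Second, blockingNum narrows from int32 to int: the fixed-width type suggested atomic access, but the field is only ever touched under p.lock, as its comment says. A hedged sketch of the pattern around cond.Wait in retrieveWorker, under that assumption:

// blockingNum is guarded by p.lock, so a plain int is enough.
p.lock.Lock()
p.blockingNum++
p.cond.Wait() // releases p.lock while parked, reacquires it on wake-up
p.blockingNum--
// ... retry grabbing a worker, still holding the lock ...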
@@ -88,7 +73,7 @@ func (p *Pool) periodicallyPurge() {
 		}
 
 		p.lock.Lock()
-		expiredWorkers := p.workers.findOutExpiry(p.expiryDuration)
+		expiredWorkers := p.workers.findOutExpiry(p.options.ExpiryDuration)
 		p.lock.Unlock()
 
 		// Notify obsolete workers to stop.
@@ -126,12 +111,9 @@ func NewPool(size int, options ...Option) (*Pool, error) {
 	}
 
 	p := &Pool{
 		capacity: int32(size),
-		expiryDuration:   opts.ExpiryDuration,
-		nonblocking:      opts.Nonblocking,
-		maxBlockingTasks: int32(opts.MaxBlockingTasks),
-		panicHandler:     opts.PanicHandler,
-		lock:             internal.NewSpinLock(),
+		lock:     internal.NewSpinLock(),
+		options:  opts,
 	}
 	p.workerCache = sync.Pool{
 		New: func() interface{} {
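With the constructor storing opts directly, the Options value assembled by the functional options becomes the single source of truth for the pool's lifetime. A hedged usage sketch, assuming the ants v2 helper WithMaxBlockingTasks:

// Allow at most 1000 callers to block in Submit while all workers are busy;
// the 1001st gets an immediate error instead of waiting.
p, err := ants.NewPool(100, ants.WithMaxBlockingTasks(1000))
if err != nil {
	log.Fatal(err)
}
defer p.Release()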
@@ -141,7 +123,7 @@ func NewPool(size int, options ...Option) (*Pool, error) {
 			}
 		},
 	}
-	if opts.PreAlloc {
+	if p.options.PreAlloc {
 		p.workers = newWorkerArray(loopQueueType, size)
 	} else {
 		p.workers = newWorkerArray(stackType, 0)
@@ -187,7 +169,7 @@ func (p *Pool) Cap() int {
 
 // Tune changes the capacity of this pool.
 func (p *Pool) Tune(size int) {
-	if p.Cap() == size {
+	if size < 0 || p.Cap() == size || p.options.PreAlloc {
 		return
 	}
 	atomic.StoreInt32(&p.capacity, int32(size))
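Tune picks up two guards beyond the old same-size check: negative sizes are rejected, and pools built with PreAlloc refuse tuning altogether, because their loop-queue worker array is sized once at construction. A hedged usage sketch, assuming the ants v2 helper WithPreAlloc:

p, _ := ants.NewPool(10) // stack-backed worker array: resizable
defer p.Release()
p.Tune(50) // capacity becomes 50
p.Tune(-1) // no-op after this commit: negative sizes are ignored

pre, _ := ants.NewPool(10, ants.WithPreAlloc(true)) // loop-queue backed
defer pre.Release()
pre.Tune(50) // no-op: a pre-allocated pool keeps its original capacity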
@@ -232,12 +214,12 @@ func (p *Pool) retrieveWorker() *goWorker {
 		p.lock.Unlock()
 		spawnWorker()
 	} else {
-		if p.nonblocking {
+		if p.options.Nonblocking {
 			p.lock.Unlock()
 			return nil
 		}
 	Reentry:
-		if p.maxBlockingTasks != 0 && p.blockingNum >= p.maxBlockingTasks {
+		if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
 			p.lock.Unlock()
 			return nil
 		}
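A nil result from retrieveWorker is what callers see as ErrPoolOverload (the struct comment removed above spells out the contract): with Nonblocking set the call fails immediately, and with MaxBlockingTasks set it fails once that many callers are already parked at the Reentry label. A hedged sketch of the nonblocking path:

p, _ := ants.NewPool(1, ants.WithNonblocking(true))
defer p.Release()

block := make(chan struct{})
_ = p.Submit(func() { <-block }) // occupies the pool's only worker

// The pool is full and nonblocking, so this returns an error at once
// (ErrPoolOverload in ants v2) instead of waiting for a free worker.
if err := p.Submit(func() {}); err != nil {
	fmt.Println("submit failed fast:", err)
}
close(block)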
pool_func.go (44 changed lines)

@@ -38,9 +38,6 @@ type PoolWithFunc struct {
 	// running is the number of the currently running goroutines.
 	running int32
 
-	// expiryDuration set the expired time (second) of every worker.
-	expiryDuration time.Duration
-
 	// workers is a slice that store the available workers.
 	workers []*goWorkerWithFunc
 
@@ -62,27 +59,15 @@ type PoolWithFunc struct {
 	// workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker.
 	workerCache sync.Pool
 
-	// panicHandler is used to handle panics from each worker goroutine.
-	// if nil, panics will be thrown out again from worker goroutines.
-	panicHandler func(interface{})
-
-	// Max number of goroutine blocking on pool.Submit.
-	// 0 (default value) means no such limit.
-	maxBlockingTasks int32
-
-	// goroutine already been blocked on pool.Submit
-	// protected by pool.lock
-	blockingNum int32
-
-	// When nonblocking is true, Pool.Submit will never be blocked.
-	// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
-	// When nonblocking is true, MaxBlockingTasks is inoperative.
-	nonblocking bool
+	// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
+	blockingNum int
+
+	options *Options
 }
 
 // Clear expired workers periodically.
 func (p *PoolWithFunc) periodicallyPurge() {
-	heartbeat := time.NewTicker(p.expiryDuration)
+	heartbeat := time.NewTicker(p.options.ExpiryDuration)
 	defer heartbeat.Stop()
 
 	var expiredWorkers []*goWorkerWithFunc
@@ -95,7 +80,7 @@ func (p *PoolWithFunc) periodicallyPurge() {
 		idleWorkers := p.workers
 		n := len(idleWorkers)
 		var i int
-		for i = 0; i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration; i++ {
+		for i = 0; i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.options.ExpiryDuration; i++ {
 		}
 		expiredWorkers = append(expiredWorkers[:0], idleWorkers[:i]...)
 		if i > 0 {
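A note on the loop with the empty body above: idle workers are stored oldest-first by recycleTime, so any expired workers form a prefix of the slice. The loop just advances i past that prefix, and the append on the next line copies exactly those expired workers out for shutdown.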
@@ -147,13 +132,10 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
 	}
 
 	p := &PoolWithFunc{
 		capacity: int32(size),
-		expiryDuration:   opts.ExpiryDuration,
-		poolFunc:         pf,
-		nonblocking:      opts.Nonblocking,
-		maxBlockingTasks: int32(opts.MaxBlockingTasks),
-		panicHandler:     opts.PanicHandler,
-		lock:             internal.NewSpinLock(),
+		poolFunc: pf,
+		lock:     internal.NewSpinLock(),
+		options:  opts,
 	}
 	p.workerCache = sync.Pool{
 		New: func() interface{} {
@@ -163,7 +145,7 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
 			}
 		},
 	}
-	if opts.PreAlloc {
+	if p.options.PreAlloc {
 		p.workers = make([]*goWorkerWithFunc, 0, size)
 	}
 	p.cond = sync.NewCond(p.lock)
@@ -206,7 +188,7 @@ func (p *PoolWithFunc) Cap() int {
 
 // Tune change the capacity of this pool.
 func (p *PoolWithFunc) Tune(size int) {
-	if p.Cap() == size {
+	if size < 0 || p.Cap() == size || p.options.PreAlloc {
 		return
 	}
 	atomic.StoreInt32(&p.capacity, int32(size))
@@ -259,12 +241,12 @@ func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc {
 		p.lock.Unlock()
 		spawnWorker()
 	} else {
-		if p.nonblocking {
+		if p.options.Nonblocking {
 			p.lock.Unlock()
 			return nil
 		}
 	Reentry:
-		if p.maxBlockingTasks != 0 && p.blockingNum >= p.maxBlockingTasks {
+		if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
 			p.lock.Unlock()
 			return nil
 		}
worker.go

@@ -50,8 +50,8 @@ func (w *goWorker) run() {
 	defer func() {
 		w.pool.decRunning()
 		if p := recover(); p != nil {
-			if w.pool.panicHandler != nil {
-				w.pool.panicHandler(p)
+			if ph := w.pool.options.PanicHandler; ph != nil {
+				ph(p)
 			} else {
 				log.Printf("worker exits from a panic: %v\n", p)
 				var buf [4096]byte
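Besides reading the handler from options, the new code binds it to a local ph, so the nil check and the call see the same value. A hedged usage sketch, assuming the ants v2 helper WithPanicHandler:

p, _ := ants.NewPool(4, ants.WithPanicHandler(func(v interface{}) {
	log.Printf("task panicked: %v", v) // runs inside the worker's deferred recover
}))
defer p.Release()

_ = p.Submit(func() { panic("boom") }) // handled; the pool keeps serving tasks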
worker_func.go

@@ -50,8 +50,8 @@ func (w *goWorkerWithFunc) run() {
 	defer func() {
 		w.pool.decRunning()
 		if p := recover(); p != nil {
-			if w.pool.panicHandler != nil {
-				w.pool.panicHandler(p)
+			if ph := w.pool.options.PanicHandler; ph != nil {
+				ph(p)
 			} else {
 				log.Printf("worker with func exits from a panic: %v\n", p)
 				var buf [4096]byte