diff --git a/ants.go b/ants.go index 38a0dca..6c57b7b 100644 --- a/ants.go +++ b/ants.go @@ -85,7 +85,7 @@ type Option func(opts *Options) // Options contains all options which will be applied when instantiating a ants pool. type Options struct { - // ExpiryDuration set the expired time (second) of every worker. + // ExpiryDuration set the expired time of every worker. ExpiryDuration time.Duration // PreAlloc indicate whether to make memory pre-allocation when initializing Pool. diff --git a/ants_test.go b/ants_test.go index dd8203d..ff3cae5 100644 --- a/ants_test.go +++ b/ants_test.go @@ -486,7 +486,11 @@ func TestMaxBlockingSubmit(t *testing.T) { func TestNonblockingSubmitWithFunc(t *testing.T) { poolSize := 10 - p, err := NewPoolWithFunc(poolSize, longRunningPoolFunc, WithNonblocking(true)) + ch1 := make(chan struct{}) + p, err := NewPoolWithFunc(poolSize, func(i interface{}) { + longRunningPoolFunc(i) + close(ch1) + }, WithNonblocking(true)) if err != nil { t.Fatalf("create TimingPool failed: %s", err.Error()) } @@ -506,7 +510,7 @@ func TestNonblockingSubmitWithFunc(t *testing.T) { } // interrupt f to get an available worker close(ch) - time.Sleep(1 * time.Second) + <-ch1 if err := p.Invoke(nil); err != nil { t.Fatalf("nonblocking submit when pool is not full shouldn't return error") } diff --git a/pool.go b/pool.go index 63b407c..14e29be 100644 --- a/pool.go +++ b/pool.go @@ -38,9 +38,6 @@ type Pool struct { // running is the number of the currently running goroutines. running int32 - // expiryDuration set the expired time (second) of every worker. - expiryDuration time.Duration - // workers is a slice that store the available workers. workers workerArray @@ -59,27 +56,15 @@ type Pool struct { // workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker. workerCache sync.Pool - // panicHandler is used to handle panics from each worker goroutine. - // if nil, panics will be thrown out again from worker goroutines. - panicHandler func(interface{}) + // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock + blockingNum int - // Max number of goroutine blocking on pool.Submit. - // 0 (default value) means no such limit. - maxBlockingTasks int32 - - // goroutine already been blocked on pool.Submit - // protected by pool.lock - blockingNum int32 - - // When nonblocking is true, Pool.Submit will never be blocked. - // ErrPoolOverload will be returned when Pool.Submit cannot be done at once. - // When nonblocking is true, MaxBlockingTasks is inoperative. - nonblocking bool + options *Options } // Clear expired workers periodically. func (p *Pool) periodicallyPurge() { - heartbeat := time.NewTicker(p.expiryDuration) + heartbeat := time.NewTicker(p.options.ExpiryDuration) defer heartbeat.Stop() for range heartbeat.C { @@ -88,7 +73,7 @@ func (p *Pool) periodicallyPurge() { } p.lock.Lock() - expiredWorkers := p.workers.findOutExpiry(p.expiryDuration) + expiredWorkers := p.workers.findOutExpiry(p.options.ExpiryDuration) p.lock.Unlock() // Notify obsolete workers to stop. @@ -126,12 +111,9 @@ func NewPool(size int, options ...Option) (*Pool, error) { } p := &Pool{ - capacity: int32(size), - expiryDuration: opts.ExpiryDuration, - nonblocking: opts.Nonblocking, - maxBlockingTasks: int32(opts.MaxBlockingTasks), - panicHandler: opts.PanicHandler, - lock: internal.NewSpinLock(), + capacity: int32(size), + lock: internal.NewSpinLock(), + options: opts, } p.workerCache = sync.Pool{ New: func() interface{} { @@ -141,7 +123,7 @@ func NewPool(size int, options ...Option) (*Pool, error) { } }, } - if opts.PreAlloc { + if p.options.PreAlloc { p.workers = newWorkerArray(loopQueueType, size) } else { p.workers = newWorkerArray(stackType, 0) @@ -187,7 +169,7 @@ func (p *Pool) Cap() int { // Tune changes the capacity of this pool. func (p *Pool) Tune(size int) { - if p.Cap() == size { + if size < 0 || p.Cap() == size || p.options.PreAlloc { return } atomic.StoreInt32(&p.capacity, int32(size)) @@ -232,12 +214,12 @@ func (p *Pool) retrieveWorker() *goWorker { p.lock.Unlock() spawnWorker() } else { - if p.nonblocking { + if p.options.Nonblocking { p.lock.Unlock() return nil } Reentry: - if p.maxBlockingTasks != 0 && p.blockingNum >= p.maxBlockingTasks { + if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks { p.lock.Unlock() return nil } diff --git a/pool_func.go b/pool_func.go index b174d21..bd5b42f 100644 --- a/pool_func.go +++ b/pool_func.go @@ -38,9 +38,6 @@ type PoolWithFunc struct { // running is the number of the currently running goroutines. running int32 - // expiryDuration set the expired time (second) of every worker. - expiryDuration time.Duration - // workers is a slice that store the available workers. workers []*goWorkerWithFunc @@ -62,27 +59,15 @@ type PoolWithFunc struct { // workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker. workerCache sync.Pool - // panicHandler is used to handle panics from each worker goroutine. - // if nil, panics will be thrown out again from worker goroutines. - panicHandler func(interface{}) + // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock + blockingNum int - // Max number of goroutine blocking on pool.Submit. - // 0 (default value) means no such limit. - maxBlockingTasks int32 - - // goroutine already been blocked on pool.Submit - // protected by pool.lock - blockingNum int32 - - // When nonblocking is true, Pool.Submit will never be blocked. - // ErrPoolOverload will be returned when Pool.Submit cannot be done at once. - // When nonblocking is true, MaxBlockingTasks is inoperative. - nonblocking bool + options *Options } // Clear expired workers periodically. func (p *PoolWithFunc) periodicallyPurge() { - heartbeat := time.NewTicker(p.expiryDuration) + heartbeat := time.NewTicker(p.options.ExpiryDuration) defer heartbeat.Stop() var expiredWorkers []*goWorkerWithFunc @@ -95,7 +80,7 @@ func (p *PoolWithFunc) periodicallyPurge() { idleWorkers := p.workers n := len(idleWorkers) var i int - for i = 0; i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration; i++ { + for i = 0; i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.options.ExpiryDuration; i++ { } expiredWorkers = append(expiredWorkers[:0], idleWorkers[:i]...) if i > 0 { @@ -147,13 +132,10 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi } p := &PoolWithFunc{ - capacity: int32(size), - expiryDuration: opts.ExpiryDuration, - poolFunc: pf, - nonblocking: opts.Nonblocking, - maxBlockingTasks: int32(opts.MaxBlockingTasks), - panicHandler: opts.PanicHandler, - lock: internal.NewSpinLock(), + capacity: int32(size), + poolFunc: pf, + lock: internal.NewSpinLock(), + options: opts, } p.workerCache = sync.Pool{ New: func() interface{} { @@ -163,7 +145,7 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi } }, } - if opts.PreAlloc { + if p.options.PreAlloc { p.workers = make([]*goWorkerWithFunc, 0, size) } p.cond = sync.NewCond(p.lock) @@ -206,7 +188,7 @@ func (p *PoolWithFunc) Cap() int { // Tune change the capacity of this pool. func (p *PoolWithFunc) Tune(size int) { - if p.Cap() == size { + if size < 0 || p.Cap() == size || p.options.PreAlloc { return } atomic.StoreInt32(&p.capacity, int32(size)) @@ -259,12 +241,12 @@ func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc { p.lock.Unlock() spawnWorker() } else { - if p.nonblocking { + if p.options.Nonblocking { p.lock.Unlock() return nil } Reentry: - if p.maxBlockingTasks != 0 && p.blockingNum >= p.maxBlockingTasks { + if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks { p.lock.Unlock() return nil } diff --git a/worker.go b/worker.go index f38e9d2..0eecd2c 100644 --- a/worker.go +++ b/worker.go @@ -50,8 +50,8 @@ func (w *goWorker) run() { defer func() { w.pool.decRunning() if p := recover(); p != nil { - if w.pool.panicHandler != nil { - w.pool.panicHandler(p) + if ph := w.pool.options.PanicHandler; ph != nil { + ph(p) } else { log.Printf("worker exits from a panic: %v\n", p) var buf [4096]byte diff --git a/worker_func.go b/worker_func.go index 4c56ae8..0d119df 100644 --- a/worker_func.go +++ b/worker_func.go @@ -50,8 +50,8 @@ func (w *goWorkerWithFunc) run() { defer func() { w.pool.decRunning() if p := recover(); p != nil { - if w.pool.panicHandler != nil { - w.pool.panicHandler(p) + if ph := w.pool.options.PanicHandler; ph != nil { + ph(p) } else { log.Printf("worker with func exits from a panic: %v\n", p) var buf [4096]byte