diff --git a/README.md b/README.md index 6089b6d..7c153cb 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ A goroutine pool for Go - +

# [[中文](README_ZH.md)] diff --git a/README_ZH.md b/README_ZH.md index 7578eae..3cb47b7 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -12,7 +12,7 @@ A goroutine pool for Go - +

# [[英文](README.md)] diff --git a/ants.go b/ants.go index 8c4c8f6..38a0dca 100644 --- a/ants.go +++ b/ants.go @@ -30,11 +30,11 @@ import ( ) const ( - // DEFAULT_ANTS_POOL_SIZE is the default capacity for a default goroutine pool. - DEFAULT_ANTS_POOL_SIZE = math.MaxInt32 + // DefaultAntsPoolSize is the default capacity for a default goroutine pool. + DefaultAntsPoolSize = math.MaxInt32 - // DEFAULT_CLEAN_INTERVAL_TIME is the interval time to clean up goroutines. - DEFAULT_CLEAN_INTERVAL_TIME = 1 + // DefaultCleanIntervalTime is the interval time to clean up goroutines. + DefaultCleanIntervalTime = time.Second // CLOSED represents that the pool is closed. CLOSED = 1 @@ -77,11 +77,13 @@ var ( }() // Init a instance pool when importing ants. - defaultAntsPool, _ = NewPool(DEFAULT_ANTS_POOL_SIZE) + defaultAntsPool, _ = NewPool(DefaultAntsPoolSize) ) +// Option represents the optional function. type Option func(opts *Options) +// Options contains all options which will be applied when instantiating a ants pool. type Options struct { // ExpiryDuration set the expired time (second) of every worker. ExpiryDuration time.Duration @@ -103,36 +105,42 @@ type Options struct { PanicHandler func(interface{}) } +// WithOptions accepts the whole options config. func WithOptions(options Options) Option { return func(opts *Options) { *opts = options } } +// WithExpiryDuration sets up the interval time of cleaning up goroutines. func WithExpiryDuration(expiryDuration time.Duration) Option { return func(opts *Options) { opts.ExpiryDuration = expiryDuration } } +// WithPreAlloc indicates whether it should malloc for workers. func WithPreAlloc(preAlloc bool) Option { return func(opts *Options) { opts.PreAlloc = preAlloc } } +// WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool. func WithMaxBlockingTasks(maxBlockingTasks int) Option { return func(opts *Options) { opts.MaxBlockingTasks = maxBlockingTasks } } +// WithNonblocking indicates that pool will return nil when there is no available workers. func WithNonblocking(nonblocking bool) Option { return func(opts *Options) { opts.Nonblocking = nonblocking } } +// WithPanicHandler sets up panic handler. func WithPanicHandler(panicHandler func(interface{})) Option { return func(opts *Options) { opts.PanicHandler = panicHandler diff --git a/ants_test.go b/ants_test.go index 854a20b..60aac94 100644 --- a/ants_test.go +++ b/ants_test.go @@ -33,8 +33,8 @@ import ( ) const ( - _ = 1 << (10 * iota) - //KiB // 1024 + _ = 1 << (10 * iota) + KiB // 1024 MiB // 1048576 //GiB // 1073741824 //TiB // 1099511627776 (超过了int32的范围) @@ -143,7 +143,7 @@ func TestAntsPoolGetWorkerFromCache(t *testing.T) { for i := 0; i < AntsSize; i++ { _ = p.Submit(demoFunc) } - time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) + time.Sleep(2 * ants.DefaultCleanIntervalTime) _ = p.Submit(demoFunc) t.Logf("pool, running workers number:%d", p.Running()) mem := runtime.MemStats{} @@ -161,7 +161,7 @@ func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) { for i := 0; i < AntsSize; i++ { _ = p.Invoke(dur) } - time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) + time.Sleep(2 * ants.DefaultCleanIntervalTime) _ = p.Invoke(dur) t.Logf("pool with func, running workers number:%d", p.Running()) mem := runtime.MemStats{} @@ -178,7 +178,7 @@ func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) { for i := 0; i < AntsSize; i++ { _ = p.Invoke(dur) } - time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) + time.Sleep(2 * ants.DefaultCleanIntervalTime) _ = p.Invoke(dur) t.Logf("pool with func, running workers number:%d", p.Running()) mem := runtime.MemStats{} @@ -369,7 +369,7 @@ func TestPurge(t *testing.T) { } defer p.Release() _ = p.Submit(demoFunc) - time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) + time.Sleep(3 * ants.DefaultCleanIntervalTime) if p.Running() != 0 { t.Error("all p should be purged") } @@ -379,7 +379,7 @@ func TestPurge(t *testing.T) { } defer p1.Release() _ = p1.Invoke(1) - time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) + time.Sleep(3 * ants.DefaultCleanIntervalTime) if p.Running() != 0 { t.Error("all p should be purged") } @@ -392,7 +392,7 @@ func TestPurgePreMalloc(t *testing.T) { } defer p.Release() _ = p.Submit(demoFunc) - time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) + time.Sleep(3 * ants.DefaultCleanIntervalTime) if p.Running() != 0 { t.Error("all p should be purged") } @@ -402,7 +402,7 @@ func TestPurgePreMalloc(t *testing.T) { } defer p1.Release() _ = p1.Invoke(1) - time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) + time.Sleep(3 * ants.DefaultCleanIntervalTime) if p.Running() != 0 { t.Error("all p should be purged") } @@ -608,7 +608,7 @@ func TestRestCodeCoverage(t *testing.T) { for i := 0; i < n; i++ { _ = p.Invoke(Param) } - time.Sleep(ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) + time.Sleep(ants.DefaultCleanIntervalTime) t.Logf("pool with func, capacity:%d", p.Cap()) t.Logf("pool with func, running workers number:%d", p.Running()) t.Logf("pool with func, free workers number:%d", p.Free()) @@ -624,7 +624,7 @@ func TestRestCodeCoverage(t *testing.T) { for i := 0; i < n; i++ { _ = ppremWithFunc.Invoke(Param) } - time.Sleep(ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) + time.Sleep(ants.DefaultCleanIntervalTime) t.Logf("pre-malloc pool with func, capacity:%d", ppremWithFunc.Cap()) t.Logf("pre-malloc pool with func, running workers number:%d", ppremWithFunc.Running()) t.Logf("pre-malloc pool with func, free workers number:%d", ppremWithFunc.Free()) diff --git a/spinlock.go b/internal/spinlock.go similarity index 82% rename from spinlock.go rename to internal/spinlock.go index 3b87c87..16607ef 100644 --- a/spinlock.go +++ b/internal/spinlock.go @@ -2,7 +2,7 @@ // Use of this source code is governed by an MIT-style // license that can be found in the LICENSE file. -package ants +package internal import ( "runtime" @@ -22,6 +22,7 @@ func (sl *spinLock) Unlock() { atomic.StoreUint32((*uint32)(sl), 0) } -func SpinLock() sync.Locker { +// NewSpinLock instantiates a spin-lock. +func NewSpinLock() sync.Locker { return new(spinLock) } diff --git a/pool.go b/pool.go index 9af46de..e8e8a6f 100644 --- a/pool.go +++ b/pool.go @@ -26,6 +26,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/panjf2000/ants/v2/internal" ) // Pool accept the tasks from client, it limits the total of goroutines to a given number by recycling goroutines. @@ -134,29 +136,19 @@ func NewPool(size int, options ...Option) (*Pool, error) { if expiry := opts.ExpiryDuration; expiry < 0 { return nil, ErrInvalidPoolExpiry } else if expiry == 0 { - opts.ExpiryDuration = time.Duration(DEFAULT_CLEAN_INTERVAL_TIME) * time.Second + opts.ExpiryDuration = DefaultCleanIntervalTime } - var p *Pool + p := &Pool{ + capacity: int32(size), + expiryDuration: opts.ExpiryDuration, + nonblocking: opts.Nonblocking, + maxBlockingTasks: int32(opts.MaxBlockingTasks), + panicHandler: opts.PanicHandler, + lock: internal.NewSpinLock(), + } if opts.PreAlloc { - p = &Pool{ - capacity: int32(size), - expiryDuration: opts.ExpiryDuration, - workers: make([]*goWorker, 0, size), - nonblocking: opts.Nonblocking, - maxBlockingTasks: int32(opts.MaxBlockingTasks), - panicHandler: opts.PanicHandler, - lock: SpinLock(), - } - } else { - p = &Pool{ - capacity: int32(size), - expiryDuration: opts.ExpiryDuration, - nonblocking: opts.Nonblocking, - maxBlockingTasks: int32(opts.MaxBlockingTasks), - panicHandler: opts.PanicHandler, - lock: SpinLock(), - } + p.workers = make([]*goWorker, 0, size) } p.cond = sync.NewCond(p.lock) @@ -173,11 +165,11 @@ func (p *Pool) Submit(task func()) error { if atomic.LoadInt32(&p.release) == CLOSED { return ErrPoolClosed } - if w := p.retrieveWorker(); w == nil { + var w *goWorker + if w = p.retrieveWorker(); w == nil { return ErrPoolOverload - } else { - w.task <- task } + w.task <- task return nil } @@ -188,7 +180,7 @@ func (p *Pool) Running() int { // Free returns the available goroutines to work. func (p *Pool) Free() int { - return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) + return p.Cap() - p.Running() } // Cap returns the capacity of this pool. diff --git a/pool_func.go b/pool_func.go index 59d3187..f2dd704 100644 --- a/pool_func.go +++ b/pool_func.go @@ -26,6 +26,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/panjf2000/ants/v2/internal" ) // PoolWithFunc accept the tasks from client, it limits the total of goroutines to a given number by recycling goroutines. @@ -141,31 +143,20 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi if expiry := opts.ExpiryDuration; expiry < 0 { return nil, ErrInvalidPoolExpiry } else if expiry == 0 { - opts.ExpiryDuration = time.Duration(DEFAULT_CLEAN_INTERVAL_TIME) * time.Second + opts.ExpiryDuration = DefaultCleanIntervalTime } - var p *PoolWithFunc + p := &PoolWithFunc{ + capacity: int32(size), + expiryDuration: opts.ExpiryDuration, + poolFunc: pf, + nonblocking: opts.Nonblocking, + maxBlockingTasks: int32(opts.MaxBlockingTasks), + panicHandler: opts.PanicHandler, + lock: internal.NewSpinLock(), + } if opts.PreAlloc { - p = &PoolWithFunc{ - capacity: int32(size), - expiryDuration: opts.ExpiryDuration, - poolFunc: pf, - workers: make([]*goWorkerWithFunc, 0, size), - nonblocking: opts.Nonblocking, - maxBlockingTasks: int32(opts.MaxBlockingTasks), - panicHandler: opts.PanicHandler, - lock: SpinLock(), - } - } else { - p = &PoolWithFunc{ - capacity: int32(size), - expiryDuration: opts.ExpiryDuration, - poolFunc: pf, - nonblocking: opts.Nonblocking, - maxBlockingTasks: int32(opts.MaxBlockingTasks), - panicHandler: opts.PanicHandler, - lock: SpinLock(), - } + p.workers = make([]*goWorkerWithFunc, 0, size) } p.cond = sync.NewCond(p.lock) @@ -182,11 +173,11 @@ func (p *PoolWithFunc) Invoke(args interface{}) error { if atomic.LoadInt32(&p.release) == CLOSED { return ErrPoolClosed } - if w := p.retrieveWorker(); w == nil { + var w *goWorkerWithFunc + if w = p.retrieveWorker(); w == nil { return ErrPoolOverload - } else { - w.args <- args } + w.args <- args return nil } @@ -197,7 +188,7 @@ func (p *PoolWithFunc) Running() int { // Free returns a available goroutines to work. func (p *PoolWithFunc) Free() int { - return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) + return p.Cap() - p.Running() } // Cap returns the capacity of this pool.