Refactoring

This commit is contained in:
Andy Pan 2019-10-04 11:24:13 +08:00
parent 2b9f35b18f
commit b0ec5102cc
7 changed files with 62 additions and 70 deletions

View File

@ -12,7 +12,7 @@ A goroutine pool for Go
<a title="Godoc for ants" target="_blank" href="https://godoc.org/github.com/panjf2000/ants"><img src="https://img.shields.io/badge/go-documentation-blue.svg?style=flat-square"></a> <a title="Godoc for ants" target="_blank" href="https://godoc.org/github.com/panjf2000/ants"><img src="https://img.shields.io/badge/go-documentation-blue.svg?style=flat-square"></a>
<a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/release/panjf2000/ants.svg?style=flat-square"></a> <a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/release/panjf2000/ants.svg?style=flat-square"></a>
<a title="License" target="_blank" href="https://opensource.org/licenses/mit-license.php"><img src="https://img.shields.io/aur/license/pac?style=flat-square"></a> <a title="License" target="_blank" href="https://opensource.org/licenses/mit-license.php"><img src="https://img.shields.io/aur/license/pac?style=flat-square"></a>
<a title="Awesome" target="_blank" href="https://github.com/avelino/awesome-go"><img src="https://awesome.re/badge-flat2.svg"></a> <a title="Mentioned in Awesome Go" target="_blank" href="https://github.com/avelino/awesome-go"><img src="https://awesome.re/mentioned-badge-flat.svg"></a>
</p> </p>
# [[中文](README_ZH.md)] # [[中文](README_ZH.md)]

View File

@ -12,7 +12,7 @@ A goroutine pool for Go
<a title="Godoc for ants" target="_blank" href="https://godoc.org/github.com/panjf2000/ants"><img src="https://img.shields.io/badge/go-documentation-blue.svg?style=flat-square"></a> <a title="Godoc for ants" target="_blank" href="https://godoc.org/github.com/panjf2000/ants"><img src="https://img.shields.io/badge/go-documentation-blue.svg?style=flat-square"></a>
<a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/release/panjf2000/ants.svg?style=flat-square"></a> <a title="Release" target="_blank" href="https://github.com/panjf2000/ants/releases"><img src="https://img.shields.io/github/release/panjf2000/ants.svg?style=flat-square"></a>
<a title="License" target="_blank" href="https://opensource.org/licenses/mit-license.php"><img src="https://img.shields.io/aur/license/pac?style=flat-square"></a> <a title="License" target="_blank" href="https://opensource.org/licenses/mit-license.php"><img src="https://img.shields.io/aur/license/pac?style=flat-square"></a>
<a title="Awesome" target="_blank" href="https://github.com/avelino/awesome-go"><img src="https://awesome.re/badge-flat2.svg"></a> <a title="Mentioned in Awesome Go" target="_blank" href="https://github.com/avelino/awesome-go"><img src="https://awesome.re/mentioned-badge-flat.svg"></a>
</p> </p>
# [[英文](README.md)] # [[英文](README.md)]

18
ants.go
View File

@ -30,11 +30,11 @@ import (
) )
const ( const (
// DEFAULT_ANTS_POOL_SIZE is the default capacity for a default goroutine pool. // DefaultAntsPoolSize is the default capacity for a default goroutine pool.
DEFAULT_ANTS_POOL_SIZE = math.MaxInt32 DefaultAntsPoolSize = math.MaxInt32
// DEFAULT_CLEAN_INTERVAL_TIME is the interval time to clean up goroutines. // DefaultCleanIntervalTime is the interval time to clean up goroutines.
DEFAULT_CLEAN_INTERVAL_TIME = 1 DefaultCleanIntervalTime = time.Second
// CLOSED represents that the pool is closed. // CLOSED represents that the pool is closed.
CLOSED = 1 CLOSED = 1
@ -77,11 +77,13 @@ var (
}() }()
// Init a instance pool when importing ants. // Init a instance pool when importing ants.
defaultAntsPool, _ = NewPool(DEFAULT_ANTS_POOL_SIZE) defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
) )
// Option represents the optional function.
type Option func(opts *Options) type Option func(opts *Options)
// Options contains all options which will be applied when instantiating a ants pool.
type Options struct { type Options struct {
// ExpiryDuration set the expired time (second) of every worker. // ExpiryDuration set the expired time (second) of every worker.
ExpiryDuration time.Duration ExpiryDuration time.Duration
@ -103,36 +105,42 @@ type Options struct {
PanicHandler func(interface{}) PanicHandler func(interface{})
} }
// WithOptions accepts the whole options config.
func WithOptions(options Options) Option { func WithOptions(options Options) Option {
return func(opts *Options) { return func(opts *Options) {
*opts = options *opts = options
} }
} }
// WithExpiryDuration sets up the interval time of cleaning up goroutines.
func WithExpiryDuration(expiryDuration time.Duration) Option { func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) { return func(opts *Options) {
opts.ExpiryDuration = expiryDuration opts.ExpiryDuration = expiryDuration
} }
} }
// WithPreAlloc indicates whether it should malloc for workers.
func WithPreAlloc(preAlloc bool) Option { func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) { return func(opts *Options) {
opts.PreAlloc = preAlloc opts.PreAlloc = preAlloc
} }
} }
// WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool.
func WithMaxBlockingTasks(maxBlockingTasks int) Option { func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) { return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks opts.MaxBlockingTasks = maxBlockingTasks
} }
} }
// WithNonblocking indicates that pool will return nil when there is no available workers.
func WithNonblocking(nonblocking bool) Option { func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) { return func(opts *Options) {
opts.Nonblocking = nonblocking opts.Nonblocking = nonblocking
} }
} }
// WithPanicHandler sets up panic handler.
func WithPanicHandler(panicHandler func(interface{})) Option { func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) { return func(opts *Options) {
opts.PanicHandler = panicHandler opts.PanicHandler = panicHandler

View File

@ -34,7 +34,7 @@ import (
const ( const (
_ = 1 << (10 * iota) _ = 1 << (10 * iota)
//KiB // 1024 KiB // 1024
MiB // 1048576 MiB // 1048576
//GiB // 1073741824 //GiB // 1073741824
//TiB // 1099511627776 (超过了int32的范围) //TiB // 1099511627776 (超过了int32的范围)
@ -143,7 +143,7 @@ func TestAntsPoolGetWorkerFromCache(t *testing.T) {
for i := 0; i < AntsSize; i++ { for i := 0; i < AntsSize; i++ {
_ = p.Submit(demoFunc) _ = p.Submit(demoFunc)
} }
time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) time.Sleep(2 * ants.DefaultCleanIntervalTime)
_ = p.Submit(demoFunc) _ = p.Submit(demoFunc)
t.Logf("pool, running workers number:%d", p.Running()) t.Logf("pool, running workers number:%d", p.Running())
mem := runtime.MemStats{} mem := runtime.MemStats{}
@ -161,7 +161,7 @@ func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) {
for i := 0; i < AntsSize; i++ { for i := 0; i < AntsSize; i++ {
_ = p.Invoke(dur) _ = p.Invoke(dur)
} }
time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) time.Sleep(2 * ants.DefaultCleanIntervalTime)
_ = p.Invoke(dur) _ = p.Invoke(dur)
t.Logf("pool with func, running workers number:%d", p.Running()) t.Logf("pool with func, running workers number:%d", p.Running())
mem := runtime.MemStats{} mem := runtime.MemStats{}
@ -178,7 +178,7 @@ func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) {
for i := 0; i < AntsSize; i++ { for i := 0; i < AntsSize; i++ {
_ = p.Invoke(dur) _ = p.Invoke(dur)
} }
time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) time.Sleep(2 * ants.DefaultCleanIntervalTime)
_ = p.Invoke(dur) _ = p.Invoke(dur)
t.Logf("pool with func, running workers number:%d", p.Running()) t.Logf("pool with func, running workers number:%d", p.Running())
mem := runtime.MemStats{} mem := runtime.MemStats{}
@ -369,7 +369,7 @@ func TestPurge(t *testing.T) {
} }
defer p.Release() defer p.Release()
_ = p.Submit(demoFunc) _ = p.Submit(demoFunc)
time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) time.Sleep(3 * ants.DefaultCleanIntervalTime)
if p.Running() != 0 { if p.Running() != 0 {
t.Error("all p should be purged") t.Error("all p should be purged")
} }
@ -379,7 +379,7 @@ func TestPurge(t *testing.T) {
} }
defer p1.Release() defer p1.Release()
_ = p1.Invoke(1) _ = p1.Invoke(1)
time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) time.Sleep(3 * ants.DefaultCleanIntervalTime)
if p.Running() != 0 { if p.Running() != 0 {
t.Error("all p should be purged") t.Error("all p should be purged")
} }
@ -392,7 +392,7 @@ func TestPurgePreMalloc(t *testing.T) {
} }
defer p.Release() defer p.Release()
_ = p.Submit(demoFunc) _ = p.Submit(demoFunc)
time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) time.Sleep(3 * ants.DefaultCleanIntervalTime)
if p.Running() != 0 { if p.Running() != 0 {
t.Error("all p should be purged") t.Error("all p should be purged")
} }
@ -402,7 +402,7 @@ func TestPurgePreMalloc(t *testing.T) {
} }
defer p1.Release() defer p1.Release()
_ = p1.Invoke(1) _ = p1.Invoke(1)
time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) time.Sleep(3 * ants.DefaultCleanIntervalTime)
if p.Running() != 0 { if p.Running() != 0 {
t.Error("all p should be purged") t.Error("all p should be purged")
} }
@ -608,7 +608,7 @@ func TestRestCodeCoverage(t *testing.T) {
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
_ = p.Invoke(Param) _ = p.Invoke(Param)
} }
time.Sleep(ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) time.Sleep(ants.DefaultCleanIntervalTime)
t.Logf("pool with func, capacity:%d", p.Cap()) t.Logf("pool with func, capacity:%d", p.Cap())
t.Logf("pool with func, running workers number:%d", p.Running()) t.Logf("pool with func, running workers number:%d", p.Running())
t.Logf("pool with func, free workers number:%d", p.Free()) t.Logf("pool with func, free workers number:%d", p.Free())
@ -624,7 +624,7 @@ func TestRestCodeCoverage(t *testing.T) {
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
_ = ppremWithFunc.Invoke(Param) _ = ppremWithFunc.Invoke(Param)
} }
time.Sleep(ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) time.Sleep(ants.DefaultCleanIntervalTime)
t.Logf("pre-malloc pool with func, capacity:%d", ppremWithFunc.Cap()) t.Logf("pre-malloc pool with func, capacity:%d", ppremWithFunc.Cap())
t.Logf("pre-malloc pool with func, running workers number:%d", ppremWithFunc.Running()) t.Logf("pre-malloc pool with func, running workers number:%d", ppremWithFunc.Running())
t.Logf("pre-malloc pool with func, free workers number:%d", ppremWithFunc.Free()) t.Logf("pre-malloc pool with func, free workers number:%d", ppremWithFunc.Free())

View File

@ -2,7 +2,7 @@
// Use of this source code is governed by an MIT-style // Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package ants package internal
import ( import (
"runtime" "runtime"
@ -22,6 +22,7 @@ func (sl *spinLock) Unlock() {
atomic.StoreUint32((*uint32)(sl), 0) atomic.StoreUint32((*uint32)(sl), 0)
} }
func SpinLock() sync.Locker { // NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {
return new(spinLock) return new(spinLock)
} }

40
pool.go
View File

@ -26,6 +26,8 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/panjf2000/ants/v2/internal"
) )
// Pool accept the tasks from client, it limits the total of goroutines to a given number by recycling goroutines. // Pool accept the tasks from client, it limits the total of goroutines to a given number by recycling goroutines.
@ -134,29 +136,19 @@ func NewPool(size int, options ...Option) (*Pool, error) {
if expiry := opts.ExpiryDuration; expiry < 0 { if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry return nil, ErrInvalidPoolExpiry
} else if expiry == 0 { } else if expiry == 0 {
opts.ExpiryDuration = time.Duration(DEFAULT_CLEAN_INTERVAL_TIME) * time.Second opts.ExpiryDuration = DefaultCleanIntervalTime
} }
var p *Pool p := &Pool{
capacity: int32(size),
expiryDuration: opts.ExpiryDuration,
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
lock: internal.NewSpinLock(),
}
if opts.PreAlloc { if opts.PreAlloc {
p = &Pool{ p.workers = make([]*goWorker, 0, size)
capacity: int32(size),
expiryDuration: opts.ExpiryDuration,
workers: make([]*goWorker, 0, size),
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
lock: SpinLock(),
}
} else {
p = &Pool{
capacity: int32(size),
expiryDuration: opts.ExpiryDuration,
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
lock: SpinLock(),
}
} }
p.cond = sync.NewCond(p.lock) p.cond = sync.NewCond(p.lock)
@ -173,11 +165,11 @@ func (p *Pool) Submit(task func()) error {
if atomic.LoadInt32(&p.release) == CLOSED { if atomic.LoadInt32(&p.release) == CLOSED {
return ErrPoolClosed return ErrPoolClosed
} }
if w := p.retrieveWorker(); w == nil { var w *goWorker
if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload return ErrPoolOverload
} else {
w.task <- task
} }
w.task <- task
return nil return nil
} }
@ -188,7 +180,7 @@ func (p *Pool) Running() int {
// Free returns the available goroutines to work. // Free returns the available goroutines to work.
func (p *Pool) Free() int { func (p *Pool) Free() int {
return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) return p.Cap() - p.Running()
} }
// Cap returns the capacity of this pool. // Cap returns the capacity of this pool.

View File

@ -26,6 +26,8 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/panjf2000/ants/v2/internal"
) )
// PoolWithFunc accept the tasks from client, it limits the total of goroutines to a given number by recycling goroutines. // PoolWithFunc accept the tasks from client, it limits the total of goroutines to a given number by recycling goroutines.
@ -141,31 +143,20 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
if expiry := opts.ExpiryDuration; expiry < 0 { if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry return nil, ErrInvalidPoolExpiry
} else if expiry == 0 { } else if expiry == 0 {
opts.ExpiryDuration = time.Duration(DEFAULT_CLEAN_INTERVAL_TIME) * time.Second opts.ExpiryDuration = DefaultCleanIntervalTime
} }
var p *PoolWithFunc p := &PoolWithFunc{
capacity: int32(size),
expiryDuration: opts.ExpiryDuration,
poolFunc: pf,
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
lock: internal.NewSpinLock(),
}
if opts.PreAlloc { if opts.PreAlloc {
p = &PoolWithFunc{ p.workers = make([]*goWorkerWithFunc, 0, size)
capacity: int32(size),
expiryDuration: opts.ExpiryDuration,
poolFunc: pf,
workers: make([]*goWorkerWithFunc, 0, size),
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
lock: SpinLock(),
}
} else {
p = &PoolWithFunc{
capacity: int32(size),
expiryDuration: opts.ExpiryDuration,
poolFunc: pf,
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
lock: SpinLock(),
}
} }
p.cond = sync.NewCond(p.lock) p.cond = sync.NewCond(p.lock)
@ -182,11 +173,11 @@ func (p *PoolWithFunc) Invoke(args interface{}) error {
if atomic.LoadInt32(&p.release) == CLOSED { if atomic.LoadInt32(&p.release) == CLOSED {
return ErrPoolClosed return ErrPoolClosed
} }
if w := p.retrieveWorker(); w == nil { var w *goWorkerWithFunc
if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload return ErrPoolOverload
} else {
w.args <- args
} }
w.args <- args
return nil return nil
} }
@ -197,7 +188,7 @@ func (p *PoolWithFunc) Running() int {
// Free returns a available goroutines to work. // Free returns a available goroutines to work.
func (p *PoolWithFunc) Free() int { func (p *PoolWithFunc) Free() int {
return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) return p.Cap() - p.Running()
} }
// Cap returns the capacity of this pool. // Cap returns the capacity of this pool.