From b0ec5102cc8bba67e94d12895b98bcf343212e4a Mon Sep 17 00:00:00 2001
From: Andy Pan
Date: Fri, 4 Oct 2019 11:24:13 +0800
Subject: [PATCH] Refactor: rename exported identifiers to idiomatic Go names, move spinlock into internal/, and simplify pool construction
---
 README.md                           |  2 +-
 README_ZH.md                        |  2 +-
 ants.go                             | 18 ++++++++----
 ants_test.go                        | 22 +++++++--------
 spinlock.go => internal/spinlock.go |  5 ++--
 pool.go                             | 40 +++++++++++----------------
 pool_func.go                        | 43 ++++++++++++-----------------
 7 files changed, 62 insertions(+), 70 deletions(-)
 rename spinlock.go => internal/spinlock.go (82%)
diff --git a/README.md b/README.md
index 6089b6d..7c153cb 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,7 @@ A goroutine pool for Go
-
+
# [[中文](README_ZH.md)]
diff --git a/README_ZH.md b/README_ZH.md
index 7578eae..3cb47b7 100644
--- a/README_ZH.md
+++ b/README_ZH.md
@@ -12,7 +12,7 @@ A goroutine pool for Go
-
+
# [[英文](README.md)]
diff --git a/ants.go b/ants.go
index 8c4c8f6..38a0dca 100644
--- a/ants.go
+++ b/ants.go
@@ -30,11 +30,11 @@ import (
)
const (
- // DEFAULT_ANTS_POOL_SIZE is the default capacity for a default goroutine pool.
- DEFAULT_ANTS_POOL_SIZE = math.MaxInt32
+ // DefaultAntsPoolSize is the default capacity for a default goroutine pool.
+ DefaultAntsPoolSize = math.MaxInt32
- // DEFAULT_CLEAN_INTERVAL_TIME is the interval time to clean up goroutines.
- DEFAULT_CLEAN_INTERVAL_TIME = 1
+ // DefaultCleanIntervalTime is the interval time to clean up goroutines.
+ DefaultCleanIntervalTime = time.Second
// CLOSED represents that the pool is closed.
CLOSED = 1
@@ -77,11 +77,13 @@ var (
}()
// Init an instance pool when importing ants.
- defaultAntsPool, _ = NewPool(DEFAULT_ANTS_POOL_SIZE)
+ defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
)
+// Option represents an optional function.
type Option func(opts *Options)
+// Options contains all options which will be applied when instantiating an ants pool.
type Options struct {
// ExpiryDuration is the expiry duration of every worker.
ExpiryDuration time.Duration
@@ -103,36 +105,42 @@ type Options struct {
PanicHandler func(interface{})
}
+// WithOptions accepts the whole options config.
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}
+// WithExpiryDuration sets up the interval time of cleaning up goroutines.
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}
+// WithPreAlloc indicates whether the pool should pre-allocate memory for its workers.
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}
+// WithMaxBlockingTasks sets up the maximum number of goroutines that may block waiting once the pool reaches its capacity.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks
}
}
+// WithNonblocking indicates that Submit returns ErrPoolOverload immediately when there are no available workers.
func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) {
opts.Nonblocking = nonblocking
}
}
+// WithPanicHandler sets up the panic handler.
func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
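
For illustration (not part of this patch): a minimal sketch of the renamed
constants and functional options in use, assuming the module import path
github.com/panjf2000/ants/v2; the pool size and sleep below are arbitrary.

    package main

    import (
        "fmt"
        "time"

        "github.com/panjf2000/ants/v2"
    )

    func main() {
        // NewPool takes functional options; DefaultAntsPoolSize and
        // DefaultCleanIntervalTime replace the old SCREAMING_SNAKE_CASE names.
        p, err := ants.NewPool(10,
            ants.WithExpiryDuration(2*ants.DefaultCleanIntervalTime),
            ants.WithPanicHandler(func(v interface{}) {
                fmt.Println("task panicked:", v)
            }),
        )
        if err != nil {
            panic(err)
        }
        defer p.Release()

        _ = p.Submit(func() { fmt.Println("hello from the pool") })
        time.Sleep(100 * time.Millisecond) // let the task run before exiting
    }
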
diff --git a/ants_test.go b/ants_test.go
index 854a20b..60aac94 100644
--- a/ants_test.go
+++ b/ants_test.go
@@ -33,8 +33,8 @@ import (
)
const (
- _ = 1 << (10 * iota)
- //KiB // 1024
+ _ = 1 << (10 * iota)
+ KiB // 1024
MiB // 1048576
//GiB // 1073741824
//TiB // 1099511627776 (exceeds the range of int32)
@@ -143,7 +143,7 @@ func TestAntsPoolGetWorkerFromCache(t *testing.T) {
for i := 0; i < AntsSize; i++ {
_ = p.Submit(demoFunc)
}
- time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
+ time.Sleep(2 * ants.DefaultCleanIntervalTime)
_ = p.Submit(demoFunc)
t.Logf("pool, running workers number:%d", p.Running())
mem := runtime.MemStats{}
@@ -161,7 +161,7 @@ func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) {
for i := 0; i < AntsSize; i++ {
_ = p.Invoke(dur)
}
- time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
+ time.Sleep(2 * ants.DefaultCleanIntervalTime)
_ = p.Invoke(dur)
t.Logf("pool with func, running workers number:%d", p.Running())
mem := runtime.MemStats{}
@@ -178,7 +178,7 @@ func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) {
for i := 0; i < AntsSize; i++ {
_ = p.Invoke(dur)
}
- time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
+ time.Sleep(2 * ants.DefaultCleanIntervalTime)
_ = p.Invoke(dur)
t.Logf("pool with func, running workers number:%d", p.Running())
mem := runtime.MemStats{}
@@ -369,7 +369,7 @@ func TestPurge(t *testing.T) {
}
defer p.Release()
_ = p.Submit(demoFunc)
- time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
+ time.Sleep(3 * ants.DefaultCleanIntervalTime)
if p.Running() != 0 {
t.Error("all p should be purged")
}
@@ -379,7 +379,7 @@ func TestPurge(t *testing.T) {
}
defer p1.Release()
_ = p1.Invoke(1)
- time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
+ time.Sleep(3 * ants.DefaultCleanIntervalTime)
if p1.Running() != 0 {
t.Error("all p should be purged")
}
@@ -392,7 +392,7 @@ func TestPurgePreMalloc(t *testing.T) {
}
defer p.Release()
_ = p.Submit(demoFunc)
- time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
+ time.Sleep(3 * ants.DefaultCleanIntervalTime)
if p.Running() != 0 {
t.Error("all p should be purged")
}
@@ -402,7 +402,7 @@ func TestPurgePreMalloc(t *testing.T) {
}
defer p1.Release()
_ = p1.Invoke(1)
- time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
+ time.Sleep(3 * ants.DefaultCleanIntervalTime)
if p1.Running() != 0 {
t.Error("all p should be purged")
}
@@ -608,7 +608,7 @@ func TestRestCodeCoverage(t *testing.T) {
for i := 0; i < n; i++ {
_ = p.Invoke(Param)
}
- time.Sleep(ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
+ time.Sleep(ants.DefaultCleanIntervalTime)
t.Logf("pool with func, capacity:%d", p.Cap())
t.Logf("pool with func, running workers number:%d", p.Running())
t.Logf("pool with func, free workers number:%d", p.Free())
@@ -624,7 +624,7 @@ func TestRestCodeCoverage(t *testing.T) {
for i := 0; i < n; i++ {
_ = ppremWithFunc.Invoke(Param)
}
- time.Sleep(ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
+ time.Sleep(ants.DefaultCleanIntervalTime)
t.Logf("pre-malloc pool with func, capacity:%d", ppremWithFunc.Cap())
t.Logf("pre-malloc pool with func, running workers number:%d", ppremWithFunc.Running())
t.Logf("pre-malloc pool with func, free workers number:%d", ppremWithFunc.Free())
diff --git a/spinlock.go b/internal/spinlock.go
similarity index 82%
rename from spinlock.go
rename to internal/spinlock.go
index 3b87c87..16607ef 100644
--- a/spinlock.go
+++ b/internal/spinlock.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
-package ants
+package internal
import (
"runtime"
@@ -22,6 +22,7 @@ func (sl *spinLock) Unlock() {
atomic.StoreUint32((*uint32)(sl), 0)
}
-func SpinLock() sync.Locker {
+// NewSpinLock instantiates a spin-lock.
+func NewSpinLock() sync.Locker {
return new(spinLock)
}
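
For illustration (not part of this patch): a sketch of the renamed NewSpinLock
constructor behind the sync.Locker interface. Since internal/ packages are
importable only from within their own module, this hypothetical guardedCounter
type would have to live inside the ants module itself.

    package ants

    import (
        "sync"

        "github.com/panjf2000/ants/v2/internal"
    )

    // guardedCounter is a hypothetical type showing how the pool now holds
    // the spin-lock as a plain sync.Locker.
    type guardedCounter struct {
        lock sync.Locker
        n    int
    }

    func newGuardedCounter() *guardedCounter {
        return &guardedCounter{lock: internal.NewSpinLock()}
    }

    func (c *guardedCounter) incr() {
        c.lock.Lock()
        c.n++
        c.lock.Unlock()
    }
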
diff --git a/pool.go b/pool.go
index 9af46de..e8e8a6f 100644
--- a/pool.go
+++ b/pool.go
@@ -26,6 +26,8 @@ import (
"sync"
"sync/atomic"
"time"
+
+ "github.com/panjf2000/ants/v2/internal"
)
// Pool accepts tasks from clients; it limits the total number of goroutines to a given size by recycling goroutines.
@@ -134,29 +136,19 @@ func NewPool(size int, options ...Option) (*Pool, error) {
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
- opts.ExpiryDuration = time.Duration(DEFAULT_CLEAN_INTERVAL_TIME) * time.Second
+ opts.ExpiryDuration = DefaultCleanIntervalTime
}
- var p *Pool
+ p := &Pool{
+ capacity: int32(size),
+ expiryDuration: opts.ExpiryDuration,
+ nonblocking: opts.Nonblocking,
+ maxBlockingTasks: int32(opts.MaxBlockingTasks),
+ panicHandler: opts.PanicHandler,
+ lock: internal.NewSpinLock(),
+ }
if opts.PreAlloc {
- p = &Pool{
- capacity: int32(size),
- expiryDuration: opts.ExpiryDuration,
- workers: make([]*goWorker, 0, size),
- nonblocking: opts.Nonblocking,
- maxBlockingTasks: int32(opts.MaxBlockingTasks),
- panicHandler: opts.PanicHandler,
- lock: SpinLock(),
- }
- } else {
- p = &Pool{
- capacity: int32(size),
- expiryDuration: opts.ExpiryDuration,
- nonblocking: opts.Nonblocking,
- maxBlockingTasks: int32(opts.MaxBlockingTasks),
- panicHandler: opts.PanicHandler,
- lock: SpinLock(),
- }
+ p.workers = make([]*goWorker, 0, size)
}
p.cond = sync.NewCond(p.lock)
@@ -173,11 +165,11 @@ func (p *Pool) Submit(task func()) error {
if atomic.LoadInt32(&p.release) == CLOSED {
return ErrPoolClosed
}
- if w := p.retrieveWorker(); w == nil {
+ var w *goWorker
+ if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload
- } else {
- w.task <- task
}
+ w.task <- task
return nil
}
@@ -188,7 +180,7 @@ func (p *Pool) Running() int {
// Free returns the number of available goroutines to work.
func (p *Pool) Free() int {
- return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running))
+ return p.Cap() - p.Running()
}
// Cap returns the capacity of this pool.
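
For illustration (not part of this patch): a sketch of how the flattened Submit
flow surfaces ErrPoolOverload to a nonblocking caller.

    package main

    import (
        "fmt"

        "github.com/panjf2000/ants/v2"
    )

    func main() {
        // With Nonblocking set, retrieveWorker returns nil once the single
        // worker is busy, and Submit reports that as ErrPoolOverload.
        p, _ := ants.NewPool(1, ants.WithNonblocking(true))
        defer p.Release()

        block := make(chan struct{})
        _ = p.Submit(func() { <-block }) // occupy the only worker

        if err := p.Submit(func() {}); err == ants.ErrPoolOverload {
            fmt.Println("pool is full:", err)
        }
        close(block)
    }
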
diff --git a/pool_func.go b/pool_func.go
index 59d3187..f2dd704 100644
--- a/pool_func.go
+++ b/pool_func.go
@@ -26,6 +26,8 @@ import (
"sync"
"sync/atomic"
"time"
+
+ "github.com/panjf2000/ants/v2/internal"
)
// PoolWithFunc accepts tasks from clients; it limits the total number of goroutines to a given size by recycling goroutines.
@@ -141,31 +143,20 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
- opts.ExpiryDuration = time.Duration(DEFAULT_CLEAN_INTERVAL_TIME) * time.Second
+ opts.ExpiryDuration = DefaultCleanIntervalTime
}
- var p *PoolWithFunc
+ p := &PoolWithFunc{
+ capacity: int32(size),
+ expiryDuration: opts.ExpiryDuration,
+ poolFunc: pf,
+ nonblocking: opts.Nonblocking,
+ maxBlockingTasks: int32(opts.MaxBlockingTasks),
+ panicHandler: opts.PanicHandler,
+ lock: internal.NewSpinLock(),
+ }
if opts.PreAlloc {
- p = &PoolWithFunc{
- capacity: int32(size),
- expiryDuration: opts.ExpiryDuration,
- poolFunc: pf,
- workers: make([]*goWorkerWithFunc, 0, size),
- nonblocking: opts.Nonblocking,
- maxBlockingTasks: int32(opts.MaxBlockingTasks),
- panicHandler: opts.PanicHandler,
- lock: SpinLock(),
- }
- } else {
- p = &PoolWithFunc{
- capacity: int32(size),
- expiryDuration: opts.ExpiryDuration,
- poolFunc: pf,
- nonblocking: opts.Nonblocking,
- maxBlockingTasks: int32(opts.MaxBlockingTasks),
- panicHandler: opts.PanicHandler,
- lock: SpinLock(),
- }
+ p.workers = make([]*goWorkerWithFunc, 0, size)
}
p.cond = sync.NewCond(p.lock)
@@ -182,11 +173,11 @@ func (p *PoolWithFunc) Invoke(args interface{}) error {
if atomic.LoadInt32(&p.release) == CLOSED {
return ErrPoolClosed
}
- if w := p.retrieveWorker(); w == nil {
+ var w *goWorkerWithFunc
+ if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload
- } else {
- w.args <- args
}
+ w.args <- args
return nil
}
@@ -197,7 +188,7 @@ func (p *PoolWithFunc) Running() int {
// Free returns the number of available goroutines to work.
func (p *PoolWithFunc) Free() int {
- return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running))
+ return p.Cap() - p.Running()
}
// Cap returns the capacity of this pool.
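
For illustration (not part of this patch): an analogous sketch for PoolWithFunc,
whose constructor gets the same single-struct initialization; the worker count
and arguments below are arbitrary.

    package main

    import (
        "fmt"
        "sync"

        "github.com/panjf2000/ants/v2"
    )

    func main() {
        var wg sync.WaitGroup
        // NewPoolWithFunc binds one function to the pool; Invoke feeds it arguments.
        p, _ := ants.NewPoolWithFunc(4, func(arg interface{}) {
            fmt.Println("got:", arg)
            wg.Done()
        })
        defer p.Release()

        for i := 0; i < 8; i++ {
            wg.Add(1)
            _ = p.Invoke(i)
        }
        wg.Wait()
    }
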