Merge branch 'develop'

This commit is contained in:
Andy Pan 2018-08-04 11:14:32 +08:00
commit dc5c8af3e8
3 changed files with 34 additions and 36 deletions

18
ants.go
View File

@ -28,42 +28,42 @@ import (
) )
const ( const (
// DefaultAntsPoolSize is the default capacity for a default goroutine pool // DefaultAntsPoolSize is the default capacity for a default goroutine pool.
DefaultAntsPoolSize = math.MaxInt32 DefaultAntsPoolSize = math.MaxInt32
// DefaultCleanIntervalTime is the interval time to clean up goroutines // DefaultCleanIntervalTime is the interval time to clean up goroutines.
DefaultCleanIntervalTime = 10 DefaultCleanIntervalTime = 10
) )
// Init a instance pool when importing ants // Init a instance pool when importing ants.
var defaultAntsPool, _ = NewPool(DefaultAntsPoolSize) var defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
// Submit submit a task to pool // Submit submits a task to pool.
func Submit(task f) error { func Submit(task f) error {
return defaultAntsPool.Submit(task) return defaultAntsPool.Submit(task)
} }
// Running returns the number of the currently running goroutines // Running returns the number of the currently running goroutines.
func Running() int { func Running() int {
return defaultAntsPool.Running() return defaultAntsPool.Running()
} }
// Cap returns the capacity of this default pool // Cap returns the capacity of this default pool.
func Cap() int { func Cap() int {
return defaultAntsPool.Cap() return defaultAntsPool.Cap()
} }
// Free returns the available goroutines to work // Free returns the available goroutines to work.
func Free() int { func Free() int {
return defaultAntsPool.Free() return defaultAntsPool.Free()
} }
// Release Closed the default pool // Release Closes the default pool.
func Release() { func Release() {
defaultAntsPool.Release() defaultAntsPool.Release()
} }
// Errors for the Ants API // Errors for the Ants API.
var ( var (
ErrInvalidPoolSize = errors.New("invalid size for pool") ErrInvalidPoolSize = errors.New("invalid size for pool")
ErrInvalidPoolExpiry = errors.New("invalid expiry for pool") ErrInvalidPoolExpiry = errors.New("invalid expiry for pool")

24
pool.go
View File

@ -50,12 +50,12 @@ type Pool struct {
// release is used to notice the pool to closed itself. // release is used to notice the pool to closed itself.
release chan sig release chan sig
// lock for synchronous operation // lock for synchronous operation.
lock sync.Mutex lock sync.Mutex
once sync.Once once sync.Once
} }
// clear expired workers periodically.
func (p *Pool) periodicallyPurge() { func (p *Pool) periodicallyPurge() {
heartbeat := time.NewTicker(p.expiryDuration) heartbeat := time.NewTicker(p.expiryDuration)
for range heartbeat.C { for range heartbeat.C {
@ -83,12 +83,12 @@ func (p *Pool) periodicallyPurge() {
} }
} }
// NewPool generates a instance of ants pool // NewPool generates an instance of ants pool.
func NewPool(size int) (*Pool, error) { func NewPool(size int) (*Pool, error) {
return NewTimingPool(size, DefaultCleanIntervalTime) return NewTimingPool(size, DefaultCleanIntervalTime)
} }
// NewTimingPool generates a instance of ants pool with a custom timed task // NewTimingPool generates an instance of ants pool with a custom timed task.
func NewTimingPool(size, expiry int) (*Pool, error) { func NewTimingPool(size, expiry int) (*Pool, error) {
if size <= 0 { if size <= 0 {
return nil, ErrInvalidPoolSize return nil, ErrInvalidPoolSize
@ -107,7 +107,7 @@ func NewTimingPool(size, expiry int) (*Pool, error) {
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
// Submit submit a task to pool // Submit submits a task to this pool.
func (p *Pool) Submit(task f) error { func (p *Pool) Submit(task f) error {
if len(p.release) > 0 { if len(p.release) > 0 {
return ErrPoolClosed return ErrPoolClosed
@ -116,22 +116,22 @@ func (p *Pool) Submit(task f) error {
return nil return nil
} }
// Running returns the number of the currently running goroutines // Running returns the number of the currently running goroutines.
func (p *Pool) Running() int { func (p *Pool) Running() int {
return int(atomic.LoadInt32(&p.running)) return int(atomic.LoadInt32(&p.running))
} }
// Free returns the available goroutines to work // Free returns the available goroutines to work.
func (p *Pool) Free() int { func (p *Pool) Free() int {
return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running))
} }
// Cap returns the capacity of this pool // Cap returns the capacity of this pool.
func (p *Pool) Cap() int { func (p *Pool) Cap() int {
return int(atomic.LoadInt32(&p.capacity)) return int(atomic.LoadInt32(&p.capacity))
} }
// ReSize change the capacity of this pool // ReSize changes the capacity of this pool.
func (p *Pool) ReSize(size int) { func (p *Pool) ReSize(size int) {
if size == p.Cap() { if size == p.Cap() {
return return
@ -145,7 +145,7 @@ func (p *Pool) ReSize(size int) {
} }
} }
// Release Closed this pool // Release Closes this pool.
func (p *Pool) Release() error { func (p *Pool) Release() error {
p.once.Do(func() { p.once.Do(func() {
p.release <- sig{} p.release <- sig{}
@ -163,12 +163,12 @@ func (p *Pool) Release() error {
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
// incRunning increases the number of the currently running goroutines // incRunning increases the number of the currently running goroutines.
func (p *Pool) incRunning() { func (p *Pool) incRunning() {
atomic.AddInt32(&p.running, 1) atomic.AddInt32(&p.running, 1)
} }
// decRunning decreases the number of the currently running goroutines // decRunning decreases the number of the currently running goroutines.
func (p *Pool) decRunning() { func (p *Pool) decRunning() {
atomic.AddInt32(&p.running, -1) atomic.AddInt32(&p.running, -1)
} }

View File

@ -48,15 +48,16 @@ type PoolWithFunc struct {
// release is used to notice the pool to closed itself. // release is used to notice the pool to closed itself.
release chan sig release chan sig
// lock for synchronous operation // lock for synchronous operation.
lock sync.Mutex lock sync.Mutex
// pf is the function for processing tasks // pf is the function for processing tasks.
poolFunc pf poolFunc pf
once sync.Once once sync.Once
} }
// clear expired workers periodically.
func (p *PoolWithFunc) periodicallyPurge() { func (p *PoolWithFunc) periodicallyPurge() {
heartbeat := time.NewTicker(p.expiryDuration) heartbeat := time.NewTicker(p.expiryDuration)
for range heartbeat.C { for range heartbeat.C {
@ -84,12 +85,12 @@ func (p *PoolWithFunc) periodicallyPurge() {
} }
} }
// NewPoolWithFunc generates a instance of ants pool with a specific function // NewPoolWithFunc generates an instance of ants pool with a specific function.
func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) {
return NewTimingPoolWithFunc(size, DefaultCleanIntervalTime, f) return NewTimingPoolWithFunc(size, DefaultCleanIntervalTime, f)
} }
// NewTimingPoolWithFunc generates a instance of ants pool with a specific function and a custom timed task // NewTimingPoolWithFunc generates an instance of ants pool with a specific function and a custom timed task.
func NewTimingPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) { func NewTimingPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) {
if size <= 0 { if size <= 0 {
return nil, ErrInvalidPoolSize return nil, ErrInvalidPoolSize
@ -109,11 +110,8 @@ func NewTimingPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) {
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
// Serve submit a task to pool // Serve submits a task to pool.
func (p *PoolWithFunc) Serve(args interface{}) error { func (p *PoolWithFunc) Serve(args interface{}) error {
//if atomic.LoadInt32(&p.closed) == 1 {
// return ErrPoolClosed
//}
if len(p.release) > 0 { if len(p.release) > 0 {
return ErrPoolClosed return ErrPoolClosed
} }
@ -121,22 +119,22 @@ func (p *PoolWithFunc) Serve(args interface{}) error {
return nil return nil
} }
// Running returns the number of the currently running goroutines // Running returns the number of the currently running goroutines.
func (p *PoolWithFunc) Running() int { func (p *PoolWithFunc) Running() int {
return int(atomic.LoadInt32(&p.running)) return int(atomic.LoadInt32(&p.running))
} }
// Free returns the available goroutines to work // Free returns a available goroutines to work.
func (p *PoolWithFunc) Free() int { func (p *PoolWithFunc) Free() int {
return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running))
} }
// Cap returns the capacity of this pool // Cap returns the capacity of this pool.
func (p *PoolWithFunc) Cap() int { func (p *PoolWithFunc) Cap() int {
return int(atomic.LoadInt32(&p.capacity)) return int(atomic.LoadInt32(&p.capacity))
} }
// ReSize change the capacity of this pool // ReSize change the capacity of this pool.
func (p *PoolWithFunc) ReSize(size int) { func (p *PoolWithFunc) ReSize(size int) {
if size == p.Cap() { if size == p.Cap() {
return return
@ -150,7 +148,7 @@ func (p *PoolWithFunc) ReSize(size int) {
} }
} }
// Release Closed this pool // Release Closed this pool.
func (p *PoolWithFunc) Release() error { func (p *PoolWithFunc) Release() error {
p.once.Do(func() { p.once.Do(func() {
p.release <- sig{} p.release <- sig{}
@ -168,12 +166,12 @@ func (p *PoolWithFunc) Release() error {
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
// incRunning increases the number of the currently running goroutines // incRunning increases the number of the currently running goroutines.
func (p *PoolWithFunc) incRunning() { func (p *PoolWithFunc) incRunning() {
atomic.AddInt32(&p.running, 1) atomic.AddInt32(&p.running, 1)
} }
// decRunning decreases the number of the currently running goroutines // decRunning decreases the number of the currently running goroutines.
func (p *PoolWithFunc) decRunning() { func (p *PoolWithFunc) decRunning() {
atomic.AddInt32(&p.running, -1) atomic.AddInt32(&p.running, -1)
} }