add all comments

This commit is contained in:
Andy Pan 2018-05-20 21:09:45 +08:00
parent 1ee8144272
commit 301221be5d
4 changed files with 49 additions and 5 deletions

14
ants.go
View File

@ -22,32 +22,42 @@ package ants
import "github.com/iris-contrib/errors" import "github.com/iris-contrib/errors"
const ( const (
DEFAULT_POOL_SIZE = 50000 // Default capacity for a default goroutine pool
DEFAULT_POOL_SIZE = 50000
// Interval time to clean up goroutines
DEFAULT_CLEAN_INTERVAL_TIME = 30 DEFAULT_CLEAN_INTERVAL_TIME = 30
) )
// Init a instance pool when importing ants
var defaultPool, _ = NewPool(DEFAULT_POOL_SIZE) var defaultPool, _ = NewPool(DEFAULT_POOL_SIZE)
// Push submit a task to pool
func Push(task f) error { func Push(task f) error {
return defaultPool.Push(task) return defaultPool.Push(task)
} }
// Running returns the number of the currently running goroutines
func Running() int { func Running() int {
return defaultPool.Running() return defaultPool.Running()
} }
// Cap returns the capacity of this default pool
func Cap() int { func Cap() int {
return defaultPool.Cap() return defaultPool.Cap()
} }
// Free returns the available goroutines to work
func Free() int { func Free() int {
return defaultPool.Free() return defaultPool.Free()
} }
// Release Closed the default pool
func Release() { func Release() {
defaultPool.Release()
} }
// Errors for the Ants API
var ( var (
PoolSizeInvalidError = errors.New("invalid size for pool") PoolSizeInvalidError = errors.New("invalid size for pool")
PoolClosedError = errors.New("this pool has been closed") PoolClosedError = errors.New("this pool has been closed")

View File

@ -26,9 +26,7 @@ var n = 1000000
//} //}
func forSleep() { func forSleep() {
time.Sleep(time.Millisecond) time.Sleep(3 * time.Millisecond)
//for i := 0; i < 10000; i++ {
//}
} }
func TestNoPool(t *testing.T) { func TestNoPool(t *testing.T) {

26
pool.go
View File

@ -29,14 +29,30 @@ type sig struct{}
type f func() type f func()
// Pool accept the tasks from client,it will limit the total
// of goroutines to a given number by recycling goroutines.
type Pool struct { type Pool struct {
// Capacity of the pool.
capacity int32 capacity int32
// The number of the currently running goroutines.
running int32 running int32
// Signal is used to notice pool there are available
// workers which can be sent to work.
freeSignal chan sig freeSignal chan sig
// A slice that store the available workers.
workers []*Worker workers []*Worker
workerPool sync.Pool workerPool sync.Pool
// It is used to notice the pool to closed itself.
release chan sig release chan sig
lock sync.Mutex lock sync.Mutex
// It is used to confirm whether this pool has been closed.
closed int32 closed int32
} }
@ -56,6 +72,8 @@ func NewPool(size int) (*Pool, error) {
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
// scanAndClean is a goroutine who will periodically clean up
// after it is noticed that this pool is closed.
func (p *Pool) scanAndClean() { func (p *Pool) scanAndClean() {
ticker := time.NewTicker(DEFAULT_CLEAN_INTERVAL_TIME * time.Second) ticker := time.NewTicker(DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
go func() { go func() {
@ -72,6 +90,7 @@ func (p *Pool) scanAndClean() {
}() }()
} }
// Push submit a task to pool
func (p *Pool) Push(task f) error { func (p *Pool) Push(task f) error {
if atomic.LoadInt32(&p.closed) == 1 { if atomic.LoadInt32(&p.closed) == 1 {
return PoolClosedError return PoolClosedError
@ -81,18 +100,22 @@ func (p *Pool) Push(task f) error {
return nil return nil
} }
// Running returns the number of the currently running goroutines
func (p *Pool) Running() int { func (p *Pool) Running() int {
return int(atomic.LoadInt32(&p.running)) return int(atomic.LoadInt32(&p.running))
} }
// Free returns the available goroutines to work
func (p *Pool) Free() int { func (p *Pool) Free() int {
return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running))
} }
// Cap returns the capacity of this pool
func (p *Pool) Cap() int { func (p *Pool) Cap() int {
return int(atomic.LoadInt32(&p.capacity)) return int(atomic.LoadInt32(&p.capacity))
} }
// Release Closed this pool
func (p *Pool) Release() error { func (p *Pool) Release() error {
p.lock.Lock() p.lock.Lock()
atomic.StoreInt32(&p.closed, 1) atomic.StoreInt32(&p.closed, 1)
@ -101,12 +124,14 @@ func (p *Pool) Release() error {
return nil return nil
} }
// Resize change the capacity of this pool
func (p *Pool) ReSize(size int) { func (p *Pool) ReSize(size int) {
atomic.StoreInt32(&p.capacity, int32(size)) atomic.StoreInt32(&p.capacity, int32(size))
} }
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
// getWorker returns a available worker to run the tasks.
func (p *Pool) getWorker() *Worker { func (p *Pool) getWorker() *Worker {
var w *Worker var w *Worker
waiting := false waiting := false
@ -157,6 +182,7 @@ func (p *Pool) getWorker() *Worker {
return w return w
} }
// putWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) putWorker(worker *Worker) { func (p *Pool) putWorker(worker *Worker) {
p.workerPool.Put(worker) p.workerPool.Put(worker)
p.lock.Lock() p.lock.Lock()

View File

@ -24,11 +24,19 @@ import (
"sync/atomic" "sync/atomic"
) )
// Worker is the actual executor who run the tasks,
// it will start a goroutine that accept tasks and
// perform function calls.
type Worker struct { type Worker struct {
// A pool who owns this worker.
pool *Pool pool *Pool
// The job should be done.
task chan f task chan f
} }
// run will start a goroutine to repeat the process
// that perform the function calls.
func (w *Worker) run() { func (w *Worker) run() {
go func() { go func() {
for f := range w.task { for f := range w.task {
@ -42,10 +50,12 @@ func (w *Worker) run() {
}() }()
} }
// stop this worker.
func (w *Worker) stop() { func (w *Worker) stop() {
w.task <- nil w.task <- nil
} }
// sendTask send a task to this worker.
func (w *Worker) sendTask(task f) { func (w *Worker) sendTask(task f) {
w.task <- task w.task <- task
} }