diff --git a/ants.go b/ants.go index 8096d06..81ccfa0 100644 --- a/ants.go +++ b/ants.go @@ -22,32 +22,42 @@ package ants import "github.com/iris-contrib/errors" const ( - DEFAULT_POOL_SIZE = 50000 + // Default capacity for a default goroutine pool + DEFAULT_POOL_SIZE = 50000 + + // Interval time to clean up goroutines DEFAULT_CLEAN_INTERVAL_TIME = 30 ) +// Init a instance pool when importing ants var defaultPool, _ = NewPool(DEFAULT_POOL_SIZE) +// Push submit a task to pool func Push(task f) error { return defaultPool.Push(task) } +// Running returns the number of the currently running goroutines func Running() int { return defaultPool.Running() } +// Cap returns the capacity of this default pool func Cap() int { return defaultPool.Cap() } +// Free returns the available goroutines to work func Free() int { return defaultPool.Free() } +// Release Closed the default pool func Release() { - + defaultPool.Release() } +// Errors for the Ants API var ( PoolSizeInvalidError = errors.New("invalid size for pool") PoolClosedError = errors.New("this pool has been closed") diff --git a/ants_test.go b/ants_test.go index 5a1b696..3ee3da4 100644 --- a/ants_test.go +++ b/ants_test.go @@ -26,9 +26,7 @@ var n = 1000000 //} func forSleep() { - time.Sleep(time.Millisecond) - //for i := 0; i < 10000; i++ { - //} + time.Sleep(3 * time.Millisecond) } func TestNoPool(t *testing.T) { diff --git a/pool.go b/pool.go index eedbd23..473e7ca 100644 --- a/pool.go +++ b/pool.go @@ -29,14 +29,30 @@ type sig struct{} type f func() +// Pool accept the tasks from client,it will limit the total +// of goroutines to a given number by recycling goroutines. type Pool struct { + // Capacity of the pool. capacity int32 + + // The number of the currently running goroutines. running int32 + + // Signal is used to notice pool there are available + // workers which can be sent to work. freeSignal chan sig + + // A slice that store the available workers. workers []*Worker + workerPool sync.Pool + + // It is used to notice the pool to closed itself. release chan sig + lock sync.Mutex + + // It is used to confirm whether this pool has been closed. closed int32 } @@ -56,6 +72,8 @@ func NewPool(size int) (*Pool, error) { //------------------------------------------------------------------------- +// scanAndClean is a goroutine who will periodically clean up +// after it is noticed that this pool is closed. func (p *Pool) scanAndClean() { ticker := time.NewTicker(DEFAULT_CLEAN_INTERVAL_TIME * time.Second) go func() { @@ -72,6 +90,7 @@ func (p *Pool) scanAndClean() { }() } +// Push submit a task to pool func (p *Pool) Push(task f) error { if atomic.LoadInt32(&p.closed) == 1 { return PoolClosedError @@ -81,18 +100,22 @@ func (p *Pool) Push(task f) error { return nil } +// Running returns the number of the currently running goroutines func (p *Pool) Running() int { return int(atomic.LoadInt32(&p.running)) } +// Free returns the available goroutines to work func (p *Pool) Free() int { return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) } +// Cap returns the capacity of this pool func (p *Pool) Cap() int { return int(atomic.LoadInt32(&p.capacity)) } +// Release Closed this pool func (p *Pool) Release() error { p.lock.Lock() atomic.StoreInt32(&p.closed, 1) @@ -101,12 +124,14 @@ func (p *Pool) Release() error { return nil } +// Resize change the capacity of this pool func (p *Pool) ReSize(size int) { atomic.StoreInt32(&p.capacity, int32(size)) } //------------------------------------------------------------------------- +// getWorker returns a available worker to run the tasks. func (p *Pool) getWorker() *Worker { var w *Worker waiting := false @@ -157,6 +182,7 @@ func (p *Pool) getWorker() *Worker { return w } +// putWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) putWorker(worker *Worker) { p.workerPool.Put(worker) p.lock.Lock() diff --git a/worker.go b/worker.go index 5acc864..2d9478a 100644 --- a/worker.go +++ b/worker.go @@ -24,11 +24,19 @@ import ( "sync/atomic" ) +// Worker is the actual executor who run the tasks, +// it will start a goroutine that accept tasks and +// perform function calls. type Worker struct { + // A pool who owns this worker. pool *Pool + + // The job should be done. task chan f } +// run will start a goroutine to repeat the process +// that perform the function calls. func (w *Worker) run() { go func() { for f := range w.task { @@ -42,10 +50,12 @@ func (w *Worker) run() { }() } +// stop this worker. func (w *Worker) stop() { w.task <- nil } +// sendTask send a task to this worker. func (w *Worker) sendTask(task f) { w.task <- task }