From fad443d7d8db7637212d76aeae8b4b429759cab8 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 22 May 2018 11:26:16 +0800 Subject: [PATCH] add a new type of pool, allowing to create a pool with a function --- pool_func.go | 199 +++++++++++++++++++++++++++++++++++++++++++++++++ worker_func.go | 63 ++++++++++++++++ 2 files changed, 262 insertions(+) create mode 100644 pool_func.go create mode 100644 worker_func.go diff --git a/pool_func.go b/pool_func.go new file mode 100644 index 0000000..83b1cd4 --- /dev/null +++ b/pool_func.go @@ -0,0 +1,199 @@ +// MIT License + +// Copyright (c) 2018 Andy Pan + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package ants + +import ( + "math" + "sync" + "sync/atomic" + "time" +) + +type pf func(interface{}) error + +// PoolWithFunc accept the tasks from client,it will limit the total +// of goroutines to a given number by recycling goroutines. +type PoolWithFunc struct { + // capacity of the pool. + capacity int32 + + // running is the number of the currently running goroutines. + running int32 + + // signal is used to notice pool there are available + // workers which can be sent to work. + freeSignal chan sig + + // workers is a slice that store the available workers. + workers []*WorkerWithFunc + + // workerPool is a pool that saves a set of temporary objects. + workerPool sync.Pool + + // release is used to notice the pool to closed itself. + release chan sig + + lock sync.Mutex + + // closed is used to confirm whether this pool has been closed. + closed int32 + + poolFunc pf +} + +// NewPoolWithFunc generates a instance of ants pool with a specific function. +func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { + if size <= 0 { + return nil, ErrPoolSizeInvalid + } + p := &PoolWithFunc{ + capacity: int32(size), + freeSignal: make(chan sig, math.MaxInt32), + release: make(chan sig), + closed: 0, + poolFunc: f, + } + + return p, nil +} + +//------------------------------------------------------------------------- + +// scanAndClean is a goroutine who will periodically clean up +// after it is noticed that this pool is closed. +func (p *PoolWithFunc) scanAndCleanWithFunc() { + ticker := time.NewTicker(DefaultCleanIntervalTime * time.Second) + go func() { + ticker.Stop() + for range ticker.C { + if atomic.LoadInt32(&p.closed) == 1 { + p.lock.Lock() + for _, w := range p.workers { + w.stopWithFunc() + } + p.lock.Unlock() + } + } + }() +} + +// Push submit a task to pool +func (p *PoolWithFunc) Serve(args interface{}) error { + if atomic.LoadInt32(&p.closed) == 1 { + return ErrPoolClosed + } + w := p.getWorkerWithFunc() + w.sendTaskWithFunc(args) + return nil +} + +// Running returns the number of the currently running goroutines +func (p *PoolWithFunc) RunningWithFunc() int { + return int(atomic.LoadInt32(&p.running)) +} + +// Free returns the available goroutines to work +func (p *PoolWithFunc) FreeWithFunc() int { + return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) +} + +// Cap returns the capacity of this pool +func (p *PoolWithFunc) CapWithFunc() int { + return int(atomic.LoadInt32(&p.capacity)) +} + +// Release Closed this pool +func (p *PoolWithFunc) ReleaseWithFunc() error { + p.lock.Lock() + atomic.StoreInt32(&p.closed, 1) + close(p.release) + p.lock.Unlock() + return nil +} + +// ReSize change the capacity of this pool +func (p *PoolWithFunc) ReSizeWithFunc(size int) { + atomic.StoreInt32(&p.capacity, int32(size)) +} + +//------------------------------------------------------------------------- + +// getWorker returns a available worker to run the tasks. +func (p *PoolWithFunc) getWorkerWithFunc() *WorkerWithFunc { + var w *WorkerWithFunc + waiting := false + + p.lock.Lock() + workers := p.workers + n := len(workers) - 1 + if n < 0 { + if p.running >= p.capacity { + waiting = true + } + } else { + w = workers[n] + workers[n] = nil + p.workers = workers[:n] + } + p.lock.Unlock() + + if waiting { + <-p.freeSignal + for { + p.lock.Lock() + workers = p.workers + l := len(workers) - 1 + if l < 0 { + p.lock.Unlock() + continue + } + w = workers[l] + workers[l] = nil + p.workers = workers[:l] + p.lock.Unlock() + break + } + } else { + wp := p.workerPool.Get() + if wp == nil { + w = &WorkerWithFunc{ + pool: p, + args: make(chan interface{}), + } + w.runWithFunc() + atomic.AddInt32(&p.running, 1) + } else { + w = wp.(*WorkerWithFunc) + } + } + return w +} + +// putWorker puts a worker back into free pool, recycling the goroutines. +func (p *PoolWithFunc) putWorkerWithFunc(worker *WorkerWithFunc) { + p.workerPool.Put(worker) + p.lock.Lock() + p.workers = append(p.workers, worker) + p.lock.Unlock() + p.freeSignal <- sig{} +} diff --git a/worker_func.go b/worker_func.go new file mode 100644 index 0000000..5deab61 --- /dev/null +++ b/worker_func.go @@ -0,0 +1,63 @@ +// MIT License + +// Copyright (c) 2018 Andy Pan + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package ants + +import ( + "sync/atomic" +) + +// Worker is the actual executor who run the tasks, +// it will start a goroutine that accept tasks and +// perform function calls. +type WorkerWithFunc struct { + // pool who owns this worker. + pool *PoolWithFunc + + // args is a job should be done. + args chan interface{} +} + +// run will start a goroutine to repeat the process +// that perform the function calls. +func (w *WorkerWithFunc) runWithFunc() { + go func() { + for args := range w.args { + if args == nil { + atomic.AddInt32(&w.pool.running, -1) + return + } + w.pool.poolFunc(args) + w.pool.putWorkerWithFunc(w) + } + }() +} + +// stop this worker. +func (w *WorkerWithFunc) stopWithFunc() { + w.args <- nil +} + +// sendTask send a task to this worker. +func (w *WorkerWithFunc) sendTaskWithFunc(args interface{}) { + w.args <- args +}