// MIT License // Copyright (c) 2018 Andy Pan // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. package ants import ( "context" "sync" "sync/atomic" "time" syncx "github.com/panjf2000/ants/v2/internal/sync" ) // PoolWithFunc accepts the tasks and process them concurrently, // it limits the total of goroutines to a given number by recycling goroutines. type PoolWithFunc struct { poolCommon // poolFunc is the function for processing tasks. poolFunc func(interface{}) } // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. func (p *PoolWithFunc) purgeStaleWorkers() { ticker := time.NewTicker(p.options.ExpiryDuration) defer func() { ticker.Stop() atomic.StoreInt32(&p.purgeDone, 1) }() purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot() for { select { case <-purgeCtx.Done(): return case <-ticker.C: } if p.IsClosed() { break } var isDormant bool p.lock.Lock() staleWorkers := p.workers.refresh(p.options.ExpiryDuration) n := p.Running() isDormant = n == 0 || n == len(staleWorkers) p.lock.Unlock() // Clean up the stale workers. for i := range staleWorkers { staleWorkers[i].finish() staleWorkers[i] = nil } // There might be a situation where all workers have been cleaned up (no worker is running), // while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers. if isDormant && p.Waiting() > 0 { p.cond.Broadcast() } } } // ticktock is a goroutine that updates the current time in the pool regularly. func (p *PoolWithFunc) ticktock() { ticker := time.NewTicker(nowTimeUpdateInterval) defer func() { ticker.Stop() atomic.StoreInt32(&p.ticktockDone, 1) }() ticktockCtx := p.ticktockCtx // copy to the local variable to avoid race from Reboot() for { select { case <-ticktockCtx.Done(): return case <-ticker.C: } if p.IsClosed() { break } p.now.Store(time.Now()) } } func (p *PoolWithFunc) goPurge() { if p.options.DisablePurge { return } // Start a goroutine to clean up expired workers periodically. p.purgeCtx, p.stopPurge = context.WithCancel(context.Background()) go p.purgeStaleWorkers() } func (p *PoolWithFunc) goTicktock() { p.now.Store(time.Now()) p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background()) go p.ticktock() } func (p *PoolWithFunc) nowTime() time.Time { return p.now.Load().(time.Time) } // NewPoolWithFunc instantiates a PoolWithFunc with customized options. func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) { if size <= 0 { size = -1 } if pf == nil { return nil, ErrLackPoolFunc } opts := loadOptions(options...) if !opts.DisablePurge { if expiry := opts.ExpiryDuration; expiry < 0 { return nil, ErrInvalidPoolExpiry } else if expiry == 0 { opts.ExpiryDuration = DefaultCleanIntervalTime } } if opts.Logger == nil { opts.Logger = defaultLogger } p := &PoolWithFunc{ poolCommon: poolCommon{ capacity: int32(size), allDone: make(chan struct{}), lock: syncx.NewSpinLock(), once: &sync.Once{}, options: opts, }, poolFunc: pf, } p.workerCache.New = func() interface{} { return &goWorkerWithFunc{ pool: p, args: make(chan interface{}, workerChanCap), } } if p.options.PreAlloc { if size == -1 { return nil, ErrInvalidPreAllocSize } p.workers = newWorkerQueue(queueTypeLoopQueue, size) } else { p.workers = newWorkerQueue(queueTypeStack, 0) } p.cond = sync.NewCond(p.lock) p.goPurge() p.goTicktock() return p, nil } // Invoke submits a task to pool. // // Note that you are allowed to call Pool.Invoke() from the current Pool.Invoke(), // but what calls for special attention is that you will get blocked with the last // Pool.Invoke() call once the current Pool runs out of its capacity, and to avoid this, // you should instantiate a PoolWithFunc with ants.WithNonblocking(true). func (p *PoolWithFunc) Invoke(args interface{}) error { if p.IsClosed() { return ErrPoolClosed } w, err := p.retrieveWorker() if w != nil { w.inputParam(args) } return err } // Running returns the number of workers currently running. func (p *PoolWithFunc) Running() int { return int(atomic.LoadInt32(&p.running)) } // Free returns the number of available workers, -1 indicates this pool is unlimited. func (p *PoolWithFunc) Free() int { c := p.Cap() if c < 0 { return -1 } return c - p.Running() } // Waiting returns the number of tasks waiting to be executed. func (p *PoolWithFunc) Waiting() int { return int(atomic.LoadInt32(&p.waiting)) } // Cap returns the capacity of this pool. func (p *PoolWithFunc) Cap() int { return int(atomic.LoadInt32(&p.capacity)) } // Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool. func (p *PoolWithFunc) Tune(size int) { capacity := p.Cap() if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { return } atomic.StoreInt32(&p.capacity, int32(size)) if size > capacity { if size-capacity == 1 { p.cond.Signal() return } p.cond.Broadcast() } } // IsClosed indicates whether the pool is closed. func (p *PoolWithFunc) IsClosed() bool { return atomic.LoadInt32(&p.state) == CLOSED } // Release closes this pool and releases the worker queue. func (p *PoolWithFunc) Release() { if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) { return } if p.stopPurge != nil { p.stopPurge() p.stopPurge = nil } if p.stopTicktock != nil { p.stopTicktock() p.stopTicktock = nil } p.lock.Lock() p.workers.reset() p.lock.Unlock() // There might be some callers waiting in retrieveWorker(), so we need to wake them up to prevent // those callers blocking infinitely. p.cond.Broadcast() } // ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil { return ErrPoolClosed } p.Release() var purgeCh <-chan struct{} if !p.options.DisablePurge { purgeCh = p.purgeCtx.Done() } else { purgeCh = p.allDone } if p.Running() == 0 { p.once.Do(func() { close(p.allDone) }) } timer := time.NewTimer(timeout) defer timer.Stop() for { select { case <-timer.C: return ErrTimeout case <-p.allDone: <-purgeCh <-p.ticktockCtx.Done() if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && atomic.LoadInt32(&p.ticktockDone) == 1 { return nil } } } } // Reboot reboots a closed pool, it does nothing if the pool is not closed. // If you intend to reboot a closed pool, use ReleaseTimeout() instead of // Release() to ensure that all workers are stopped and resource are released // before rebooting, otherwise you may run into data race. func (p *PoolWithFunc) Reboot() { if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { atomic.StoreInt32(&p.purgeDone, 0) p.goPurge() atomic.StoreInt32(&p.ticktockDone, 0) p.goTicktock() p.allDone = make(chan struct{}) p.once = &sync.Once{} } } func (p *PoolWithFunc) addRunning(delta int) int { return int(atomic.AddInt32(&p.running, int32(delta))) } func (p *PoolWithFunc) addWaiting(delta int) { atomic.AddInt32(&p.waiting, int32(delta)) } // retrieveWorker returns an available worker to run the tasks. func (p *PoolWithFunc) retrieveWorker() (w worker, err error) { p.lock.Lock() retry: // First try to fetch the worker from the queue. if w = p.workers.detach(); w != nil { p.lock.Unlock() return } // If the worker queue is empty, and we don't run out of the pool capacity, // then just spawn a new worker goroutine. if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { p.lock.Unlock() w = p.workerCache.Get().(*goWorkerWithFunc) w.run() return } // Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value. if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) { p.lock.Unlock() return nil, ErrPoolOverload } // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. p.addWaiting(1) p.cond.Wait() // block and wait for an available worker p.addWaiting(-1) if p.IsClosed() { p.lock.Unlock() return nil, ErrPoolClosed } goto retry } // revertWorker puts a worker back into free pool, recycling the goroutines. func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool { if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() { p.cond.Broadcast() return false } worker.lastUsed = p.nowTime() p.lock.Lock() // To avoid memory leaks, add a double check in the lock scope. // Issue: https://github.com/panjf2000/ants/issues/113 if p.IsClosed() { p.lock.Unlock() return false } if err := p.workers.insert(worker); err != nil { p.lock.Unlock() return false } // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue. p.cond.Signal() p.lock.Unlock() return true }