mirror of https://github.com/panjf2000/ants.git
bug: return the error from Pool.Submit/PoolWithFunc.Invoke accordingly (#297)
This commit is contained in:
parent
45bc4f51ba
commit
2ce8d85f28
13
pool.go
13
pool.go
|
@ -217,11 +217,12 @@ func (p *Pool) Submit(task func()) error {
|
||||||
if p.IsClosed() {
|
if p.IsClosed() {
|
||||||
return ErrPoolClosed
|
return ErrPoolClosed
|
||||||
}
|
}
|
||||||
if w := p.retrieveWorker(); w != nil {
|
|
||||||
|
w, err := p.retrieveWorker()
|
||||||
|
if w != nil {
|
||||||
w.inputFunc(task)
|
w.inputFunc(task)
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
return ErrPoolOverload
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Running returns the number of workers currently running.
|
// Running returns the number of workers currently running.
|
||||||
|
@ -328,7 +329,7 @@ func (p *Pool) addWaiting(delta int) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// retrieveWorker returns an available worker to run the tasks.
|
// retrieveWorker returns an available worker to run the tasks.
|
||||||
func (p *Pool) retrieveWorker() (w worker) {
|
func (p *Pool) retrieveWorker() (w worker, err error) {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
|
|
||||||
retry:
|
retry:
|
||||||
|
@ -350,7 +351,7 @@ retry:
|
||||||
// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
|
// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
|
||||||
if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
|
if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
return
|
return nil, ErrPoolOverload
|
||||||
}
|
}
|
||||||
|
|
||||||
// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
|
// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
|
||||||
|
@ -360,7 +361,7 @@ retry:
|
||||||
|
|
||||||
if p.IsClosed() {
|
if p.IsClosed() {
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
return
|
return nil, ErrPoolClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
goto retry
|
goto retry
|
||||||
|
|
13
pool_func.go
13
pool_func.go
|
@ -223,11 +223,12 @@ func (p *PoolWithFunc) Invoke(args interface{}) error {
|
||||||
if p.IsClosed() {
|
if p.IsClosed() {
|
||||||
return ErrPoolClosed
|
return ErrPoolClosed
|
||||||
}
|
}
|
||||||
if w := p.retrieveWorker(); w != nil {
|
|
||||||
|
w, err := p.retrieveWorker()
|
||||||
|
if w != nil {
|
||||||
w.inputParam(args)
|
w.inputParam(args)
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
return ErrPoolOverload
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Running returns the number of workers currently running.
|
// Running returns the number of workers currently running.
|
||||||
|
@ -334,7 +335,7 @@ func (p *PoolWithFunc) addWaiting(delta int) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// retrieveWorker returns an available worker to run the tasks.
|
// retrieveWorker returns an available worker to run the tasks.
|
||||||
func (p *PoolWithFunc) retrieveWorker() (w worker) {
|
func (p *PoolWithFunc) retrieveWorker() (w worker, err error) {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
|
|
||||||
retry:
|
retry:
|
||||||
|
@ -356,7 +357,7 @@ retry:
|
||||||
// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
|
// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
|
||||||
if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
|
if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
return
|
return nil, ErrPoolOverload
|
||||||
}
|
}
|
||||||
|
|
||||||
// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
|
// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
|
||||||
|
@ -366,7 +367,7 @@ retry:
|
||||||
|
|
||||||
if p.IsClosed() {
|
if p.IsClosed() {
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
return
|
return nil, ErrPoolClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
goto retry
|
goto retry
|
||||||
|
|
Loading…
Reference in New Issue