support nonblocking submit and max blocking limit setting

Signed-off-by: Cholerae Hu <choleraehyq@gmail.com>
Cholerae Hu 2019-08-20 11:22:00 +08:00 committed by Andy Pan
parent 2cbd2e3c2f
commit 444711e79f
6 changed files with 242 additions and 2 deletions

.gitignore (new file)

@@ -0,0 +1,17 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
# vendor/

.idea


@@ -51,6 +51,9 @@ var (
// ErrPoolClosed will be returned when submitting task to a closed pool.
ErrPoolClosed = errors.New("this pool has been closed")
// ErrPoolOverload will be returned when the pool is full and no workers available.
ErrPoolOverload = errors.New("too many goroutines blocked on submit or Nonblocking is set")
//---------------------------------------------------------------------------
// workerChanCap determines whether the channel of a worker should be a buffered channel

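ErrPoolOverload lets callers tell pool saturation apart from other submission failures. Below is a minimal sketch of handling it with Nonblocking enabled; the import path github.com/panjf2000/ants, the pool size, and the task body are illustrative assumptions, not part of this commit.

package main

import (
	"log"

	"github.com/panjf2000/ants" // assumed import path for this version of the library
)

func main() {
	p, err := ants.NewPool(10)
	if err != nil {
		log.Fatalf("create pool failed: %v", err)
	}
	defer p.Release()

	// Fail fast instead of blocking when every worker is busy.
	p.Nonblocking = true

	task := func() {
		// ... do some work ...
	}

	if err := p.Submit(task); err == ants.ErrPoolOverload {
		// The pool is saturated: drop the task, retry later,
		// or fall back to running it in a plain goroutine.
		log.Println("pool overloaded, task rejected")
	} else if err != nil {
		log.Printf("submit failed: %v", err)
	}
}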

@@ -23,6 +23,7 @@
package ants_test
import (
"runtime"
"sync"
"testing"
"time"
@@ -46,6 +47,22 @@ func demoPoolFunc(args interface{}) {
time.Sleep(time.Duration(n) * time.Millisecond)
}
func longRunningFunc() {
for {
runtime.Gosched()
}
}
func longRunningPoolFunc(arg interface{}) {
if ch, ok := arg.(chan struct{}); ok {
<-ch
return
}
for {
runtime.Gosched()
}
}
func BenchmarkGoroutineWithFunc(b *testing.B) {
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {


@@ -416,6 +416,155 @@ func TestPurgePreMalloc(t *testing.T) {
}
}
func TestNonblockingSubmit(t *testing.T) {
poolSize := 10
p, err := ants.NewPool(poolSize)
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
p.Nonblocking = true
defer p.Release()
for i := 0; i < poolSize-1; i++ {
if err := p.Submit(longRunningFunc); err != nil {
t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
}
}
ch := make(chan struct{})
f := func() {
<-ch
}
// p is full now.
if err := p.Submit(f); err != nil {
t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
}
if err := p.Submit(demoFunc); err == nil || err != ants.ErrPoolOverload {
t.Fatalf("nonblocking submit when pool is full should get an ErrPoolOverload")
}
// interrupt f to get an available worker
close(ch)
time.Sleep(1 * time.Second)
if err := p.Submit(demoFunc); err != nil {
t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
}
}
func TestMaxBlockingSubmit(t *testing.T) {
poolSize := 10
p, err := ants.NewPool(poolSize)
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
p.MaxBlockingTasks = 1
defer p.Release()
for i := 0; i < poolSize-1; i++ {
if err := p.Submit(longRunningFunc); err != nil {
t.Fatalf("submit when pool is not full shouldn't return error")
}
}
ch := make(chan struct{})
f := func() {
<-ch
}
// p is full now.
if err := p.Submit(f); err != nil {
t.Fatalf("submit when pool is not full shouldn't return error")
}
var wg sync.WaitGroup
wg.Add(1)
errCh := make(chan error, 1)
go func() {
// should be blocked. blocking num == 1
if err := p.Submit(demoFunc); err != nil {
errCh <- err
}
wg.Done()
}()
time.Sleep(1 * time.Second)
// already reached max blocking limit
if err := p.Submit(demoFunc); err != ants.ErrPoolOverload {
t.Fatalf("blocking submit when pool reach max blocking submit should return ErrPoolOverload")
}
// interrupt f to make blocking submit successful.
close(ch)
wg.Wait()
select {
case <-errCh:
t.Fatalf("blocking submit when pool is full should not return error")
default:
}
}
func TestNonblockingSubmitWithFunc(t *testing.T) {
poolSize := 10
p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc)
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
p.Nonblocking = true
defer p.Release()
for i := 0; i < poolSize-1; i++ {
if err := p.Invoke(nil); err != nil {
t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
}
}
ch := make(chan struct{})
// p is full now.
if err := p.Invoke(ch); err != nil {
t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
}
if err := p.Invoke(nil); err == nil || err != ants.ErrPoolOverload {
t.Fatalf("nonblocking submit when pool is full should get an ErrPoolOverload")
}
// interrupt f to get an available worker
close(ch)
time.Sleep(1 * time.Second)
if err := p.Invoke(nil); err != nil {
t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
}
}
func TestMaxBlockingSubmitWithFunc(t *testing.T) {
poolSize := 10
p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc)
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
p.MaxBlockingTasks = 1
defer p.Release()
for i := 0; i < poolSize-1; i++ {
if err := p.Invoke(Param); err != nil {
t.Fatalf("submit when pool is not full shouldn't return error")
}
}
ch := make(chan struct{})
// p is full now.
if err := p.Invoke(ch); err != nil {
t.Fatalf("submit when pool is not full shouldn't return error")
}
var wg sync.WaitGroup
wg.Add(1)
errCh := make(chan error, 1)
go func() {
// should be blocked. blocking num == 1
if err := p.Invoke(Param); err != nil {
errCh <- err
}
wg.Done()
}()
time.Sleep(1 * time.Second)
// already reached max blocking limit
if err := p.Invoke(Param); err != ants.ErrPoolOverload {
t.Fatalf("blocking submit when pool reach max blocking submit should return ErrPoolOverload: %v", err)
}
// interrupt one func to make blocking submit successful.
close(ch)
wg.Wait()
select {
case <-errCh:
t.Fatalf("blocking submit when pool is full should not return error")
default:
}
}
func TestRestCodeCoverage(t *testing.T) {
_, err := ants.NewUltimatePool(-1, -1, false)
t.Log(err)

pool.go

@@ -60,6 +60,19 @@ type Pool struct {
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{})
// Max number of goroutines blocked on pool.Submit.
// 0 (default value) means no such limit.
MaxBlockingTasks int32
// Number of goroutines already blocked on pool.Submit,
// protected by pool.lock.
blockingNum int32
// When Nonblocking is true, Pool.Submit will never block.
// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
// When Nonblocking is true, MaxBlockingTasks is inoperative.
Nonblocking bool
}
// Clear expired workers periodically.
@@ -151,7 +164,11 @@ func (p *Pool) Submit(task func()) error {
if atomic.LoadInt32(&p.release) == CLOSED {
return ErrPoolClosed
}
if w := p.retrieveWorker(); w == nil {
return ErrPoolOverload
} else {
w.task <- task
}
return nil
}
@@ -237,8 +254,18 @@ func (p *Pool) retrieveWorker() *Worker {
p.lock.Unlock()
spawnWorker()
} else {
if p.Nonblocking {
p.lock.Unlock()
return nil
}
Reentry:
if p.MaxBlockingTasks != 0 && p.blockingNum >= p.MaxBlockingTasks {
p.lock.Unlock()
return nil
}
p.blockingNum++
p.cond.Wait()
p.blockingNum--
if p.Running() == 0 {
p.lock.Unlock()
spawnWorker()

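With these changes retrieveWorker has three outcomes when no worker is idle and the pool is at capacity: return nil at once if Nonblocking is set, return nil once blockingNum has reached MaxBlockingTasks, and otherwise wait on p.cond until a worker is recycled. A hedged sketch of the blocking-limit mode in use follows; the pool size, the limit of 8, and the workload are assumptions chosen for illustration.

package main

import (
	"log"
	"sync"
	"sync/atomic"
	"time"

	"github.com/panjf2000/ants" // assumed import path
)

func main() {
	p, err := ants.NewPool(4)
	if err != nil {
		log.Fatal(err)
	}
	defer p.Release()

	// At most 8 goroutines may sit blocked inside Submit; once blockingNum
	// reaches this limit, further submitters get ErrPoolOverload instead.
	p.MaxBlockingTasks = 8

	slowJob := func() { time.Sleep(100 * time.Millisecond) }

	var wg sync.WaitGroup
	var rejected int64
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := p.Submit(slowJob); err == ants.ErrPoolOverload {
				atomic.AddInt64(&rejected, 1)
			}
		}()
	}
	wg.Wait()
	log.Printf("rejected %d of 100 submissions", atomic.LoadInt64(&rejected))
}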

@@ -63,6 +63,19 @@ type PoolWithFunc struct {
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{})
// Max number of goroutines blocked on pool.Invoke.
// 0 (default value) means no such limit.
MaxBlockingTasks int32
// Number of goroutines already blocked on pool.Invoke,
// protected by pool.lock.
blockingNum int32
// When Nonblocking is true, PoolWithFunc.Invoke will never block.
// ErrPoolOverload will be returned when PoolWithFunc.Invoke cannot be done at once.
// When Nonblocking is true, MaxBlockingTasks is inoperative.
Nonblocking bool
}
// Clear expired workers periodically.
@@ -156,7 +169,11 @@ func (p *PoolWithFunc) Invoke(args interface{}) error {
if atomic.LoadInt32(&p.release) == CLOSED {
return ErrPoolClosed
}
if w := p.retrieveWorker(); w == nil {
return ErrPoolOverload
} else {
w.args <- args
}
return nil
}
@@ -242,8 +259,18 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc {
p.lock.Unlock()
spawnWorker()
} else {
if p.Nonblocking {
p.lock.Unlock()
return nil
}
Reentry:
if p.MaxBlockingTasks != 0 && p.blockingNum >= p.MaxBlockingTasks {
p.lock.Unlock()
return nil
}
p.blockingNum++
p.cond.Wait()
p.blockingNum--
if p.Running() == 0 {
p.lock.Unlock()
spawnWorker()
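
PoolWithFunc mirrors the same two options, with Invoke as the submission path. A brief sketch under the same assumptions (import path and handler are illustrative):

package main

import (
	"log"

	"github.com/panjf2000/ants" // assumed import path
)

func main() {
	handler := func(arg interface{}) {
		// process one unit of work, e.g. a request payload
		_ = arg
	}

	p, err := ants.NewPoolWithFunc(10, handler)
	if err != nil {
		log.Fatalf("create PoolWithFunc failed: %v", err)
	}
	defer p.Release()

	// Same switch as on Pool: never block in Invoke, surface saturation as an error.
	p.Nonblocking = true

	if err := p.Invoke("payload"); err == ants.ErrPoolOverload {
		log.Println("all workers busy, payload rejected")
	} else if err != nil {
		log.Printf("invoke failed: %v", err)
	}
}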