diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..398baf2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,17 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +.idea diff --git a/ants.go b/ants.go index 8187074..5dccc54 100644 --- a/ants.go +++ b/ants.go @@ -51,6 +51,9 @@ var ( // ErrPoolClosed will be returned when submitting task to a closed pool. ErrPoolClosed = errors.New("this pool has been closed") + + // ErrPoolOverload will be returned when the pool is full and no workers available. + ErrPoolOverload = errors.New("too many goroutines blocked on submit or Nonblocking is set") //--------------------------------------------------------------------------- // workerChanCap determines whether the channel of a worker should be a buffered channel diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 74e255b..fa9c423 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -23,6 +23,7 @@ package ants_test import ( + "runtime" "sync" "testing" "time" @@ -46,6 +47,22 @@ func demoPoolFunc(args interface{}) { time.Sleep(time.Duration(n) * time.Millisecond) } +func longRunningFunc() { + for { + runtime.Gosched() + } +} + +func longRunningPoolFunc(arg interface{}) { + if ch, ok := arg.(chan struct{}); ok { + <-ch + return + } + for { + runtime.Gosched() + } +} + func BenchmarkGoroutineWithFunc(b *testing.B) { var wg sync.WaitGroup for i := 0; i < b.N; i++ { diff --git a/ants_test.go b/ants_test.go index 91ade65..9394492 100644 --- a/ants_test.go +++ b/ants_test.go @@ -416,6 +416,155 @@ func TestPurgePreMalloc(t *testing.T) { } } +func TestNonblockingSubmit(t *testing.T) { + poolSize := 10 + p, err := ants.NewPool(poolSize) + if err != nil { + t.Fatalf("create TimingPool failed: %s", err.Error()) + 
} + p.Nonblocking = true + defer p.Release() + for i := 0; i < poolSize-1; i++ { + if err := p.Submit(longRunningFunc); err != nil { + t.Fatalf("nonblocking submit when pool is not full shouldn't return error") + } + } + ch := make(chan struct{}) + f := func() { + <-ch + } + // p is full now. + if err := p.Submit(f); err != nil { + t.Fatalf("nonblocking submit when pool is not full shouldn't return error") + } + if err := p.Submit(demoFunc); err == nil || err != ants.ErrPoolOverload { + t.Fatalf("nonblocking submit when pool is full should get an ErrPoolOverload") + } + // interrupt f to get an available worker + close(ch) + time.Sleep(1 * time.Second) + if err := p.Submit(demoFunc); err != nil { + t.Fatalf("nonblocking submit when pool is not full shouldn't return error") + } +} + +func TestMaxBlockingSubmit(t *testing.T) { + poolSize := 10 + p, err := ants.NewPool(poolSize) + if err != nil { + t.Fatalf("create TimingPool failed: %s", err.Error()) + } + p.MaxBlockingTasks = 1 + defer p.Release() + for i := 0; i < poolSize-1; i++ { + if err := p.Submit(longRunningFunc); err != nil { + t.Fatalf("submit when pool is not full shouldn't return error") + } + } + ch := make(chan struct{}) + f := func() { + <-ch + } + // p is full now. + if err := p.Submit(f); err != nil { + t.Fatalf("submit when pool is not full shouldn't return error") + } + var wg sync.WaitGroup + wg.Add(1) + errCh := make(chan error, 1) + go func() { + // should be blocked. blocking num == 1 + if err := p.Submit(demoFunc); err != nil { + errCh <- err + } + wg.Done() + }() + time.Sleep(1 * time.Second) + // already reached max blocking limit + if err := p.Submit(demoFunc); err != ants.ErrPoolOverload { + t.Fatalf("blocking submit when pool reach max blocking submit should return ErrPoolOverload") + } + // interrupt f to make blocking submit successful. 
+ close(ch) + wg.Wait() + select { + case <-errCh: + t.Fatalf("blocking submit when pool is full should not return error") + default: + } +} + +func TestNonblockingSubmitWithFunc(t *testing.T) { + poolSize := 10 + p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc) + if err != nil { + t.Fatalf("create TimingPool failed: %s", err.Error()) + } + p.Nonblocking = true + defer p.Release() + for i := 0; i < poolSize-1; i++ { + if err := p.Invoke(nil); err != nil { + t.Fatalf("nonblocking submit when pool is not full shouldn't return error") + } + } + ch := make(chan struct{}) + // p is full now. + if err := p.Invoke(ch); err != nil { + t.Fatalf("nonblocking submit when pool is not full shouldn't return error") + } + if err := p.Invoke(nil); err == nil || err != ants.ErrPoolOverload { + t.Fatalf("nonblocking submit when pool is full should get an ErrPoolOverload") + } + // interrupt f to get an available worker + close(ch) + time.Sleep(1 * time.Second) + if err := p.Invoke(nil); err != nil { + t.Fatalf("nonblocking submit when pool is not full shouldn't return error") + } +} + +func TestMaxBlockingSubmitWithFunc(t *testing.T) { + poolSize := 10 + p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc) + if err != nil { + t.Fatalf("create TimingPool failed: %s", err.Error()) + } + p.MaxBlockingTasks = 1 + defer p.Release() + for i := 0; i < poolSize-1; i++ { + if err := p.Invoke(Param); err != nil { + t.Fatalf("submit when pool is not full shouldn't return error") + } + } + ch := make(chan struct{}) + // p is full now. + if err := p.Invoke(ch); err != nil { + t.Fatalf("submit when pool is not full shouldn't return error") + } + var wg sync.WaitGroup + wg.Add(1) + errCh := make(chan error, 1) + go func() { + // should be blocked. 
blocking num == 1 + if err := p.Invoke(Param); err != nil { + errCh <- err + } + wg.Done() + }() + time.Sleep(1 * time.Second) + // already reached max blocking limit + if err := p.Invoke(Param); err != ants.ErrPoolOverload { + t.Fatalf("blocking submit when pool reach max blocking submit should return ErrPoolOverload: %v", err) + } + // interrupt one func to make blocking submit successful. + close(ch) + wg.Wait() + select { + case <-errCh: + t.Fatalf("blocking submit when pool is full should not return error") + default: + } +} func TestRestCodeCoverage(t *testing.T) { _, err := ants.NewUltimatePool(-1, -1, false) t.Log(err) diff --git a/pool.go b/pool.go index ef8e977..d2b817d 100644 --- a/pool.go +++ b/pool.go @@ -60,6 +60,19 @@ type Pool struct { // PanicHandler is used to handle panics from each worker goroutine. // if nil, panics will be thrown out again from worker goroutines. PanicHandler func(interface{}) + + // Max number of goroutines blocking on pool.Submit. + // 0 (default value) means no such limit. + MaxBlockingTasks int32 + + // goroutines already blocked on pool.Submit + // protected by pool.lock + blockingNum int32 + + // When Nonblocking is true, Pool.Submit will never be blocked. + // ErrPoolOverload will be returned when Pool.Submit cannot be done at once. + // When Nonblocking is true, MaxBlockingTasks is inoperative. + Nonblocking bool } // Clear expired workers periodically. 
@@ -151,7 +164,11 @@ func (p *Pool) Submit(task func()) error { if atomic.LoadInt32(&p.release) == CLOSED { return ErrPoolClosed } - p.retrieveWorker().task <- task + if w := p.retrieveWorker(); w == nil { + return ErrPoolOverload + } else { + w.task <- task + } return nil } @@ -237,8 +254,18 @@ func (p *Pool) retrieveWorker() *Worker { p.lock.Unlock() spawnWorker() } else { + if p.Nonblocking { + p.lock.Unlock() + return nil + } Reentry: + if p.MaxBlockingTasks != 0 && p.blockingNum >= p.MaxBlockingTasks { + p.lock.Unlock() + return nil + } + p.blockingNum++ p.cond.Wait() + p.blockingNum-- if p.Running() == 0 { p.lock.Unlock() spawnWorker() diff --git a/pool_func.go b/pool_func.go index 3f7e202..e8a8f00 100644 --- a/pool_func.go +++ b/pool_func.go @@ -63,6 +63,19 @@ type PoolWithFunc struct { // PanicHandler is used to handle panics from each worker goroutine. // if nil, panics will be thrown out again from worker goroutines. PanicHandler func(interface{}) + + // Max number of goroutines blocking on pool.Invoke. + // 0 (default value) means no such limit. + MaxBlockingTasks int32 + + // goroutines already blocked on pool.Invoke + // protected by pool.lock + blockingNum int32 + + // When Nonblocking is true, PoolWithFunc.Invoke will never be blocked. + // ErrPoolOverload will be returned when PoolWithFunc.Invoke cannot be done at once. + // When Nonblocking is true, MaxBlockingTasks is inoperative. + Nonblocking bool } // Clear expired workers periodically. 
@@ -156,7 +169,11 @@ func (p *PoolWithFunc) Invoke(args interface{}) error { if atomic.LoadInt32(&p.release) == CLOSED { return ErrPoolClosed } - p.retrieveWorker().args <- args + if w := p.retrieveWorker(); w == nil { + return ErrPoolOverload + } else { + w.args <- args + } return nil } @@ -242,8 +259,18 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc { p.lock.Unlock() spawnWorker() } else { + if p.Nonblocking { + p.lock.Unlock() + return nil + } Reentry: + if p.MaxBlockingTasks != 0 && p.blockingNum >= p.MaxBlockingTasks { + p.lock.Unlock() + return nil + } + p.blockingNum++ p.cond.Wait() + p.blockingNum-- if p.Running() == 0 { p.lock.Unlock() spawnWorker()