From fbd17036dbf5ae677ba9e41326745a65e655232f Mon Sep 17 00:00:00 2001 From: codingfanlt Date: Mon, 14 Feb 2022 21:51:40 +0800 Subject: [PATCH] Awake the blocking callers when Tune(size int) is invoked to expand the pool (#210) Fixes #205 --- ants_test.go | 77 ++++++++++++++++++++++++++++++++++++++++++++++++++++ pool.go | 10 ++++++- pool_func.go | 10 ++++++- 3 files changed, 95 insertions(+), 2 deletions(-) diff --git a/ants_test.go b/ants_test.go index 622e482..7e34578 100644 --- a/ants_test.go +++ b/ants_test.go @@ -745,3 +745,80 @@ func TestRestCodeCoverage(t *testing.T) { t.Logf("pre-malloc pool with func, after tuning capacity, capacity:%d, running:%d", ppremWithFunc.Cap(), ppremWithFunc.Running()) } + +func TestPoolTuneScaleUp(t *testing.T) { + c := make(chan struct{}) + p, _ := NewPool(2) + for i := 0; i < 2; i++ { + _ = p.Submit(func() { + <-c + }) + } + if n := p.Running(); n != 2 { + t.Errorf("expect 2 workers running, but got %d", n) + } + // test pool tune scale up one + p.Tune(3) + _ = p.Submit(func() { + <-c + }) + if n := p.Running(); n != 3 { + t.Errorf("expect 3 workers running, but got %d", n) + } + // test pool tune scale up multiple + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = p.Submit(func() { + <-c + }) + }() + } + p.Tune(8) + wg.Wait() + if n := p.Running(); n != 8 { + t.Errorf("expect 8 workers running, but got %d", n) + } + for i := 0; i < 8; i++ { + c <- struct{}{} + } + p.Release() + + // test PoolWithFunc + pf, _ := NewPoolWithFunc(2, func(i interface{}) { + <-c + }) + for i := 0; i < 2; i++ { + _ = pf.Invoke(1) + } + if n := pf.Running(); n != 2 { + t.Errorf("expect 2 workers running, but got %d", n) + } + // test pool tune scale up one + pf.Tune(3) + _ = pf.Invoke(1) + if n := pf.Running(); n != 3 { + t.Errorf("expect 3 workers running, but got %d", n) + } + // test pool tune scale up multiple + var pfwg sync.WaitGroup + for i := 0; i < 5; i++ { + pfwg.Add(1) + go func() { + defer pfwg.Done() + _ = pf.Invoke(1) + }() + } + pf.Tune(8) + pfwg.Wait() + if n := pf.Running(); n != 8 { + t.Errorf("expect 8 workers running, but got %d", n) + } + for i := 0; i < 8; i++ { + c <- struct{}{} + } + close(c) + pf.Release() +} diff --git a/pool.go b/pool.go index 4676fd5..d3056a3 100644 --- a/pool.go +++ b/pool.go @@ -180,10 +180,18 @@ func (p *Pool) Cap() int { // Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool. func (p *Pool) Tune(size int) { - if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { + capacity := p.Cap() + if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { return } atomic.StoreInt32(&p.capacity, int32(size)) + if size > capacity { + if size-capacity == 1 { + p.cond.Signal() + return + } + p.cond.Broadcast() + } } // IsClosed indicates whether the pool is closed. diff --git a/pool_func.go b/pool_func.go index 18cd320..1a03b16 100644 --- a/pool_func.go +++ b/pool_func.go @@ -197,10 +197,18 @@ func (p *PoolWithFunc) Cap() int { // Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool. func (p *PoolWithFunc) Tune(size int) { - if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { + capacity := p.Cap() + if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { return } atomic.StoreInt32(&p.capacity, int32(size)) + if size > capacity { + if size-capacity == 1 { + p.cond.Signal() + return + } + p.cond.Broadcast() + } } // IsClosed indicates whether the pool is closed.