Awake the blocking callers when Tune(size int) is invoked to expand the pool (#210)

Fixes #205
This commit is contained in:
codingfanlt 2022-02-14 21:51:40 +08:00 committed by GitHub
parent 0fa2fd6dc1
commit fbd17036db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 95 additions and 2 deletions

View File

@ -745,3 +745,80 @@ func TestRestCodeCoverage(t *testing.T) {
t.Logf("pre-malloc pool with func, after tuning capacity, capacity:%d, running:%d", ppremWithFunc.Cap(), t.Logf("pre-malloc pool with func, after tuning capacity, capacity:%d, running:%d", ppremWithFunc.Cap(),
ppremWithFunc.Running()) ppremWithFunc.Running())
} }
func TestPoolTuneScaleUp(t *testing.T) {
c := make(chan struct{})
p, _ := NewPool(2)
for i := 0; i < 2; i++ {
_ = p.Submit(func() {
<-c
})
}
if n := p.Running(); n != 2 {
t.Errorf("expect 2 workers running, but got %d", n)
}
// test pool tune scale up one
p.Tune(3)
_ = p.Submit(func() {
<-c
})
if n := p.Running(); n != 3 {
t.Errorf("expect 3 workers running, but got %d", n)
}
// test pool tune scale up multiple
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = p.Submit(func() {
<-c
})
}()
}
p.Tune(8)
wg.Wait()
if n := p.Running(); n != 8 {
t.Errorf("expect 8 workers running, but got %d", n)
}
for i := 0; i < 8; i++ {
c <- struct{}{}
}
p.Release()
// test PoolWithFunc
pf, _ := NewPoolWithFunc(2, func(i interface{}) {
<-c
})
for i := 0; i < 2; i++ {
_ = pf.Invoke(1)
}
if n := pf.Running(); n != 2 {
t.Errorf("expect 2 workers running, but got %d", n)
}
// test pool tune scale up one
pf.Tune(3)
_ = pf.Invoke(1)
if n := pf.Running(); n != 3 {
t.Errorf("expect 3 workers running, but got %d", n)
}
// test pool tune scale up multiple
var pfwg sync.WaitGroup
for i := 0; i < 5; i++ {
pfwg.Add(1)
go func() {
defer pfwg.Done()
_ = pf.Invoke(1)
}()
}
pf.Tune(8)
pfwg.Wait()
if n := pf.Running(); n != 8 {
t.Errorf("expect 8 workers running, but got %d", n)
}
for i := 0; i < 8; i++ {
c <- struct{}{}
}
close(c)
pf.Release()
}

10
pool.go
View File

@ -180,10 +180,18 @@ func (p *Pool) Cap() int {
// Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool. // Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool.
func (p *Pool) Tune(size int) { func (p *Pool) Tune(size int) {
if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { capacity := p.Cap()
if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
return return
} }
atomic.StoreInt32(&p.capacity, int32(size)) atomic.StoreInt32(&p.capacity, int32(size))
if size > capacity {
if size-capacity == 1 {
p.cond.Signal()
return
}
p.cond.Broadcast()
}
} }
// IsClosed indicates whether the pool is closed. // IsClosed indicates whether the pool is closed.

View File

@ -197,10 +197,18 @@ func (p *PoolWithFunc) Cap() int {
// Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool. // Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool.
func (p *PoolWithFunc) Tune(size int) { func (p *PoolWithFunc) Tune(size int) {
if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { capacity := p.Cap()
if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
return return
} }
atomic.StoreInt32(&p.capacity, int32(size)) atomic.StoreInt32(&p.capacity, int32(size))
if size > capacity {
if size-capacity == 1 {
p.cond.Signal()
return
}
p.cond.Broadcast()
}
} }
// IsClosed indicates whether the pool is closed. // IsClosed indicates whether the pool is closed.