mirror of https://github.com/panjf2000/ants.git
parent
dbcb6a104f
commit
a2ad870d2d
32
ants_test.go
32
ants_test.go
|
@ -548,12 +548,42 @@ func TestInfinitePool(t *testing.T) {
|
|||
if n := p.Running(); n != 2 {
|
||||
t.Errorf("expect 2 workers running, but got %d", n)
|
||||
}
|
||||
if n := p.Free(); n != -1 {
|
||||
t.Errorf("expect -1 of free workers by unlimited pool, but got %d", n)
|
||||
}
|
||||
p.Tune(10)
|
||||
if capacity := p.Cap(); capacity != -1 {
|
||||
t.Fatalf("expect capacity: -1 but got %d", capacity)
|
||||
}
|
||||
var err error
|
||||
p, err = NewPool(-1, WithPreAlloc(true))
|
||||
_, err = NewPool(-1, WithPreAlloc(true))
|
||||
if err != ErrInvalidPreAllocSize {
|
||||
t.Errorf("expect ErrInvalidPreAllocSize but got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInfinitePoolWithFunc(t *testing.T) {
|
||||
c := make(chan struct{})
|
||||
p, _ := NewPoolWithFunc(-1, func(i interface{}) {
|
||||
demoPoolFunc(i)
|
||||
<-c
|
||||
})
|
||||
_ = p.Invoke(10)
|
||||
_ = p.Invoke(10)
|
||||
c <- struct{}{}
|
||||
c <- struct{}{}
|
||||
if n := p.Running(); n != 2 {
|
||||
t.Errorf("expect 2 workers running, but got %d", n)
|
||||
}
|
||||
if n := p.Free(); n != -1 {
|
||||
t.Errorf("expect -1 of free workers by unlimited pool, but got %d", n)
|
||||
}
|
||||
p.Tune(10)
|
||||
if capacity := p.Cap(); capacity != -1 {
|
||||
t.Fatalf("expect capacity: -1 but got %d", capacity)
|
||||
}
|
||||
var err error
|
||||
_, err = NewPoolWithFunc(-1, demoPoolFunc, WithPreAlloc(true))
|
||||
if err != ErrInvalidPreAllocSize {
|
||||
t.Errorf("expect ErrInvalidPreAllocSize but got %v", err)
|
||||
}
|
||||
|
|
13
pool.go
13
pool.go
|
@ -159,9 +159,13 @@ func (p *Pool) Running() int {
|
|||
return int(atomic.LoadInt32(&p.running))
|
||||
}
|
||||
|
||||
// Free returns the available goroutines to work.
|
||||
// Free returns the available goroutines to work, -1 indicates this pool is unlimited.
|
||||
func (p *Pool) Free() int {
|
||||
return p.Cap() - p.Running()
|
||||
c := p.Cap()
|
||||
if c < 0 {
|
||||
return -1
|
||||
}
|
||||
return c - p.Running()
|
||||
}
|
||||
|
||||
// Cap returns the capacity of this pool.
|
||||
|
@ -224,10 +228,7 @@ func (p *Pool) retrieveWorker() (w *goWorker) {
|
|||
w = p.workers.detach()
|
||||
if w != nil {
|
||||
p.lock.Unlock()
|
||||
} else if capacity := p.Cap(); capacity == -1 {
|
||||
p.lock.Unlock()
|
||||
spawnWorker()
|
||||
} else if p.Running() < capacity {
|
||||
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
|
||||
p.lock.Unlock()
|
||||
spawnWorker()
|
||||
} else {
|
||||
|
|
22
pool_func.go
22
pool_func.go
|
@ -111,7 +111,7 @@ func (p *PoolWithFunc) purgePeriodically() {
|
|||
// NewPoolWithFunc generates an instance of ants pool with a specific function.
|
||||
func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
|
||||
if size <= 0 {
|
||||
return nil, ErrInvalidPoolSize
|
||||
size = -1
|
||||
}
|
||||
|
||||
if pf == nil {
|
||||
|
@ -143,6 +143,9 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
|
|||
}
|
||||
}
|
||||
if p.options.PreAlloc {
|
||||
if size == -1 {
|
||||
return nil, ErrInvalidPreAllocSize
|
||||
}
|
||||
p.workers = make([]*goWorkerWithFunc, 0, size)
|
||||
}
|
||||
p.cond = sync.NewCond(p.lock)
|
||||
|
@ -173,9 +176,13 @@ func (p *PoolWithFunc) Running() int {
|
|||
return int(atomic.LoadInt32(&p.running))
|
||||
}
|
||||
|
||||
// Free returns a available goroutines to work.
|
||||
// Free returns a available goroutines to work, -1 indicates this pool is unlimited.
|
||||
func (p *PoolWithFunc) Free() int {
|
||||
return p.Cap() - p.Running()
|
||||
c := p.Cap()
|
||||
if c < 0 {
|
||||
return -1
|
||||
}
|
||||
return c - p.Running()
|
||||
}
|
||||
|
||||
// Cap returns the capacity of this pool.
|
||||
|
@ -185,7 +192,7 @@ func (p *PoolWithFunc) Cap() int {
|
|||
|
||||
// Tune changes the capacity of this pool.
|
||||
func (p *PoolWithFunc) Tune(size int) {
|
||||
if size <= 0 || size == p.Cap() || p.options.PreAlloc {
|
||||
if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
|
||||
return
|
||||
}
|
||||
atomic.StoreInt32(&p.capacity, int32(size))
|
||||
|
@ -245,7 +252,7 @@ func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) {
|
|||
idleWorkers[n] = nil
|
||||
p.workers = idleWorkers[:n]
|
||||
p.lock.Unlock()
|
||||
} else if p.Running() < p.Cap() {
|
||||
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
|
||||
p.lock.Unlock()
|
||||
spawnWorker()
|
||||
} else {
|
||||
|
@ -261,7 +268,8 @@ func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) {
|
|||
p.blockingNum++
|
||||
p.cond.Wait()
|
||||
p.blockingNum--
|
||||
if p.Running() == 0 {
|
||||
var nw int
|
||||
if nw = p.Running(); nw == 0 {
|
||||
p.lock.Unlock()
|
||||
if !p.IsClosed() {
|
||||
spawnWorker()
|
||||
|
@ -270,7 +278,7 @@ func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) {
|
|||
}
|
||||
l := len(p.workers) - 1
|
||||
if l < 0 {
|
||||
if p.Running() < p.Cap() {
|
||||
if nw < capacity {
|
||||
p.lock.Unlock()
|
||||
spawnWorker()
|
||||
return
|
||||
|
|
Loading…
Reference in New Issue