Merge pull request #4 from barryz/pre_allocate

optimization for pool
This commit is contained in:
Andy Pan 2018-07-26 10:32:40 +08:00 committed by GitHub
commit 3ddd58c390
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 27 additions and 15 deletions

View File

@ -26,9 +26,9 @@ import (
"runtime" "runtime"
"sync" "sync"
"testing" "testing"
"time"
"github.com/panjf2000/ants" "github.com/panjf2000/ants"
"time"
) )
var n = 100000 var n = 100000

17
pool.go
View File

@ -129,6 +129,16 @@ func (p *Pool) Running() int {
return int(atomic.LoadInt32(&p.running)) return int(atomic.LoadInt32(&p.running))
} }
// IncrRunning increases the number of the currently running goroutines
func (p *Pool) IncrRunning() {
atomic.AddInt32(&p.running, 1)
}
// DecrRunning decreases the number of the currently running goroutines
func (p *Pool) DecrRunning() {
atomic.AddInt32(&p.running, -1)
}
// Free returns the available goroutines to work // Free returns the available goroutines to work
func (p *Pool) Free() int { func (p *Pool) Free() int {
return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running))
@ -181,11 +191,7 @@ func (p *Pool) getWorker() *Worker {
idleWorkers := p.workers idleWorkers := p.workers
n := len(idleWorkers) - 1 n := len(idleWorkers) - 1
if n < 0 { if n < 0 {
if p.Running() >= p.Cap() { waiting = p.Running() >= p.Cap()
waiting = true
} else {
atomic.AddInt32(&p.running, 1)
}
} else { } else {
<-p.freeSignal <-p.freeSignal
w = idleWorkers[n] w = idleWorkers[n]
@ -209,6 +215,7 @@ func (p *Pool) getWorker() *Worker {
task: make(chan f, 1), task: make(chan f, 1),
} }
w.run() w.run()
p.IncrRunning()
} }
return w return w
} }

View File

@ -134,6 +134,16 @@ func (p *PoolWithFunc) Running() int {
return int(atomic.LoadInt32(&p.running)) return int(atomic.LoadInt32(&p.running))
} }
// IncrRunning increases the number of the currently running goroutines
func (p *PoolWithFunc) IncrRunning() {
atomic.AddInt32(&p.running, 1)
}
// DecrRunning decreases the number of the currently running goroutines
func (p *PoolWithFunc) DecrRunning() {
atomic.AddInt32(&p.running, -1)
}
// Free returns the available goroutines to work // Free returns the available goroutines to work
func (p *PoolWithFunc) Free() int { func (p *PoolWithFunc) Free() int {
return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running))
@ -186,11 +196,7 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
idleWorkers := p.workers idleWorkers := p.workers
n := len(idleWorkers) - 1 n := len(idleWorkers) - 1
if n < 0 { if n < 0 {
if p.Running() >= p.Cap() { waiting = p.Running() >= p.Cap()
waiting = true
} else {
atomic.AddInt32(&p.running, 1)
}
} else { } else {
<-p.freeSignal <-p.freeSignal
w = idleWorkers[n] w = idleWorkers[n]
@ -214,6 +220,7 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
args: make(chan interface{}, 1), args: make(chan interface{}, 1),
} }
w.run() w.run()
p.IncrRunning()
} }
return w return w
} }

View File

@ -23,7 +23,6 @@
package ants package ants
import ( import (
"sync/atomic"
"time" "time"
) )
@ -47,7 +46,7 @@ func (w *Worker) run() {
go func() { go func() {
for f := range w.task { for f := range w.task {
if f == nil { if f == nil {
atomic.AddInt32(&w.pool.running, -1) w.pool.DecrRunning()
return return
} }
f() f()

View File

@ -23,7 +23,6 @@
package ants package ants
import ( import (
"sync/atomic"
"time" "time"
) )
@ -47,7 +46,7 @@ func (w *WorkerWithFunc) run() {
go func() { go func() {
for args := range w.args { for args := range w.args {
if args == nil { if args == nil {
atomic.AddInt32(&w.pool.running, -1) w.pool.DecrRunning()
return return
} }
w.pool.poolFunc(args) w.pool.poolFunc(args)