Merge pull request #13 from liyonglion/master

解决死循环导致cpu占用率过高
This commit is contained in:
Andy Pan 2018-09-29 19:29:53 +08:00 committed by GitHub
commit 9a3b5cd253
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 31 additions and 30 deletions

View File

@ -27,7 +27,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/panjf2000/ants" "github.com/liyonglion/ants"
) )
const ( const (
@ -42,9 +42,9 @@ const (
YiB // 1208925819614629174706176 YiB // 1208925819614629174706176
) )
const ( const (
RunTimes = 10000000 RunTimes = 1000000
Param = 100 Param = 100
AntsSize = 1000 AntsSize = 50000
TestSize = 10000 TestSize = 10000
) )
@ -68,6 +68,7 @@ func demoPoolFunc(args interface{}) error {
func BenchmarkGoroutineWithFunc(b *testing.B) { func BenchmarkGoroutineWithFunc(b *testing.B) {
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ { for j := 0; j < RunTimes; j++ {
wg.Add(1) wg.Add(1)
@ -96,7 +97,7 @@ func BenchmarkAntsPoolWithFunc(b *testing.B) {
p.Serve(Param) p.Serve(Param)
} }
wg.Wait() wg.Wait()
b.Logf("running goroutines: %d", p.Running()) //b.Logf("running goroutines: %d", p.Running())
} }
} }
@ -111,7 +112,6 @@ func BenchmarkGoroutine(b *testing.B) {
func BenchmarkAntsPool(b *testing.B) { func BenchmarkAntsPool(b *testing.B) {
p, _ := ants.NewPoolWithFunc(AntsSize, demoPoolFunc) p, _ := ants.NewPoolWithFunc(AntsSize, demoPoolFunc)
defer p.Release() defer p.Release()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ { for j := 0; j < RunTimes; j++ {

View File

@ -28,7 +28,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/panjf2000/ants" "github.com/liyonglion/ants"
) )
var n = 100000 var n = 100000

24
pool.go
View File

@ -52,7 +52,7 @@ type Pool struct {
// lock for synchronous operation. // lock for synchronous operation.
lock sync.Mutex lock sync.Mutex
cond *sync.Cond
once sync.Once once sync.Once
} }
@ -105,6 +105,7 @@ func NewTimingPool(size, expiry int) (*Pool, error) {
release: make(chan sig, 1), release: make(chan sig, 1),
expiryDuration: time.Duration(expiry) * time.Second, expiryDuration: time.Duration(expiry) * time.Second,
} }
p.cond = sync.NewCond(&p.lock)
go p.periodicallyPurge() go p.periodicallyPurge()
return p, nil return p, nil
} }
@ -183,6 +184,7 @@ func (p *Pool) getWorker() *Worker {
waiting := false waiting := false
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock()
idleWorkers := p.workers idleWorkers := p.workers
n := len(idleWorkers) - 1 n := len(idleWorkers) - 1
if n < 0 { if n < 0 {
@ -192,21 +194,17 @@ func (p *Pool) getWorker() *Worker {
idleWorkers[n] = nil idleWorkers[n] = nil
p.workers = idleWorkers[:n] p.workers = idleWorkers[:n]
} }
p.lock.Unlock()
if waiting { if waiting {
for { for{
p.lock.Lock() p.cond.Wait()
idleWorkers = p.workers l := len(p.workers) - 1
l := len(idleWorkers) - 1 if l < 0{
if l < 0 {
p.lock.Unlock()
continue continue
} }
w = idleWorkers[l] w = p.workers[l]
idleWorkers[l] = nil p.workers[l] = nil
p.workers = idleWorkers[:l] p.workers = p.workers[:l]
p.lock.Unlock()
break break
} }
} else if w == nil { } else if w == nil {
@ -225,5 +223,7 @@ func (p *Pool) putWorker(worker *Worker) {
worker.recycleTime = time.Now() worker.recycleTime = time.Now()
p.lock.Lock() p.lock.Lock()
p.workers = append(p.workers, worker) p.workers = append(p.workers, worker)
//通知有一个空闲的worker
p.cond.Signal()
p.lock.Unlock() p.lock.Unlock()
} }

View File

@ -50,7 +50,7 @@ type PoolWithFunc struct {
// lock for synchronous operation. // lock for synchronous operation.
lock sync.Mutex lock sync.Mutex
cond *sync.Cond
// pf is the function for processing tasks. // pf is the function for processing tasks.
poolFunc pf poolFunc pf
@ -107,6 +107,7 @@ func NewTimingPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) {
expiryDuration: time.Duration(expiry) * time.Second, expiryDuration: time.Duration(expiry) * time.Second,
poolFunc: f, poolFunc: f,
} }
p.cond = sync.NewCond(&p.lock)
go p.periodicallyPurge() go p.periodicallyPurge()
return p, nil return p, nil
} }
@ -185,6 +186,7 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
waiting := false waiting := false
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock()
idleWorkers := p.workers idleWorkers := p.workers
n := len(idleWorkers) - 1 n := len(idleWorkers) - 1
if n < 0 { if n < 0 {
@ -194,23 +196,20 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc {
idleWorkers[n] = nil idleWorkers[n] = nil
p.workers = idleWorkers[:n] p.workers = idleWorkers[:n]
} }
p.lock.Unlock()
if waiting { if waiting {
for { for{
p.lock.Lock() p.cond.Wait()
idleWorkers = p.workers l := len(p.workers) - 1
l := len(idleWorkers) - 1 if l < 0{
if l < 0 {
p.lock.Unlock()
continue continue
} }
w = idleWorkers[l] w = p.workers[l]
idleWorkers[l] = nil p.workers[l] = nil
p.workers = idleWorkers[:l] p.workers = p.workers[:l]
p.lock.Unlock()
break break
} }
} else if w == nil { } else if w == nil {
w = &WorkerWithFunc{ w = &WorkerWithFunc{
pool: p, pool: p,
@ -227,5 +226,7 @@ func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) {
worker.recycleTime = time.Now() worker.recycleTime = time.Now()
p.lock.Lock() p.lock.Lock()
p.workers = append(p.workers, worker) p.workers = append(p.workers, worker)
//通知有一个空闲的worker
p.cond.Signal()
p.lock.Unlock() p.lock.Unlock()
} }