From 70731aff71f7750e34ccc0dcf878fba95614ddd4 Mon Sep 17 00:00:00 2001 From: liyonglion <470542519@qq.com> Date: Fri, 28 Sep 2018 19:28:09 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E6=AD=BB=E5=BE=AA?= =?UTF-8?q?=E7=8E=AF=E5=AF=BC=E8=87=B4cpu=E5=8D=A0=E7=94=A8=E7=8E=87?= =?UTF-8?q?=E8=BF=87=E9=AB=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pool.go | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/pool.go b/pool.go index 987ba4d..5f6ce63 100644 --- a/pool.go +++ b/pool.go @@ -52,7 +52,7 @@ type Pool struct { // lock for synchronous operation. lock sync.Mutex - + cond *sync.Cond once sync.Once } @@ -105,6 +105,7 @@ func NewTimingPool(size, expiry int) (*Pool, error) { release: make(chan sig, 1), expiryDuration: time.Duration(expiry) * time.Second, } + p.cond = sync.NewCond(&p.lock) go p.periodicallyPurge() return p, nil } @@ -195,20 +196,11 @@ func (p *Pool) getWorker() *Worker { p.lock.Unlock() if waiting { - for { - p.lock.Lock() - idleWorkers = p.workers - l := len(idleWorkers) - 1 - if l < 0 { - p.lock.Unlock() - continue - } - w = idleWorkers[l] - idleWorkers[l] = nil - p.workers = idleWorkers[:l] - p.lock.Unlock() - break - } + p.lock.Lock() + p.cond.Wait() + l := len(p.workers) - 1 + w = p.workers[l] + p.lock.Unlock() } else if w == nil { w = &Worker{ pool: p, @@ -226,4 +218,6 @@ func (p *Pool) putWorker(worker *Worker) { p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() + //通知有一个空闲的worker + p.cond.Signal() } From 8ecd5078ac8f3eacd8037d2bcd5addf87d45a4ae Mon Sep 17 00:00:00 2001 From: liyonglion <470542519@qq.com> Date: Sat, 29 Sep 2018 10:19:17 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E6=AD=BB=E9=94=81?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pool.go | 6 ++---- pool_func.go | 24 ++++++++---------------- 2 files changed, 10 insertions(+), 20 deletions(-) diff --git a/pool.go b/pool.go index 5f6ce63..f3d931c 100644 --- a/pool.go +++ b/pool.go @@ -184,6 +184,7 @@ func (p *Pool) getWorker() *Worker { waiting := false p.lock.Lock() + defer p.lock.Unlock() idleWorkers := p.workers n := len(idleWorkers) - 1 if n < 0 { @@ -193,14 +194,11 @@ func (p *Pool) getWorker() *Worker { idleWorkers[n] = nil p.workers = idleWorkers[:n] } - p.lock.Unlock() if waiting { - p.lock.Lock() p.cond.Wait() l := len(p.workers) - 1 w = p.workers[l] - p.lock.Unlock() } else if w == nil { w = &Worker{ pool: p, @@ -217,7 +215,7 @@ func (p *Pool) putWorker(worker *Worker) { worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) - p.lock.Unlock() //通知有一个空闲的worker p.cond.Signal() + p.lock.Unlock() } diff --git a/pool_func.go b/pool_func.go index 6051117..d1ed757 100644 --- a/pool_func.go +++ b/pool_func.go @@ -50,7 +50,7 @@ type PoolWithFunc struct { // lock for synchronous operation. lock sync.Mutex - + cond *sync.Cond // pf is the function for processing tasks. poolFunc pf @@ -107,6 +107,7 @@ func NewTimingPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) { expiryDuration: time.Duration(expiry) * time.Second, poolFunc: f, } + p.cond = sync.NewCond(&p.lock) go p.periodicallyPurge() return p, nil } @@ -185,6 +186,7 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { waiting := false p.lock.Lock() + defer p.lock.Unlock() idleWorkers := p.workers n := len(idleWorkers) - 1 if n < 0 { @@ -194,23 +196,11 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { idleWorkers[n] = nil p.workers = idleWorkers[:n] } - p.lock.Unlock() if waiting { - for { - p.lock.Lock() - idleWorkers = p.workers - l := len(idleWorkers) - 1 - if l < 0 { - p.lock.Unlock() - continue - } - w = idleWorkers[l] - idleWorkers[l] = nil - p.workers = idleWorkers[:l] - p.lock.Unlock() - break - } + p.cond.Wait() + l := len(p.workers) - 1 + w = p.workers[l] } else if w == nil { w = &WorkerWithFunc{ pool: p, @@ -227,5 +217,7 @@ func (p *PoolWithFunc) putWorker(worker *WorkerWithFunc) { worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) + //通知有一个空闲的worker + p.cond.Signal() p.lock.Unlock() } From 912aa7698715121d3e96e3badd934221d8ecf727 Mon Sep 17 00:00:00 2001 From: liyonglion <470542519@qq.com> Date: Sat, 29 Sep 2018 10:20:43 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E9=81=97=E6=BC=8F=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ants_benchmark_test.go | 2 +- ants_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 0eaa4a4..b88d8bf 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -27,7 +27,7 @@ import ( "testing" "time" - "github.com/panjf2000/ants" + "github.com/liyonglion/ants" ) const ( diff --git a/ants_test.go b/ants_test.go index aff9053..e1e2008 100644 --- a/ants_test.go +++ b/ants_test.go @@ -28,7 +28,7 @@ import ( "testing" "time" - "github.com/panjf2000/ants" + "github.com/liyonglion/ants" ) var n = 100000 From 1846b4392a3a20e6bf1a7431b67f86bd43e0f0b9 Mon Sep 17 00:00:00 2001 From: liyonglion <470542519@qq.com> Date: Sat, 29 Sep 2018 14:58:58 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E7=AB=9E=E4=BA=89?= =?UTF-8?q?=E9=94=81=E5=AF=BC=E8=87=B4bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ants_benchmark_test.go | 8 ++++---- pool.go | 14 +++++++++++--- pool_func.go | 15 ++++++++++++--- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index b88d8bf..f63d114 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -42,9 +42,9 @@ const ( YiB // 1208925819614629174706176 ) const ( - RunTimes = 10000000 + RunTimes = 1000000 Param = 100 - AntsSize = 1000 + AntsSize = 50000 TestSize = 10000 ) @@ -68,6 +68,7 @@ func demoPoolFunc(args interface{}) error { func BenchmarkGoroutineWithFunc(b *testing.B) { var wg sync.WaitGroup + for i := 0; i < b.N; i++ { for j := 0; j < RunTimes; j++ { wg.Add(1) @@ -96,7 +97,7 @@ func BenchmarkAntsPoolWithFunc(b *testing.B) { p.Serve(Param) } wg.Wait() - b.Logf("running goroutines: %d", p.Running()) + //b.Logf("running goroutines: %d", p.Running()) } } @@ -111,7 +112,6 @@ func BenchmarkGoroutine(b *testing.B) { func BenchmarkAntsPool(b *testing.B) { p, _ := ants.NewPoolWithFunc(AntsSize, demoPoolFunc) defer p.Release() - b.ResetTimer() for i := 0; i < b.N; i++ { for j := 0; j < RunTimes; j++ { diff --git a/pool.go b/pool.go index f3d931c..8fee19f 100644 --- a/pool.go +++ b/pool.go @@ -196,9 +196,17 @@ func (p *Pool) getWorker() *Worker { } if waiting { - p.cond.Wait() - l := len(p.workers) - 1 - w = p.workers[l] + for{ + p.cond.Wait() + l := len(p.workers) - 1 + if l < 0{ + continue + } + w = p.workers[l] + p.workers[l] = nil + p.workers = p.workers[:l] + break + } } else if w == nil { w = &Worker{ pool: p, diff --git a/pool_func.go b/pool_func.go index d1ed757..f0c75e5 100644 --- a/pool_func.go +++ b/pool_func.go @@ -198,9 +198,18 @@ func (p *PoolWithFunc) getWorker() *WorkerWithFunc { } if waiting { - p.cond.Wait() - l := len(p.workers) - 1 - w = p.workers[l] + for{ + p.cond.Wait() + l := len(p.workers) - 1 + if l < 0{ + continue + } + w = p.workers[l] + p.workers[l] = nil + p.workers = p.workers[:l] + break + } + } else if w == nil { w = &WorkerWithFunc{ pool: p,