Refactor the synchronous lock handling

This commit is contained in:
Andy Pan 2019-01-27 04:05:58 +08:00
parent dcf13e8f75
commit 689d74c63b
3 changed files with 39 additions and 45 deletions

View File

@ -75,28 +75,25 @@ func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
}
// TestAntsPoolGetWorkerFromCache is used to test getting worker from sync.Pool.
func TestAntsPoolGetWorkerFromCache(t *testing.T) {
var wg sync.WaitGroup
p, _ := ants.NewPool(AntsSize)
defer p.Release()
for i := 0; i < n; i++ {
if i == n/2 {
time.Sleep(ants.DefaultCleanIntervalTime * time.Second)
}
wg.Add(1)
p.Submit(func() {
demoFunc()
wg.Done()
})
}
wg.Wait()
t.Logf("pool, running workers number:%d", p.Running())
mem := runtime.MemStats{}
runtime.ReadMemStats(&mem)
curMem = mem.TotalAlloc/MiB - curMem
t.Logf("memory usage:%d MB", curMem)
}
//func TestAntsPoolGetWorkerFromCache(t *testing.T) {
// var wg sync.WaitGroup
// p, _ := ants.NewPool(AntsSize)
// defer p.Release()
//
// for i := 0; i < n; i++ {
// wg.Add(1)
// p.Submit(func() {
// demoPoolFunc(Param)
// wg.Done()
// })
// }
// wg.Wait()
// t.Logf("pool, running workers number:%d", p.Running())
// mem := runtime.MemStats{}
// runtime.ReadMemStats(&mem)
// curMem = mem.TotalAlloc/MiB - curMem
// t.Logf("memory usage:%d MB", curMem)
//}
//-------------------------------------------------------------------------------------------
// Contrast between goroutines without a pool and goroutines with ants pool.
@ -245,17 +242,19 @@ func TestRestCodeCoverage(t *testing.T) {
_, err = ants.NewTimingPoolWithFunc(1, -1, demoPoolFunc)
t.Log(err)
p0, _ := ants.NewPool(AntsSize)
p0, _ := ants.NewPool(TestSize)
defer p0.Submit(demoFunc)
defer p0.Release()
for i := 0; i < n; i++ {
p0.Submit(demoFunc)
p0.Submit(func() {
demoPoolFunc(Param)
})
}
t.Logf("pool, capacity:%d", p0.Cap())
t.Logf("pool, running workers number:%d", p0.Running())
t.Logf("pool, free workers number:%d", p0.Free())
p0.Tune(TestSize)
p0.Tune(AntsSize)
p0.Tune(AntsSize / 2)
t.Logf("pool, after tuning capacity, capacity:%d, running:%d", p0.Cap(), p0.Running())
p, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc)
@ -269,6 +268,6 @@ func TestRestCodeCoverage(t *testing.T) {
t.Logf("pool with func, running workers number:%d", p.Running())
t.Logf("pool with func, free workers number:%d", p.Free())
p.Tune(TestSize)
p.Tune(TestSize / 2)
p.Tune(AntsSize)
t.Logf("pool with func, after tuning capacity, capacity:%d, running:%d", p.Cap(), p.Running())
}

16
pool.go
View File

@ -192,22 +192,15 @@ func (p *Pool) retrieveWorker() *Worker {
var waiting bool
p.lock.Lock()
defer p.lock.Unlock()
idleWorkers := p.workers
n := len(idleWorkers) - 1
if n < 0 {
if p.Running() >= p.Cap() {
waiting = true
} else {
if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
return cacheWorker.(*Worker)
}
}
waiting = p.Running() >= p.Cap()
} else {
w = idleWorkers[n]
idleWorkers[n] = nil
p.workers = idleWorkers[:n]
p.lock.Unlock()
}
if waiting {
@ -222,7 +215,12 @@ func (p *Pool) retrieveWorker() *Worker {
p.workers = p.workers[:l]
break
}
p.lock.Unlock()
} else if w == nil {
p.lock.Unlock()
if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
return cacheWorker.(*Worker)
}
w = &Worker{
pool: p,
task: make(chan f, workerChanCap),

View File

@ -193,26 +193,18 @@ func (p *PoolWithFunc) decRunning() {
// retrieveWorker returns an available worker to run the tasks.
func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc {
var w *WorkerWithFunc
waiting := false
var waiting bool
p.lock.Lock()
defer p.lock.Unlock()
idleWorkers := p.workers
n := len(idleWorkers) - 1
if n < 0 {
waiting = p.Running() >= p.Cap()
if p.Running() >= p.Cap() {
waiting = true
} else {
if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
return cacheWorker.(*WorkerWithFunc)
}
}
} else {
w = idleWorkers[n]
idleWorkers[n] = nil
p.workers = idleWorkers[:n]
p.lock.Unlock()
}
if waiting {
@ -227,7 +219,12 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc {
p.workers = p.workers[:l]
break
}
p.lock.Unlock()
} else if w == nil {
p.lock.Unlock()
if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
return cacheWorker.(*WorkerWithFunc)
}
w = &WorkerWithFunc{
pool: p,
args: make(chan interface{}, workerChanCap),