optimization for clearing expired workers and gofmt codes

This commit is contained in:
Andy Pan 2018-07-06 20:39:50 +08:00
commit 267107d0fb
7 changed files with 38 additions and 29 deletions

View File

@ -37,7 +37,6 @@ glide get github.com/panjf2000/ants
If your program will generate a massive number of goroutines and you don't want them to consume a vast amount of memory, with ants, all you need to do is to import ants package and submit all your tasks to the default limited pool created when ants was imported: If your program will generate a massive number of goroutines and you don't want them to consume a vast amount of memory, with ants, all you need to do is to import ants package and submit all your tasks to the default limited pool created when ants was imported:
``` go ``` go
package main package main
import ( import (
@ -84,8 +83,8 @@ func main() {
fmt.Printf("finish all tasks.\n") fmt.Printf("finish all tasks.\n")
// use the pool with a function // use the pool with a function
// set 10 the size of goroutine pool // set 10 the size of goroutine pool and 1 second for expired duration
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) error { p, _ := ants.NewPoolWithFunc(10, 1, func(i interface{}) error {
myFunc(i) myFunc(i)
wg.Done() wg.Done()
return nil return nil
@ -132,6 +131,15 @@ Don't worry about the synchronous problems in this case, this method is thread-s
All the tasks submitted to ants pool will not be guaranteed to be processed in order, because those tasks distribute among a series of concurrent workers, thus those tasks are processed concurrently. All the tasks submitted to ants pool will not be guaranteed to be processed in order, because those tasks distribute among a series of concurrent workers, thus those tasks are processed concurrently.
## Benchmarks ## Benchmarks
```
OS : macOS High Sierra
Processor : 2.7 GHz Intel Core i5
Memory : 8 GB 1867 MHz DDR3
Go1.9
```
<div align="center"><img src="ants_benchmarks.png"/></div> <div align="center"><img src="ants_benchmarks.png"/></div>
In that benchmark-picture, the first and second benchmarks performed test with 100w tasks and the rest of benchmarks performed test with 1000w tasks, both unlimited goroutines and ants pool, and the capacity of this ants goroutine-pool was limited to 5w. In that benchmark-picture, the first and second benchmarks performed test with 100w tasks and the rest of benchmarks performed test with 1000w tasks, both unlimited goroutines and ants pool, and the capacity of this ants goroutine-pool was limited to 5w.

View File

@ -38,7 +38,6 @@ glide get github.com/panjf2000/ants
写 go 并发程序的时候如果程序会启动大量的 goroutine 势必会消耗大量的系统资源内存CPU通过使用 `ants`,可以实例化一个协程池,复用 goroutine ,节省资源,提升性能: 写 go 并发程序的时候如果程序会启动大量的 goroutine 势必会消耗大量的系统资源内存CPU通过使用 `ants`,可以实例化一个协程池,复用 goroutine ,节省资源,提升性能:
``` go ``` go
package main package main
import ( import (
@ -85,8 +84,8 @@ func main() {
fmt.Printf("finish all tasks.\n") fmt.Printf("finish all tasks.\n")
// use the pool with a function // use the pool with a function
// set 10 the size of goroutine pool // set 10 the size of goroutine pool and 1 second for expired duration
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) error { p, _ := ants.NewPoolWithFunc(10, 1, func(i interface{}) error {
myFunc(i) myFunc(i)
wg.Done() wg.Done()
return nil return nil
@ -137,6 +136,8 @@ pool.ReSize(100000) // Readjust its capacity to 100000
OS : macOS High Sierra OS : macOS High Sierra
Processor : 2.7 GHz Intel Core i5 Processor : 2.7 GHz Intel Core i5
Memory : 8 GB 1867 MHz DDR3 Memory : 8 GB 1867 MHz DDR3
Go1.9
``` ```

View File

@ -68,4 +68,3 @@ var (
ErrPoolSizeInvalid = errors.New("invalid size for pool") ErrPoolSizeInvalid = errors.New("invalid size for pool")
ErrPoolClosed = errors.New("this pool has been closed") ErrPoolClosed = errors.New("this pool has been closed")
) )

View File

@ -32,14 +32,14 @@ import (
const ( const (
_ = 1 << (10 * iota) _ = 1 << (10 * iota)
KiB // 1024 KiB // 1024
MiB // 1048576 MiB // 1048576
GiB // 1073741824 GiB // 1073741824
TiB // 1099511627776 (超过了int32的范围) TiB // 1099511627776 (超过了int32的范围)
PiB // 1125899906842624 PiB // 1125899906842624
EiB // 1152921504606846976 EiB // 1152921504606846976
ZiB // 1180591620717411303424 (超过了int64的范围) ZiB // 1180591620717411303424 (超过了int64的范围)
YiB // 1208925819614629174706176 YiB // 1208925819614629174706176
) )
const RunTimes = 1000000 const RunTimes = 1000000
const loop = 10 const loop = 10
@ -78,7 +78,7 @@ func BenchmarkGoroutineWithFunc(b *testing.B) {
func BenchmarkAntsPoolWithFunc(b *testing.B) { func BenchmarkAntsPoolWithFunc(b *testing.B) {
var wg sync.WaitGroup var wg sync.WaitGroup
p, _ := ants.NewPoolWithFunc(50000, 1,func(i interface{}) error { p, _ := ants.NewPoolWithFunc(50000, 1, func(i interface{}) error {
demoPoolFunc(i) demoPoolFunc(i)
wg.Done() wg.Done()
return nil return nil

View File

@ -51,7 +51,7 @@ func main() {
runTimes := 1000 runTimes := 1000
// use the common pool // use the common pool
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < runTimes; i++ { for i := 0; i < runTimes; i++ {
wg.Add(1) wg.Add(1)
@ -66,7 +66,7 @@ func main() {
fmt.Printf("finish all tasks.\n") fmt.Printf("finish all tasks.\n")
// use the pool with a function // use the pool with a function
// set 10 the size of goroutine pool // set 10 the size of goroutine pool and 1 second for expired duration
p, _ := ants.NewPoolWithFunc(10, 1, func(i interface{}) error { p, _ := ants.NewPoolWithFunc(10, 1, func(i interface{}) error {
myFunc(i) myFunc(i)
wg.Done() wg.Done()

12
pool.go
View File

@ -76,9 +76,10 @@ func (p *Pool) monitorAndClear() {
n = i n = i
w.stop() w.stop()
idleWorkers[i] = nil idleWorkers[i] = nil
p.running--
} }
if n > 0 { if n > 0 {
n += 1 n++
p.workers = idleWorkers[n:] p.workers = idleWorkers[n:]
} }
p.lock.Unlock() p.lock.Unlock()
@ -86,16 +87,15 @@ func (p *Pool) monitorAndClear() {
}() }()
} }
// NewPool generates a instance of ants pool // NewPool generates a instance of ants pool
func NewPool(size, expiry int) (*Pool, error) { func NewPool(size, expiry int) (*Pool, error) {
if size <= 0 { if size <= 0 {
return nil, ErrPoolSizeInvalid return nil, ErrPoolSizeInvalid
} }
p := &Pool{ p := &Pool{
capacity: int32(size), capacity: int32(size),
freeSignal: make(chan sig, math.MaxInt32), freeSignal: make(chan sig, math.MaxInt32),
release: make(chan sig, 1), release: make(chan sig, 1),
expiryDuration: time.Duration(expiry) * time.Second, expiryDuration: time.Duration(expiry) * time.Second,
} }
p.monitorAndClear() p.monitorAndClear()
@ -137,7 +137,7 @@ func (p *Pool) Release() error {
for i := 0; i < running; i++ { for i := 0; i < running; i++ {
p.getWorker().stop() p.getWorker().stop()
} }
for i := range p.workers{ for i := range p.workers {
p.workers[i] = nil p.workers[i] = nil
} }
}) })

View File

@ -77,6 +77,7 @@ func (p *PoolWithFunc) MonitorAndClear() {
n = i n = i
w.stop() w.stop()
idleWorkers[i] = nil idleWorkers[i] = nil
p.running--
} }
if n > 0 { if n > 0 {
n += 1 n += 1
@ -93,11 +94,11 @@ func NewPoolWithFunc(size, expiry int, f pf) (*PoolWithFunc, error) {
return nil, ErrPoolSizeInvalid return nil, ErrPoolSizeInvalid
} }
p := &PoolWithFunc{ p := &PoolWithFunc{
capacity: int32(size), capacity: int32(size),
freeSignal: make(chan sig, math.MaxInt32), freeSignal: make(chan sig, math.MaxInt32),
release: make(chan sig, 1), release: make(chan sig, 1),
expiryDuration: time.Duration(expiry) * time.Second, expiryDuration: time.Duration(expiry) * time.Second,
poolFunc: f, poolFunc: f,
} }
p.MonitorAndClear() p.MonitorAndClear()
return p, nil return p, nil
@ -141,7 +142,7 @@ func (p *PoolWithFunc) Release() error {
for i := 0; i < running; i++ { for i := 0; i < running; i++ {
p.getWorker().stop() p.getWorker().stop()
} }
for i := range p.workers{ for i := range p.workers {
p.workers[i] = nil p.workers[i] = nil
} }
}) })