mirror of https://github.com/panjf2000/ants.git
optimize memory allocation and add the log of panic stack
This commit is contained in:
parent
f447bf104a
commit
3e1c7a03a5
184
ants_test.go
184
ants_test.go
|
@ -74,6 +74,26 @@ func TestAntsPoolWaitToGetWorker(t *testing.T) {
|
||||||
t.Logf("memory usage:%d MB", curMem)
|
t.Logf("memory usage:%d MB", curMem)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAntsPoolWaitToGetWorkerPreMalloc(t *testing.T) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
p, _ := ants.NewPoolPreMalloc(AntsSize)
|
||||||
|
defer p.Release()
|
||||||
|
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
p.Submit(func() {
|
||||||
|
demoPoolFunc(Param)
|
||||||
|
wg.Done()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
t.Logf("pool, running workers number:%d", p.Running())
|
||||||
|
mem := runtime.MemStats{}
|
||||||
|
runtime.ReadMemStats(&mem)
|
||||||
|
curMem = mem.TotalAlloc/MiB - curMem
|
||||||
|
t.Logf("memory usage:%d MB", curMem)
|
||||||
|
}
|
||||||
|
|
||||||
// TestAntsPoolWithFuncWaitToGetWorker is used to test waiting to get worker.
|
// TestAntsPoolWithFuncWaitToGetWorker is used to test waiting to get worker.
|
||||||
func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
|
func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
@ -95,6 +115,26 @@ func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
|
||||||
t.Logf("memory usage:%d MB", curMem)
|
t.Logf("memory usage:%d MB", curMem)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
p, _ := ants.NewPoolWithFuncPreMalloc(AntsSize, func(i interface{}) {
|
||||||
|
demoPoolFunc(i)
|
||||||
|
wg.Done()
|
||||||
|
})
|
||||||
|
defer p.Release()
|
||||||
|
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
p.Invoke(Param)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
t.Logf("pool with func, running workers number:%d", p.Running())
|
||||||
|
mem := runtime.MemStats{}
|
||||||
|
runtime.ReadMemStats(&mem)
|
||||||
|
curMem = mem.TotalAlloc/MiB - curMem
|
||||||
|
t.Logf("memory usage:%d MB", curMem)
|
||||||
|
}
|
||||||
|
|
||||||
// TestAntsPoolGetWorkerFromCache is used to test getting worker from sync.Pool.
|
// TestAntsPoolGetWorkerFromCache is used to test getting worker from sync.Pool.
|
||||||
func TestAntsPoolGetWorkerFromCache(t *testing.T) {
|
func TestAntsPoolGetWorkerFromCache(t *testing.T) {
|
||||||
p, _ := ants.NewPool(TestSize)
|
p, _ := ants.NewPool(TestSize)
|
||||||
|
@ -130,6 +170,23 @@ func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) {
|
||||||
t.Logf("memory usage:%d MB", curMem)
|
t.Logf("memory usage:%d MB", curMem)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) {
|
||||||
|
dur := 10
|
||||||
|
p, _ := ants.NewPoolWithFuncPreMalloc(TestSize, demoPoolFunc)
|
||||||
|
defer p.Release()
|
||||||
|
|
||||||
|
for i := 0; i < AntsSize; i++ {
|
||||||
|
p.Invoke(dur)
|
||||||
|
}
|
||||||
|
time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
|
||||||
|
p.Invoke(dur)
|
||||||
|
t.Logf("pool with func, running workers number:%d", p.Running())
|
||||||
|
mem := runtime.MemStats{}
|
||||||
|
runtime.ReadMemStats(&mem)
|
||||||
|
curMem = mem.TotalAlloc/MiB - curMem
|
||||||
|
t.Logf("memory usage:%d MB", curMem)
|
||||||
|
}
|
||||||
|
|
||||||
//-------------------------------------------------------------------------------------------
|
//-------------------------------------------------------------------------------------------
|
||||||
// Contrast between goroutines without a pool and goroutines with ants pool.
|
// Contrast between goroutines without a pool and goroutines with ants pool.
|
||||||
//-------------------------------------------------------------------------------------------
|
//-------------------------------------------------------------------------------------------
|
||||||
|
@ -224,6 +281,55 @@ func TestPanicHandler(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPanicHandlerPreMalloc(t *testing.T) {
|
||||||
|
p0, err := ants.NewPoolPreMalloc(10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("create new pool failed: %s", err.Error())
|
||||||
|
}
|
||||||
|
defer p0.Release()
|
||||||
|
var panicCounter int64
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
p0.PanicHandler = func(p interface{}) {
|
||||||
|
defer wg.Done()
|
||||||
|
atomic.AddInt64(&panicCounter, 1)
|
||||||
|
t.Logf("catch panic with PanicHandler: %v", p)
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
p0.Submit(func() {
|
||||||
|
panic("Oops!")
|
||||||
|
})
|
||||||
|
wg.Wait()
|
||||||
|
c := atomic.LoadInt64(&panicCounter)
|
||||||
|
if c != 1 {
|
||||||
|
t.Errorf("panic handler didn't work, panicCounter: %d", c)
|
||||||
|
}
|
||||||
|
if p0.Running() != 0 {
|
||||||
|
t.Errorf("pool should be empty after panic")
|
||||||
|
}
|
||||||
|
|
||||||
|
p1, err := ants.NewPoolWithFunc(10, func(p interface{}) {
|
||||||
|
panic(p)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("create new pool with func failed: %s", err.Error())
|
||||||
|
}
|
||||||
|
defer p1.Release()
|
||||||
|
p1.PanicHandler = func(p interface{}) {
|
||||||
|
defer wg.Done()
|
||||||
|
atomic.AddInt64(&panicCounter, 1)
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
p1.Invoke("Oops!")
|
||||||
|
wg.Wait()
|
||||||
|
c = atomic.LoadInt64(&panicCounter)
|
||||||
|
if c != 2 {
|
||||||
|
t.Errorf("panic handler didn't work, panicCounter: %d", c)
|
||||||
|
}
|
||||||
|
if p1.Running() != 0 {
|
||||||
|
t.Errorf("pool should be empty after panic")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestPoolPanicWithoutHandler(t *testing.T) {
|
func TestPoolPanicWithoutHandler(t *testing.T) {
|
||||||
p0, err := ants.NewPool(10)
|
p0, err := ants.NewPool(10)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -244,6 +350,26 @@ func TestPoolPanicWithoutHandler(t *testing.T) {
|
||||||
p1.Invoke("Oops!")
|
p1.Invoke("Oops!")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {
|
||||||
|
p0, err := ants.NewPoolPreMalloc(10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("create new pool failed: %s", err.Error())
|
||||||
|
}
|
||||||
|
defer p0.Release()
|
||||||
|
p0.Submit(func() {
|
||||||
|
panic("Oops!")
|
||||||
|
})
|
||||||
|
|
||||||
|
p1, err := ants.NewPoolWithFunc(10, func(p interface{}) {
|
||||||
|
panic(p)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("create new pool with func failed: %s", err.Error())
|
||||||
|
}
|
||||||
|
defer p1.Release()
|
||||||
|
p1.Invoke("Oops!")
|
||||||
|
}
|
||||||
|
|
||||||
func TestPurge(t *testing.T) {
|
func TestPurge(t *testing.T) {
|
||||||
p, err := ants.NewPool(10)
|
p, err := ants.NewPool(10)
|
||||||
defer p.Release()
|
defer p.Release()
|
||||||
|
@ -267,14 +393,37 @@ func TestPurge(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPurgePreMalloc(t *testing.T) {
|
||||||
|
p, err := ants.NewPoolPreMalloc(10)
|
||||||
|
defer p.Release()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("create TimingPool failed: %s", err.Error())
|
||||||
|
}
|
||||||
|
p.Submit(demoFunc)
|
||||||
|
time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
|
||||||
|
if p.Running() != 0 {
|
||||||
|
t.Error("all p should be purged")
|
||||||
|
}
|
||||||
|
p1, err := ants.NewPoolWithFunc(10, demoPoolFunc)
|
||||||
|
defer p1.Release()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("create TimingPoolWithFunc failed: %s", err.Error())
|
||||||
|
}
|
||||||
|
p1.Invoke(1)
|
||||||
|
time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
|
||||||
|
if p.Running() != 0 {
|
||||||
|
t.Error("all p should be purged")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRestCodeCoverage(t *testing.T) {
|
func TestRestCodeCoverage(t *testing.T) {
|
||||||
_, err := ants.NewTimingPool(-1, -1)
|
_, err := ants.NewTimingPool(-1, -1, false)
|
||||||
t.Log(err)
|
t.Log(err)
|
||||||
_, err = ants.NewTimingPool(1, -1)
|
_, err = ants.NewTimingPool(1, -1, false)
|
||||||
t.Log(err)
|
t.Log(err)
|
||||||
_, err = ants.NewTimingPoolWithFunc(-1, -1, demoPoolFunc)
|
_, err = ants.NewTimingPoolWithFunc(-1, -1, demoPoolFunc, false)
|
||||||
t.Log(err)
|
t.Log(err)
|
||||||
_, err = ants.NewTimingPoolWithFunc(1, -1, demoPoolFunc)
|
_, err = ants.NewTimingPoolWithFunc(1, -1, demoPoolFunc, false)
|
||||||
t.Log(err)
|
t.Log(err)
|
||||||
|
|
||||||
p0, _ := ants.NewPool(TestSize)
|
p0, _ := ants.NewPool(TestSize)
|
||||||
|
@ -290,6 +439,19 @@ func TestRestCodeCoverage(t *testing.T) {
|
||||||
p0.Tune(TestSize / 10)
|
p0.Tune(TestSize / 10)
|
||||||
t.Logf("pool, after tuning capacity, capacity:%d, running:%d", p0.Cap(), p0.Running())
|
t.Logf("pool, after tuning capacity, capacity:%d, running:%d", p0.Cap(), p0.Running())
|
||||||
|
|
||||||
|
pprem, _ := ants.NewPoolPreMalloc(TestSize)
|
||||||
|
defer pprem.Submit(demoFunc)
|
||||||
|
defer pprem.Release()
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
pprem.Submit(demoFunc)
|
||||||
|
}
|
||||||
|
t.Logf("pool with pre-malloc, capacity:%d", pprem.Cap())
|
||||||
|
t.Logf("pool with pre-malloc, running workers number:%d", pprem.Running())
|
||||||
|
t.Logf("pool with pre-malloc, free workers number:%d", pprem.Free())
|
||||||
|
pprem.Tune(TestSize)
|
||||||
|
pprem.Tune(TestSize / 10)
|
||||||
|
t.Logf("pool with pre-malloc, after tuning capacity, capacity:%d, running:%d", p0.Cap(), p0.Running())
|
||||||
|
|
||||||
p, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc)
|
p, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc)
|
||||||
defer p.Invoke(Param)
|
defer p.Invoke(Param)
|
||||||
defer p.Release()
|
defer p.Release()
|
||||||
|
@ -303,4 +465,18 @@ func TestRestCodeCoverage(t *testing.T) {
|
||||||
p.Tune(TestSize)
|
p.Tune(TestSize)
|
||||||
p.Tune(TestSize / 10)
|
p.Tune(TestSize / 10)
|
||||||
t.Logf("pool with func, after tuning capacity, capacity:%d, running:%d", p.Cap(), p.Running())
|
t.Logf("pool with func, after tuning capacity, capacity:%d, running:%d", p.Cap(), p.Running())
|
||||||
|
|
||||||
|
ppremWithFunc, _ := ants.NewPoolWithFuncPreMalloc(TestSize, demoPoolFunc)
|
||||||
|
defer ppremWithFunc.Invoke(Param)
|
||||||
|
defer ppremWithFunc.Release()
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
ppremWithFunc.Invoke(Param)
|
||||||
|
}
|
||||||
|
time.Sleep(ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
|
||||||
|
t.Logf("pool with func, capacity:%d", ppremWithFunc.Cap())
|
||||||
|
t.Logf("pool with func, running workers number:%d", ppremWithFunc.Running())
|
||||||
|
t.Logf("pool with func, free workers number:%d", ppremWithFunc.Free())
|
||||||
|
ppremWithFunc.Tune(TestSize)
|
||||||
|
ppremWithFunc.Tune(TestSize / 10)
|
||||||
|
t.Logf("pool with func, after tuning capacity, capacity:%d, running:%d", p.Cap(), p.Running())
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
mode: atomic
|
24
pool.go
24
pool.go
|
@ -104,20 +104,34 @@ func (p *Pool) periodicallyPurge() {
|
||||||
|
|
||||||
// NewPool generates an instance of ants pool.
|
// NewPool generates an instance of ants pool.
|
||||||
func NewPool(size int) (*Pool, error) {
|
func NewPool(size int) (*Pool, error) {
|
||||||
return NewTimingPool(size, DEFAULT_CLEAN_INTERVAL_TIME)
|
return NewTimingPool(size, DEFAULT_CLEAN_INTERVAL_TIME, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPoolPreMalloc generates an instance of ants pool with the memory pre-allocation of pool size.
|
||||||
|
func NewPoolPreMalloc(size int) (*Pool, error) {
|
||||||
|
return NewTimingPool(size, DEFAULT_CLEAN_INTERVAL_TIME, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTimingPool generates an instance of ants pool with a custom timed task.
|
// NewTimingPool generates an instance of ants pool with a custom timed task.
|
||||||
func NewTimingPool(size, expiry int) (*Pool, error) {
|
func NewTimingPool(size, expiry int, preAlloc bool) (*Pool, error) {
|
||||||
if size <= 0 {
|
if size <= 0 {
|
||||||
return nil, ErrInvalidPoolSize
|
return nil, ErrInvalidPoolSize
|
||||||
}
|
}
|
||||||
if expiry <= 0 {
|
if expiry <= 0 {
|
||||||
return nil, ErrInvalidPoolExpiry
|
return nil, ErrInvalidPoolExpiry
|
||||||
}
|
}
|
||||||
p := &Pool{
|
var p *Pool
|
||||||
capacity: int32(size),
|
if preAlloc {
|
||||||
expiryDuration: time.Duration(expiry) * time.Second,
|
p = &Pool{
|
||||||
|
capacity: int32(size),
|
||||||
|
expiryDuration: time.Duration(expiry) * time.Second,
|
||||||
|
workers: make([]*Worker, 0, size),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
p = &Pool{
|
||||||
|
capacity: int32(size),
|
||||||
|
expiryDuration: time.Duration(expiry) * time.Second,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
p.cond = sync.NewCond(&p.lock)
|
p.cond = sync.NewCond(&p.lock)
|
||||||
go p.periodicallyPurge()
|
go p.periodicallyPurge()
|
||||||
|
|
27
pool_func.go
27
pool_func.go
|
@ -107,21 +107,36 @@ func (p *PoolWithFunc) periodicallyPurge() {
|
||||||
|
|
||||||
// NewPoolWithFunc generates an instance of ants pool with a specific function.
|
// NewPoolWithFunc generates an instance of ants pool with a specific function.
|
||||||
func NewPoolWithFunc(size int, pf func(interface{})) (*PoolWithFunc, error) {
|
func NewPoolWithFunc(size int, pf func(interface{})) (*PoolWithFunc, error) {
|
||||||
return NewTimingPoolWithFunc(size, DEFAULT_CLEAN_INTERVAL_TIME, pf)
|
return NewTimingPoolWithFunc(size, DEFAULT_CLEAN_INTERVAL_TIME, pf, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPoolWithFuncPreMalloc generates an instance of ants pool with a specific function and the memory pre-allocation of pool size.
|
||||||
|
func NewPoolWithFuncPreMalloc(size int, pf func(interface{})) (*PoolWithFunc, error) {
|
||||||
|
return NewTimingPoolWithFunc(size, DEFAULT_CLEAN_INTERVAL_TIME, pf, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTimingPoolWithFunc generates an instance of ants pool with a specific function and a custom timed task.
|
// NewTimingPoolWithFunc generates an instance of ants pool with a specific function and a custom timed task.
|
||||||
func NewTimingPoolWithFunc(size, expiry int, pf func(interface{})) (*PoolWithFunc, error) {
|
func NewTimingPoolWithFunc(size, expiry int, pf func(interface{}), preAlloc bool) (*PoolWithFunc, error) {
|
||||||
if size <= 0 {
|
if size <= 0 {
|
||||||
return nil, ErrInvalidPoolSize
|
return nil, ErrInvalidPoolSize
|
||||||
}
|
}
|
||||||
if expiry <= 0 {
|
if expiry <= 0 {
|
||||||
return nil, ErrInvalidPoolExpiry
|
return nil, ErrInvalidPoolExpiry
|
||||||
}
|
}
|
||||||
p := &PoolWithFunc{
|
var p *PoolWithFunc
|
||||||
capacity: int32(size),
|
if preAlloc {
|
||||||
expiryDuration: time.Duration(expiry) * time.Second,
|
p = &PoolWithFunc{
|
||||||
poolFunc: pf,
|
capacity: int32(size),
|
||||||
|
expiryDuration: time.Duration(expiry) * time.Second,
|
||||||
|
poolFunc: pf,
|
||||||
|
workers: make([]*WorkerWithFunc, 0, size),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
p = &PoolWithFunc{
|
||||||
|
capacity: int32(size),
|
||||||
|
expiryDuration: time.Duration(expiry) * time.Second,
|
||||||
|
poolFunc: pf,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
p.cond = sync.NewCond(&p.lock)
|
p.cond = sync.NewCond(&p.lock)
|
||||||
go p.periodicallyPurge()
|
go p.periodicallyPurge()
|
||||||
|
|
|
@ -24,6 +24,7 @@ package ants
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -53,7 +54,10 @@ func (w *Worker) run() {
|
||||||
if w.pool.PanicHandler != nil {
|
if w.pool.PanicHandler != nil {
|
||||||
w.pool.PanicHandler(p)
|
w.pool.PanicHandler(p)
|
||||||
} else {
|
} else {
|
||||||
log.Printf("worker exits from a panic: %v", p)
|
log.Printf("worker exits from a panic: %v\n", p)
|
||||||
|
var buf [4096]byte
|
||||||
|
n := runtime.Stack(buf[:], false)
|
||||||
|
log.Printf("worker exits from panic: %s\n", string(buf[:n]))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -24,6 +24,7 @@ package ants
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -53,7 +54,10 @@ func (w *WorkerWithFunc) run() {
|
||||||
if w.pool.PanicHandler != nil {
|
if w.pool.PanicHandler != nil {
|
||||||
w.pool.PanicHandler(p)
|
w.pool.PanicHandler(p)
|
||||||
} else {
|
} else {
|
||||||
log.Printf("worker exits from a panic: %v", p)
|
log.Printf("worker with func exits from a panic: %v\n", p)
|
||||||
|
var buf [4096]byte
|
||||||
|
n := runtime.Stack(buf[:], false)
|
||||||
|
log.Printf("worker with func exits from panic: %s\n", string(buf[:n]))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
Loading…
Reference in New Issue