diff --git a/ants_test.go b/ants_test.go index 9d094d8..af7562b 100644 --- a/ants_test.go +++ b/ants_test.go @@ -74,6 +74,26 @@ func TestAntsPoolWaitToGetWorker(t *testing.T) { t.Logf("memory usage:%d MB", curMem) } +func TestAntsPoolWaitToGetWorkerPreMalloc(t *testing.T) { + var wg sync.WaitGroup + p, _ := ants.NewPoolPreMalloc(AntsSize) + defer p.Release() + + for i := 0; i < n; i++ { + wg.Add(1) + p.Submit(func() { + demoPoolFunc(Param) + wg.Done() + }) + } + wg.Wait() + t.Logf("pool, running workers number:%d", p.Running()) + mem := runtime.MemStats{} + runtime.ReadMemStats(&mem) + curMem = mem.TotalAlloc/MiB - curMem + t.Logf("memory usage:%d MB", curMem) +} + // TestAntsPoolWithFuncWaitToGetWorker is used to test waiting to get worker. func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) { var wg sync.WaitGroup @@ -95,6 +115,26 @@ func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) { t.Logf("memory usage:%d MB", curMem) } +func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) { + var wg sync.WaitGroup + p, _ := ants.NewPoolWithFuncPreMalloc(AntsSize, func(i interface{}) { + demoPoolFunc(i) + wg.Done() + }) + defer p.Release() + + for i := 0; i < n; i++ { + wg.Add(1) + p.Invoke(Param) + } + wg.Wait() + t.Logf("pool with func, running workers number:%d", p.Running()) + mem := runtime.MemStats{} + runtime.ReadMemStats(&mem) + curMem = mem.TotalAlloc/MiB - curMem + t.Logf("memory usage:%d MB", curMem) +} + // TestAntsPoolGetWorkerFromCache is used to test getting worker from sync.Pool. func TestAntsPoolGetWorkerFromCache(t *testing.T) { p, _ := ants.NewPool(TestSize) @@ -130,6 +170,23 @@ func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) { t.Logf("memory usage:%d MB", curMem) } +func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) { + dur := 10 + p, _ := ants.NewPoolWithFuncPreMalloc(TestSize, demoPoolFunc) + defer p.Release() + + for i := 0; i < AntsSize; i++ { + p.Invoke(dur) + } + time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) + p.Invoke(dur) + t.Logf("pool with func, running workers number:%d", p.Running()) + mem := runtime.MemStats{} + runtime.ReadMemStats(&mem) + curMem = mem.TotalAlloc/MiB - curMem + t.Logf("memory usage:%d MB", curMem) +} + //------------------------------------------------------------------------------------------- // Contrast between goroutines without a pool and goroutines with ants pool. //------------------------------------------------------------------------------------------- @@ -224,6 +281,55 @@ func TestPanicHandler(t *testing.T) { } } +func TestPanicHandlerPreMalloc(t *testing.T) { + p0, err := ants.NewPoolPreMalloc(10) + if err != nil { + t.Fatalf("create new pool failed: %s", err.Error()) + } + defer p0.Release() + var panicCounter int64 + var wg sync.WaitGroup + p0.PanicHandler = func(p interface{}) { + defer wg.Done() + atomic.AddInt64(&panicCounter, 1) + t.Logf("catch panic with PanicHandler: %v", p) + } + wg.Add(1) + p0.Submit(func() { + panic("Oops!") + }) + wg.Wait() + c := atomic.LoadInt64(&panicCounter) + if c != 1 { + t.Errorf("panic handler didn't work, panicCounter: %d", c) + } + if p0.Running() != 0 { + t.Errorf("pool should be empty after panic") + } + + p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { + panic(p) + }) + if err != nil { + t.Fatalf("create new pool with func failed: %s", err.Error()) + } + defer p1.Release() + p1.PanicHandler = func(p interface{}) { + defer wg.Done() + atomic.AddInt64(&panicCounter, 1) + } + wg.Add(1) + p1.Invoke("Oops!") + wg.Wait() + c = atomic.LoadInt64(&panicCounter) + if c != 2 { + t.Errorf("panic handler didn't work, panicCounter: %d", c) + } + if p1.Running() != 0 { + t.Errorf("pool should be empty after panic") + } +} + func TestPoolPanicWithoutHandler(t *testing.T) { p0, err := ants.NewPool(10) if err != nil { @@ -244,6 +350,26 @@ func TestPoolPanicWithoutHandler(t *testing.T) { p1.Invoke("Oops!") } +func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) { + p0, err := ants.NewPoolPreMalloc(10) + if err != nil { + t.Fatalf("create new pool failed: %s", err.Error()) + } + defer p0.Release() + p0.Submit(func() { + panic("Oops!") + }) + + p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { + panic(p) + }) + if err != nil { + t.Fatalf("create new pool with func failed: %s", err.Error()) + } + defer p1.Release() + p1.Invoke("Oops!") +} + func TestPurge(t *testing.T) { p, err := ants.NewPool(10) defer p.Release() @@ -267,14 +393,37 @@ func TestPurge(t *testing.T) { } } +func TestPurgePreMalloc(t *testing.T) { + p, err := ants.NewPoolPreMalloc(10) + defer p.Release() + if err != nil { + t.Fatalf("create TimingPool failed: %s", err.Error()) + } + p.Submit(demoFunc) + time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) + if p.Running() != 0 { + t.Error("all p should be purged") + } + p1, err := ants.NewPoolWithFunc(10, demoPoolFunc) + defer p1.Release() + if err != nil { + t.Fatalf("create TimingPoolWithFunc failed: %s", err.Error()) + } + p1.Invoke(1) + time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) + if p.Running() != 0 { + t.Error("all p should be purged") + } +} + func TestRestCodeCoverage(t *testing.T) { - _, err := ants.NewTimingPool(-1, -1) + _, err := ants.NewTimingPool(-1, -1, false) t.Log(err) - _, err = ants.NewTimingPool(1, -1) + _, err = ants.NewTimingPool(1, -1, false) t.Log(err) - _, err = ants.NewTimingPoolWithFunc(-1, -1, demoPoolFunc) + _, err = ants.NewTimingPoolWithFunc(-1, -1, demoPoolFunc, false) t.Log(err) - _, err = ants.NewTimingPoolWithFunc(1, -1, demoPoolFunc) + _, err = ants.NewTimingPoolWithFunc(1, -1, demoPoolFunc, false) t.Log(err) p0, _ := ants.NewPool(TestSize) @@ -290,6 +439,19 @@ func TestRestCodeCoverage(t *testing.T) { p0.Tune(TestSize / 10) t.Logf("pool, after tuning capacity, capacity:%d, running:%d", p0.Cap(), p0.Running()) + pprem, _ := ants.NewPoolPreMalloc(TestSize) + defer pprem.Submit(demoFunc) + defer pprem.Release() + for i := 0; i < n; i++ { + pprem.Submit(demoFunc) + } + t.Logf("pool with pre-malloc, capacity:%d", pprem.Cap()) + t.Logf("pool with pre-malloc, running workers number:%d", pprem.Running()) + t.Logf("pool with pre-malloc, free workers number:%d", pprem.Free()) + pprem.Tune(TestSize) + pprem.Tune(TestSize / 10) + t.Logf("pool with pre-malloc, after tuning capacity, capacity:%d, running:%d", p0.Cap(), p0.Running()) + p, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc) defer p.Invoke(Param) defer p.Release() @@ -303,4 +465,18 @@ func TestRestCodeCoverage(t *testing.T) { p.Tune(TestSize) p.Tune(TestSize / 10) t.Logf("pool with func, after tuning capacity, capacity:%d, running:%d", p.Cap(), p.Running()) + + ppremWithFunc, _ := ants.NewPoolWithFuncPreMalloc(TestSize, demoPoolFunc) + defer ppremWithFunc.Invoke(Param) + defer ppremWithFunc.Release() + for i := 0; i < n; i++ { + ppremWithFunc.Invoke(Param) + } + time.Sleep(ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second) + t.Logf("pool with func, capacity:%d", ppremWithFunc.Cap()) + t.Logf("pool with func, running workers number:%d", ppremWithFunc.Running()) + t.Logf("pool with func, free workers number:%d", ppremWithFunc.Free()) + ppremWithFunc.Tune(TestSize) + ppremWithFunc.Tune(TestSize / 10) + t.Logf("pool with func, after tuning capacity, capacity:%d, running:%d", p.Cap(), p.Running()) } diff --git a/coverage.txt b/coverage.txt new file mode 100644 index 0000000..79b28a0 --- /dev/null +++ b/coverage.txt @@ -0,0 +1 @@ +mode: atomic diff --git a/pool.go b/pool.go index 5fa2512..e561c76 100644 --- a/pool.go +++ b/pool.go @@ -104,20 +104,34 @@ func (p *Pool) periodicallyPurge() { // NewPool generates an instance of ants pool. func NewPool(size int) (*Pool, error) { - return NewTimingPool(size, DEFAULT_CLEAN_INTERVAL_TIME) + return NewTimingPool(size, DEFAULT_CLEAN_INTERVAL_TIME, false) +} + +// NewPoolPreMalloc generates an instance of ants pool with the memory pre-allocation of pool size. +func NewPoolPreMalloc(size int) (*Pool, error) { + return NewTimingPool(size, DEFAULT_CLEAN_INTERVAL_TIME, true) } // NewTimingPool generates an instance of ants pool with a custom timed task. -func NewTimingPool(size, expiry int) (*Pool, error) { +func NewTimingPool(size, expiry int, preAlloc bool) (*Pool, error) { if size <= 0 { return nil, ErrInvalidPoolSize } if expiry <= 0 { return nil, ErrInvalidPoolExpiry } - p := &Pool{ - capacity: int32(size), - expiryDuration: time.Duration(expiry) * time.Second, + var p *Pool + if preAlloc { + p = &Pool{ + capacity: int32(size), + expiryDuration: time.Duration(expiry) * time.Second, + workers: make([]*Worker, 0, size), + } + } else { + p = &Pool{ + capacity: int32(size), + expiryDuration: time.Duration(expiry) * time.Second, + } } p.cond = sync.NewCond(&p.lock) go p.periodicallyPurge() diff --git a/pool_func.go b/pool_func.go index 6513733..d5ae85b 100644 --- a/pool_func.go +++ b/pool_func.go @@ -107,21 +107,36 @@ func (p *PoolWithFunc) periodicallyPurge() { // NewPoolWithFunc generates an instance of ants pool with a specific function. func NewPoolWithFunc(size int, pf func(interface{})) (*PoolWithFunc, error) { - return NewTimingPoolWithFunc(size, DEFAULT_CLEAN_INTERVAL_TIME, pf) + return NewTimingPoolWithFunc(size, DEFAULT_CLEAN_INTERVAL_TIME, pf, false) +} + +// NewPoolWithFuncPreMalloc generates an instance of ants pool with a specific function and the memory pre-allocation of pool size. +func NewPoolWithFuncPreMalloc(size int, pf func(interface{})) (*PoolWithFunc, error) { + return NewTimingPoolWithFunc(size, DEFAULT_CLEAN_INTERVAL_TIME, pf, true) } // NewTimingPoolWithFunc generates an instance of ants pool with a specific function and a custom timed task. -func NewTimingPoolWithFunc(size, expiry int, pf func(interface{})) (*PoolWithFunc, error) { +func NewTimingPoolWithFunc(size, expiry int, pf func(interface{}), preAlloc bool) (*PoolWithFunc, error) { if size <= 0 { return nil, ErrInvalidPoolSize } if expiry <= 0 { return nil, ErrInvalidPoolExpiry } - p := &PoolWithFunc{ - capacity: int32(size), - expiryDuration: time.Duration(expiry) * time.Second, - poolFunc: pf, + var p *PoolWithFunc + if preAlloc { + p = &PoolWithFunc{ + capacity: int32(size), + expiryDuration: time.Duration(expiry) * time.Second, + poolFunc: pf, + workers: make([]*WorkerWithFunc, 0, size), + } + } else { + p = &PoolWithFunc{ + capacity: int32(size), + expiryDuration: time.Duration(expiry) * time.Second, + poolFunc: pf, + } } p.cond = sync.NewCond(&p.lock) go p.periodicallyPurge() diff --git a/worker.go b/worker.go index 1e0de31..332f772 100644 --- a/worker.go +++ b/worker.go @@ -24,6 +24,7 @@ package ants import ( "log" + "runtime" "time" ) @@ -53,7 +54,10 @@ func (w *Worker) run() { if w.pool.PanicHandler != nil { w.pool.PanicHandler(p) } else { - log.Printf("worker exits from a panic: %v", p) + log.Printf("worker exits from a panic: %v\n", p) + var buf [4096]byte + n := runtime.Stack(buf[:], false) + log.Printf("worker exits from panic: %s\n", string(buf[:n])) } } }() diff --git a/worker_func.go b/worker_func.go index 9b716ca..b13fde1 100644 --- a/worker_func.go +++ b/worker_func.go @@ -24,6 +24,7 @@ package ants import ( "log" + "runtime" "time" ) @@ -53,7 +54,10 @@ func (w *WorkerWithFunc) run() { if w.pool.PanicHandler != nil { w.pool.PanicHandler(p) } else { - log.Printf("worker exits from a panic: %v", p) + log.Printf("worker with func exits from a panic: %v\n", p) + var buf [4096]byte + n := runtime.Stack(buf[:], false) + log.Printf("worker with func exits from panic: %s\n", string(buf[:n])) } } }()