diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 698c5d4..0d6e0c6 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -72,7 +72,6 @@ func BenchmarkGoroutineWithFunc(b *testing.B) { } wg.Wait() } - //b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) } func BenchmarkAntsPoolWithFunc(b *testing.B) { @@ -88,9 +87,8 @@ func BenchmarkAntsPoolWithFunc(b *testing.B) { p.Serve(loop) } wg.Wait() - //b.Logf("running goroutines: %d", p.Running()) + b.Logf("running goroutines: %d", p.Running()) } - //b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) } func BenchmarkGoroutine(b *testing.B) { @@ -100,15 +98,14 @@ func BenchmarkGoroutine(b *testing.B) { } } - //b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) } func BenchmarkAntsPool(b *testing.B) { + b.N = 3 for i := 0; i < b.N; i++ { for j := 0; j < RunTimes; j++ { ants.Push(demoFunc) } b.Logf("running goroutines: %d", ants.Running()) } - //b.Logf("total memory usage:%d MB", mem.TotalAlloc/MiB) } diff --git a/examples/main.go b/examples/main.go index 5f71174..719c6b5 100644 --- a/examples/main.go +++ b/examples/main.go @@ -69,5 +69,6 @@ func main() { p.Serve(str) } wg.Wait() + fmt.Printf("running goroutines: %d\n", p.Running()) fmt.Println("finish all tasks!") } diff --git a/pool_func.go b/pool_func.go index 0e888e5..6ea84fb 100644 --- a/pool_func.go +++ b/pool_func.go @@ -26,7 +26,6 @@ import ( "math" "sync" "sync/atomic" - "time" ) type pf func(interface{}) error @@ -55,10 +54,10 @@ type PoolWithFunc struct { lock sync.Mutex - // closed is used to confirm whether this pool has been closed. - closed int32 - poolFunc pf + + once sync.Once + } // NewPoolWithFunc generates a instance of ants pool with a specific function. @@ -69,8 +68,7 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { p := &PoolWithFunc{ capacity: int32(size), freeSignal: make(chan sig, math.MaxInt32), - release: make(chan sig), - closed: 0, + release: make(chan sig, 1), poolFunc: f, } @@ -79,27 +77,12 @@ func NewPoolWithFunc(size int, f pf) (*PoolWithFunc, error) { //------------------------------------------------------------------------- -// scanAndClean is a goroutine who will periodically clean up -// after it is noticed that this pool is closed. -func (p *PoolWithFunc) scanAndClean() { - ticker := time.NewTicker(DefaultCleanIntervalTime * time.Second) - go func() { - ticker.Stop() - for range ticker.C { - if atomic.LoadInt32(&p.closed) == 1 { - p.lock.Lock() - for _, w := range p.workers { - w.stop() - } - p.lock.Unlock() - } - } - }() -} - // Serve submit a task to pool func (p *PoolWithFunc) Serve(args interface{}) error { - if atomic.LoadInt32(&p.closed) == 1 { + //if atomic.LoadInt32(&p.closed) == 1 { + // return ErrPoolClosed + //} + if len(p.release) > 0 { return ErrPoolClosed } w := p.getWorker() @@ -124,10 +107,9 @@ func (p *PoolWithFunc) Cap() int { // Release Closed this pool func (p *PoolWithFunc) Release() error { - p.lock.Lock() - atomic.StoreInt32(&p.closed, 1) - close(p.release) - p.lock.Unlock() + p.once.Do(func() { + p.release<- sig{} + }) return nil } diff --git a/worker_func.go b/worker_func.go index 87dd6a4..9580b79 100644 --- a/worker_func.go +++ b/worker_func.go @@ -43,7 +43,7 @@ func (w *WorkerWithFunc) run() { atomic.AddInt32(&w.pool.running, 1) go func() { for args := range w.args { - if args == nil { + if args == nil || len(w.pool.release) > 0 { atomic.AddInt32(&w.pool.running, -1) return } @@ -53,6 +53,26 @@ func (w *WorkerWithFunc) run() { }() } +//func (w *WorkerWithFunc) run() { +// atomic.AddInt32(&w.pool.running, 1) +// go func() { +// for { +// select { +// case args := <-w.args: +// if args == nil { +// atomic.AddInt32(&w.pool.running, -1) +// return +// } +// w.pool.poolFunc(args) +// w.pool.putWorker(w) +// case <-w.pool.release: +// atomic.AddInt32(&w.pool.running, -1) +// return +// } +// } +// }() +//} + // stop this worker. func (w *WorkerWithFunc) stop() { w.args <- nil