From 566511ec5f3ccafb29d16853fc76ab5a4d370e3f Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Thu, 10 Oct 2019 03:02:04 +0800 Subject: [PATCH] Refactoring to the interface and implementations of worker-array --- ants_benchmark_test.go | 48 ++++++++--------- ants_test.go | 108 +++++++++++++++++++------------------- examples/main.go | 3 ++ pool.go | 20 +++---- worker_array.go | 41 +++++++++++++++ worker_loop_queue.go | 68 +++++------------------- worker_loop_queue_test.go | 20 +++---- worker_queue.go | 39 -------------- worker_stack.go | 33 ++++-------- worker_stack_test.go | 24 ++++----- 10 files changed, 171 insertions(+), 233 deletions(-) create mode 100644 worker_array.go delete mode 100644 worker_queue.go diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 7076222..0dfd8ee 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -20,26 +20,24 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. -package ants_test +package ants import ( "runtime" "sync" "testing" "time" - - "github.com/panjf2000/ants/v2" ) const ( - RunTimes = 1000000 - benchParam = 10 - benchAntsSize = 200000 + RunTimes = 1000000 + BenchParam = 10 + BenchAntsSize = 200000 + DefaultExpiredTime = 10 * time.Second ) func demoFunc() { - n := 10 - time.Sleep(time.Duration(n) * time.Millisecond) + time.Sleep(time.Duration(BenchParam) * time.Millisecond) } func demoPoolFunc(args interface{}) { @@ -63,13 +61,13 @@ func longRunningPoolFunc(arg interface{}) { } } -func BenchmarkGoroutineWithFunc(b *testing.B) { +func BenchmarkGoroutines(b *testing.B) { var wg sync.WaitGroup for i := 0; i < b.N; i++ { wg.Add(RunTimes) for j := 0; j < RunTimes; j++ { go func() { - demoPoolFunc(benchParam) + demoFunc() wg.Done() }() } @@ -77,16 +75,16 @@ func BenchmarkGoroutineWithFunc(b *testing.B) { } } -func BenchmarkSemaphoreWithFunc(b *testing.B) { +func BenchmarkSemaphore(b *testing.B) { var wg sync.WaitGroup - sema := make(chan struct{}, benchAntsSize) + sema := make(chan struct{}, BenchAntsSize) for i := 0; i < b.N; i++ { wg.Add(RunTimes) for j := 0; j < RunTimes; j++ { sema <- struct{}{} go func() { - demoPoolFunc(benchParam) + demoFunc() <-sema wg.Done() }() @@ -95,40 +93,40 @@ func BenchmarkSemaphoreWithFunc(b *testing.B) { } } -func BenchmarkAntsPoolWithFunc(b *testing.B) { +func BenchmarkAntsPool(b *testing.B) { var wg sync.WaitGroup - p, _ := ants.NewPoolWithFunc(benchAntsSize, func(i interface{}) { - demoPoolFunc(i) - wg.Done() - }) + p, _ := NewPool(BenchAntsSize, WithExpiryDuration(DefaultExpiredTime)) defer p.Release() b.StartTimer() for i := 0; i < b.N; i++ { wg.Add(RunTimes) for j := 0; j < RunTimes; j++ { - _ = p.Invoke(benchParam) + _ = p.Submit(func() { + demoFunc() + wg.Done() + }) } wg.Wait() } b.StopTimer() } -func BenchmarkGoroutineThroughput(b *testing.B) { +func BenchmarkGoroutinesThroughput(b *testing.B) { for i := 0; i < b.N; i++ { for j := 0; j < RunTimes; j++ { - go demoPoolFunc(benchParam) + go demoFunc() } } } func BenchmarkSemaphoreThroughput(b *testing.B) { - sema := make(chan struct{}, benchAntsSize) + sema := make(chan struct{}, BenchAntsSize) for i := 0; i < b.N; i++ { for j := 0; j < RunTimes; j++ { sema <- struct{}{} go func() { - demoPoolFunc(benchParam) + demoFunc() <-sema }() } @@ -136,12 +134,12 @@ func BenchmarkSemaphoreThroughput(b *testing.B) { } func BenchmarkAntsPoolThroughput(b *testing.B) { - p, _ := ants.NewPoolWithFunc(benchAntsSize, demoPoolFunc) + p, _ := NewPool(BenchAntsSize, WithExpiryDuration(DefaultExpiredTime)) defer p.Release() b.StartTimer() for i := 0; i < b.N; i++ { for j := 0; j < RunTimes; j++ { - _ = p.Invoke(benchParam) + _ = p.Submit(demoFunc) } } b.StopTimer() diff --git a/ants_test.go b/ants_test.go index 5665f67..dd8203d 100644 --- a/ants_test.go +++ b/ants_test.go @@ -20,7 +20,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. -package ants_test +package ants import ( "runtime" @@ -28,8 +28,6 @@ import ( "sync/atomic" "testing" "time" - - "github.com/panjf2000/ants/v2" ) const ( @@ -56,7 +54,7 @@ var curMem uint64 // TestAntsPoolWaitToGetWorker is used to test waiting to get worker. func TestAntsPoolWaitToGetWorker(t *testing.T) { var wg sync.WaitGroup - p, _ := ants.NewPool(AntsSize) + p, _ := NewPool(AntsSize) defer p.Release() for i := 0; i < n; i++ { @@ -76,7 +74,7 @@ func TestAntsPoolWaitToGetWorker(t *testing.T) { func TestAntsPoolWaitToGetWorkerPreMalloc(t *testing.T) { var wg sync.WaitGroup - p, _ := ants.NewPool(AntsSize, ants.WithPreAlloc(true)) + p, _ := NewPool(AntsSize, WithPreAlloc(true)) defer p.Release() for i := 0; i < n; i++ { @@ -97,7 +95,7 @@ func TestAntsPoolWaitToGetWorkerPreMalloc(t *testing.T) { // TestAntsPoolWithFuncWaitToGetWorker is used to test waiting to get worker. func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) { var wg sync.WaitGroup - p, _ := ants.NewPoolWithFunc(AntsSize, func(i interface{}) { + p, _ := NewPoolWithFunc(AntsSize, func(i interface{}) { demoPoolFunc(i) wg.Done() }) @@ -117,10 +115,10 @@ func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) { func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) { var wg sync.WaitGroup - p, _ := ants.NewPoolWithFunc(AntsSize, func(i interface{}) { + p, _ := NewPoolWithFunc(AntsSize, func(i interface{}) { demoPoolFunc(i) wg.Done() - }, ants.WithPreAlloc(true)) + }, WithPreAlloc(true)) defer p.Release() for i := 0; i < n; i++ { @@ -137,13 +135,13 @@ func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) { // TestAntsPoolGetWorkerFromCache is used to test getting worker from sync.Pool. func TestAntsPoolGetWorkerFromCache(t *testing.T) { - p, _ := ants.NewPool(TestSize) + p, _ := NewPool(TestSize) defer p.Release() for i := 0; i < AntsSize; i++ { _ = p.Submit(demoFunc) } - time.Sleep(2 * ants.DefaultCleanIntervalTime) + time.Sleep(2 * DefaultCleanIntervalTime) _ = p.Submit(demoFunc) t.Logf("pool, running workers number:%d", p.Running()) mem := runtime.MemStats{} @@ -155,13 +153,13 @@ func TestAntsPoolGetWorkerFromCache(t *testing.T) { // TestAntsPoolWithFuncGetWorkerFromCache is used to test getting worker from sync.Pool. func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) { dur := 10 - p, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc) + p, _ := NewPoolWithFunc(TestSize, demoPoolFunc) defer p.Release() for i := 0; i < AntsSize; i++ { _ = p.Invoke(dur) } - time.Sleep(2 * ants.DefaultCleanIntervalTime) + time.Sleep(2 * DefaultCleanIntervalTime) _ = p.Invoke(dur) t.Logf("pool with func, running workers number:%d", p.Running()) mem := runtime.MemStats{} @@ -172,13 +170,13 @@ func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) { func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) { dur := 10 - p, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc, ants.WithPreAlloc(true)) + p, _ := NewPoolWithFunc(TestSize, demoPoolFunc, WithPreAlloc(true)) defer p.Release() for i := 0; i < AntsSize; i++ { _ = p.Invoke(dur) } - time.Sleep(2 * ants.DefaultCleanIntervalTime) + time.Sleep(2 * DefaultCleanIntervalTime) _ = p.Invoke(dur) t.Logf("pool with func, running workers number:%d", p.Running()) mem := runtime.MemStats{} @@ -208,20 +206,20 @@ func TestNoPool(t *testing.T) { } func TestAntsPool(t *testing.T) { - defer ants.Release() + defer Release() var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) - _ = ants.Submit(func() { + _ = Submit(func() { demoFunc() wg.Done() }) } wg.Wait() - t.Logf("pool, capacity:%d", ants.Cap()) - t.Logf("pool, running workers number:%d", ants.Running()) - t.Logf("pool, free workers number:%d", ants.Free()) + t.Logf("pool, capacity:%d", Cap()) + t.Logf("pool, running workers number:%d", Running()) + t.Logf("pool, free workers number:%d", Free()) mem := runtime.MemStats{} runtime.ReadMemStats(&mem) @@ -235,7 +233,7 @@ func TestAntsPool(t *testing.T) { func TestPanicHandler(t *testing.T) { var panicCounter int64 var wg sync.WaitGroup - p0, err := ants.NewPool(10, ants.WithPanicHandler(func(p interface{}) { + p0, err := NewPool(10, WithPanicHandler(func(p interface{}) { defer wg.Done() atomic.AddInt64(&panicCounter, 1) t.Logf("catch panic with PanicHandler: %v", p) @@ -257,7 +255,7 @@ func TestPanicHandler(t *testing.T) { t.Errorf("pool should be empty after panic") } - p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { panic(p) }, ants.WithPanicHandler(func(p interface{}) { + p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(p interface{}) { defer wg.Done() atomic.AddInt64(&panicCounter, 1) })) @@ -280,7 +278,7 @@ func TestPanicHandler(t *testing.T) { func TestPanicHandlerPreMalloc(t *testing.T) { var panicCounter int64 var wg sync.WaitGroup - p0, err := ants.NewPool(10, ants.WithPreAlloc(true), ants.WithPanicHandler(func(p interface{}) { + p0, err := NewPool(10, WithPreAlloc(true), WithPanicHandler(func(p interface{}) { defer wg.Done() atomic.AddInt64(&panicCounter, 1) t.Logf("catch panic with PanicHandler: %v", p) @@ -302,7 +300,7 @@ func TestPanicHandlerPreMalloc(t *testing.T) { t.Errorf("pool should be empty after panic") } - p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { panic(p) }, ants.WithPanicHandler(func(p interface{}) { + p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(p interface{}) { defer wg.Done() atomic.AddInt64(&panicCounter, 1) })) @@ -323,7 +321,7 @@ func TestPanicHandlerPreMalloc(t *testing.T) { } func TestPoolPanicWithoutHandler(t *testing.T) { - p0, err := ants.NewPool(10) + p0, err := NewPool(10) if err != nil { t.Fatalf("create new pool failed: %s", err.Error()) } @@ -332,7 +330,7 @@ func TestPoolPanicWithoutHandler(t *testing.T) { panic("Oops!") }) - p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { + p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }) if err != nil { @@ -343,7 +341,7 @@ func TestPoolPanicWithoutHandler(t *testing.T) { } func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) { - p0, err := ants.NewPool(10, ants.WithPreAlloc(true)) + p0, err := NewPool(10, WithPreAlloc(true)) if err != nil { t.Fatalf("create new pool failed: %s", err.Error()) } @@ -352,7 +350,7 @@ func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) { panic("Oops!") }) - p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { + p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }) if err != nil { @@ -363,46 +361,46 @@ func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) { } func TestPurge(t *testing.T) { - p, err := ants.NewPool(10) + p, err := NewPool(10) if err != nil { t.Fatalf("create TimingPool failed: %s", err.Error()) } defer p.Release() _ = p.Submit(demoFunc) - time.Sleep(3 * ants.DefaultCleanIntervalTime) + time.Sleep(3 * DefaultCleanIntervalTime) if p.Running() != 0 { t.Error("all p should be purged") } - p1, err := ants.NewPoolWithFunc(10, demoPoolFunc) + p1, err := NewPoolWithFunc(10, demoPoolFunc) if err != nil { t.Fatalf("create TimingPoolWithFunc failed: %s", err.Error()) } defer p1.Release() _ = p1.Invoke(1) - time.Sleep(3 * ants.DefaultCleanIntervalTime) + time.Sleep(3 * DefaultCleanIntervalTime) if p.Running() != 0 { t.Error("all p should be purged") } } func TestPurgePreMalloc(t *testing.T) { - p, err := ants.NewPool(10, ants.WithPreAlloc(true)) + p, err := NewPool(10, WithPreAlloc(true)) if err != nil { t.Fatalf("create TimingPool failed: %s", err.Error()) } defer p.Release() _ = p.Submit(demoFunc) - time.Sleep(3 * ants.DefaultCleanIntervalTime) + time.Sleep(3 * DefaultCleanIntervalTime) if p.Running() != 0 { t.Error("all p should be purged") } - p1, err := ants.NewPoolWithFunc(10, demoPoolFunc) + p1, err := NewPoolWithFunc(10, demoPoolFunc) if err != nil { t.Fatalf("create TimingPoolWithFunc failed: %s", err.Error()) } defer p1.Release() _ = p1.Invoke(1) - time.Sleep(3 * ants.DefaultCleanIntervalTime) + time.Sleep(3 * DefaultCleanIntervalTime) if p.Running() != 0 { t.Error("all p should be purged") } @@ -410,7 +408,7 @@ func TestPurgePreMalloc(t *testing.T) { func TestNonblockingSubmit(t *testing.T) { poolSize := 10 - p, err := ants.NewPool(poolSize, ants.WithNonblocking(true)) + p, err := NewPool(poolSize, WithNonblocking(true)) if err != nil { t.Fatalf("create TimingPool failed: %s", err.Error()) } @@ -430,7 +428,7 @@ func TestNonblockingSubmit(t *testing.T) { if err := p.Submit(f); err != nil { t.Fatalf("nonblocking submit when pool is not full shouldn't return error") } - if err := p.Submit(demoFunc); err == nil || err != ants.ErrPoolOverload { + if err := p.Submit(demoFunc); err == nil || err != ErrPoolOverload { t.Fatalf("nonblocking submit when pool is full should get an ErrPoolOverload") } // interrupt f to get an available worker @@ -443,7 +441,7 @@ func TestNonblockingSubmit(t *testing.T) { func TestMaxBlockingSubmit(t *testing.T) { poolSize := 10 - p, err := ants.NewPool(poolSize, ants.WithMaxBlockingTasks(1)) + p, err := NewPool(poolSize, WithMaxBlockingTasks(1)) if err != nil { t.Fatalf("create TimingPool failed: %s", err.Error()) } @@ -473,7 +471,7 @@ func TestMaxBlockingSubmit(t *testing.T) { }() time.Sleep(1 * time.Second) // already reached max blocking limit - if err := p.Submit(demoFunc); err != ants.ErrPoolOverload { + if err := p.Submit(demoFunc); err != ErrPoolOverload { t.Fatalf("blocking submit when pool reach max blocking submit should return ErrPoolOverload") } // interrupt f to make blocking submit successful. @@ -488,7 +486,7 @@ func TestMaxBlockingSubmit(t *testing.T) { func TestNonblockingSubmitWithFunc(t *testing.T) { poolSize := 10 - p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc, ants.WithNonblocking(true)) + p, err := NewPoolWithFunc(poolSize, longRunningPoolFunc, WithNonblocking(true)) if err != nil { t.Fatalf("create TimingPool failed: %s", err.Error()) } @@ -503,7 +501,7 @@ func TestNonblockingSubmitWithFunc(t *testing.T) { if err := p.Invoke(ch); err != nil { t.Fatalf("nonblocking submit when pool is not full shouldn't return error") } - if err := p.Invoke(nil); err == nil || err != ants.ErrPoolOverload { + if err := p.Invoke(nil); err == nil || err != ErrPoolOverload { t.Fatalf("nonblocking submit when pool is full should get an ErrPoolOverload") } // interrupt f to get an available worker @@ -516,7 +514,7 @@ func TestNonblockingSubmitWithFunc(t *testing.T) { func TestMaxBlockingSubmitWithFunc(t *testing.T) { poolSize := 10 - p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc, ants.WithMaxBlockingTasks(1)) + p, err := NewPoolWithFunc(poolSize, longRunningPoolFunc, WithMaxBlockingTasks(1)) if err != nil { t.Fatalf("create TimingPool failed: %s", err.Error()) } @@ -543,7 +541,7 @@ func TestMaxBlockingSubmitWithFunc(t *testing.T) { }() time.Sleep(1 * time.Second) // already reached max blocking limit - if err := p.Invoke(Param); err != ants.ErrPoolOverload { + if err := p.Invoke(Param); err != ErrPoolOverload { t.Fatalf("blocking submit when pool reach max blocking submit should return ErrPoolOverload: %v", err) } // interrupt one func to make blocking submit successful. @@ -556,23 +554,23 @@ func TestMaxBlockingSubmitWithFunc(t *testing.T) { } } func TestRestCodeCoverage(t *testing.T) { - _, err := ants.NewPool(-1, ants.WithExpiryDuration(-1)) + _, err := NewPool(-1, WithExpiryDuration(-1)) t.Log(err) - _, err = ants.NewPool(1, ants.WithExpiryDuration(-1)) + _, err = NewPool(1, WithExpiryDuration(-1)) t.Log(err) - _, err = ants.NewPoolWithFunc(-1, demoPoolFunc, ants.WithExpiryDuration(-1)) + _, err = NewPoolWithFunc(-1, demoPoolFunc, WithExpiryDuration(-1)) t.Log(err) - _, err = ants.NewPoolWithFunc(1, demoPoolFunc, ants.WithExpiryDuration(-1)) + _, err = NewPoolWithFunc(1, demoPoolFunc, WithExpiryDuration(-1)) t.Log(err) - options := ants.Options{} + options := Options{} options.ExpiryDuration = time.Duration(10) * time.Second options.Nonblocking = true options.PreAlloc = true - poolOpts, _ := ants.NewPool(1, ants.WithOptions(options)) + poolOpts, _ := NewPool(1, WithOptions(options)) t.Logf("Pool with options, capacity: %d", poolOpts.Cap()) - p0, _ := ants.NewPool(TestSize) + p0, _ := NewPool(TestSize) defer func() { _ = p0.Submit(demoFunc) }() @@ -587,7 +585,7 @@ func TestRestCodeCoverage(t *testing.T) { p0.Tune(TestSize / 10) t.Logf("pool, after tuning capacity, capacity:%d, running:%d", p0.Cap(), p0.Running()) - pprem, _ := ants.NewPool(TestSize, ants.WithPreAlloc(true)) + pprem, _ := NewPool(TestSize, WithPreAlloc(true)) defer func() { _ = pprem.Submit(demoFunc) }() @@ -602,7 +600,7 @@ func TestRestCodeCoverage(t *testing.T) { pprem.Tune(TestSize / 10) t.Logf("pre-malloc pool, after tuning capacity, capacity:%d, running:%d", pprem.Cap(), pprem.Running()) - p, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc) + p, _ := NewPoolWithFunc(TestSize, demoPoolFunc) defer func() { _ = p.Invoke(Param) }() @@ -610,7 +608,7 @@ func TestRestCodeCoverage(t *testing.T) { for i := 0; i < n; i++ { _ = p.Invoke(Param) } - time.Sleep(ants.DefaultCleanIntervalTime) + time.Sleep(DefaultCleanIntervalTime) t.Logf("pool with func, capacity:%d", p.Cap()) t.Logf("pool with func, running workers number:%d", p.Running()) t.Logf("pool with func, free workers number:%d", p.Free()) @@ -618,7 +616,7 @@ func TestRestCodeCoverage(t *testing.T) { p.Tune(TestSize / 10) t.Logf("pool with func, after tuning capacity, capacity:%d, running:%d", p.Cap(), p.Running()) - ppremWithFunc, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc, ants.WithPreAlloc(true)) + ppremWithFunc, _ := NewPoolWithFunc(TestSize, demoPoolFunc, WithPreAlloc(true)) defer func() { _ = ppremWithFunc.Invoke(Param) }() @@ -626,7 +624,7 @@ func TestRestCodeCoverage(t *testing.T) { for i := 0; i < n; i++ { _ = ppremWithFunc.Invoke(Param) } - time.Sleep(ants.DefaultCleanIntervalTime) + time.Sleep(DefaultCleanIntervalTime) t.Logf("pre-malloc pool with func, capacity:%d", ppremWithFunc.Cap()) t.Logf("pre-malloc pool with func, running workers number:%d", ppremWithFunc.Running()) t.Logf("pre-malloc pool with func, free workers number:%d", ppremWithFunc.Free()) diff --git a/examples/main.go b/examples/main.go index 49aba1a..3e1d9e5 100644 --- a/examples/main.go +++ b/examples/main.go @@ -78,4 +78,7 @@ func main() { wg.Wait() fmt.Printf("running goroutines: %d\n", p.Running()) fmt.Printf("finish all tasks, result is %d\n", sum) + if sum != 499500 { + panic("the final result is wrong!!!") + } } diff --git a/pool.go b/pool.go index fb2cc15..63b407c 100644 --- a/pool.go +++ b/pool.go @@ -42,7 +42,7 @@ type Pool struct { expiryDuration time.Duration // workers is a slice that store the available workers. - workers workerQueue + workers workerArray // release is used to notice the pool to closed itself. release int32 @@ -88,15 +88,15 @@ func (p *Pool) periodicallyPurge() { } p.lock.Lock() - stream := p.workers.releaseExpiry(p.expiryDuration) + expiredWorkers := p.workers.findOutExpiry(p.expiryDuration) p.lock.Unlock() // Notify obsolete workers to stop. // This notification must be outside the p.lock, since w.task // may be blocking and may consume a lot of time if many workers // are located on non-local CPUs. - for w := range stream { - w.task <- nil + for i := range expiredWorkers { + expiredWorkers[i].task <- nil } // There might be a situation that all workers have been cleaned up(no any worker is running) @@ -142,9 +142,9 @@ func NewPool(size int, options ...Option) (*Pool, error) { }, } if opts.PreAlloc { - p.workers = newQueue(loopQueueType, size) + p.workers = newWorkerArray(loopQueueType, size) } else { - p.workers = newQueue(stackType, 0) + p.workers = newWorkerArray(stackType, 0) } p.cond = sync.NewCond(p.lock) @@ -198,7 +198,7 @@ func (p *Pool) Release() { p.once.Do(func() { atomic.StoreInt32(&p.release, 1) p.lock.Lock() - p.workers.releaseAll() + p.workers.release() p.lock.Unlock() }) } @@ -225,7 +225,7 @@ func (p *Pool) retrieveWorker() *goWorker { p.lock.Lock() - w = p.workers.dequeue() + w = p.workers.detach() if w != nil { p.lock.Unlock() } else if p.Running() < p.Cap() { @@ -250,7 +250,7 @@ func (p *Pool) retrieveWorker() *goWorker { return w } - w = p.workers.dequeue() + w = p.workers.detach() if w == nil { goto Reentry } @@ -268,7 +268,7 @@ func (p *Pool) revertWorker(worker *goWorker) bool { worker.recycleTime = time.Now() p.lock.Lock() - err := p.workers.enqueue(worker) + err := p.workers.insert(worker) if err != nil { return false } diff --git a/worker_array.go b/worker_array.go new file mode 100644 index 0000000..a882fe0 --- /dev/null +++ b/worker_array.go @@ -0,0 +1,41 @@ +package ants + +import ( + "errors" + "time" +) + +var ( + // ErrQueueIsFull ... + ErrQueueIsFull = errors.New("the queue is full") + + // ErrQueueLengthIsZero ... + ErrQueueLengthIsZero = errors.New("the queue length is zero") +) + +type workerArray interface { + len() int + isEmpty() bool + insert(worker *goWorker) error + detach() *goWorker + findOutExpiry(duration time.Duration) []*goWorker + release() +} + +type arrayType int + +const ( + stackType arrayType = 1 << iota + loopQueueType +) + +func newWorkerArray(aType arrayType, size int) workerArray { + switch aType { + case stackType: + return newWorkerStack(size) + case loopQueueType: + return newWorkerLoopQueue(size) + default: + return newWorkerStack(size) + } +} diff --git a/worker_loop_queue.go b/worker_loop_queue.go index 508da28..625861d 100644 --- a/worker_loop_queue.go +++ b/worker_loop_queue.go @@ -10,14 +10,13 @@ type loopQueue struct { remainder int } -func newLoopQueue(size int) *loopQueue { +func newWorkerLoopQueue(size int) *loopQueue { if size <= 0 { return nil } wq := loopQueue{ items: make([]*goWorker, size+1), - expiry: make([]*goWorker, 0), head: 0, tail: 0, remainder: size + 1, @@ -34,32 +33,26 @@ func (wq *loopQueue) len() int { return (wq.tail - wq.head + wq.remainder) % wq.remainder } -func (wq *loopQueue) cap() int { - if wq.remainder == 0 { - return 0 - } - return wq.remainder - 1 -} - func (wq *loopQueue) isEmpty() bool { return wq.tail == wq.head } -func (wq *loopQueue) enqueue(worker *goWorker) error { +func (wq *loopQueue) insert(worker *goWorker) error { if wq.remainder == 0 { return ErrQueueLengthIsZero } - if (wq.tail+1)%wq.remainder == wq.head { + next := (wq.tail + 1) % wq.remainder + if next == wq.head { return ErrQueueIsFull } wq.items[wq.tail] = worker - wq.tail = (wq.tail + 1) % wq.remainder + wq.tail = next return nil } -func (wq *loopQueue) dequeue() *goWorker { +func (wq *loopQueue) detach() *goWorker { if wq.len() == 0 { return nil } @@ -70,58 +63,25 @@ func (wq *loopQueue) dequeue() *goWorker { return w } -func (wq *loopQueue) releaseExpiry(duration time.Duration) chan *goWorker { - stream := make(chan *goWorker) - +func (wq *loopQueue) findOutExpiry(duration time.Duration) []*goWorker { if wq.len() == 0 { - close(stream) - return stream + return nil } wq.expiry = wq.expiry[:0] expiryTime := time.Now().Add(-duration) for wq.head != wq.tail { - if expiryTime.After(wq.items[wq.head].recycleTime) { - wq.expiry = append(wq.expiry, wq.items[wq.head]) - wq.head = (wq.head + 1) % wq.remainder - continue + if expiryTime.Before(wq.items[wq.head].recycleTime) { + break } - break + wq.expiry = append(wq.expiry, wq.items[wq.head]) + wq.head = (wq.head + 1) % wq.remainder } - - go func() { - defer close(stream) - - for i := 0; i < len(wq.expiry); i++ { - stream <- wq.expiry[i] - } - }() - - return stream + return wq.expiry } -//func (wq *LoopQueue)search(compareTime time.Time, l, r int) int { -// if l == r { -// if wq.items[l].recycleTime.After(compareTime) { -// return -1 -// } else { -// return l -// } -// } -// -// c := cap(wq.items) -// mid := ((r-l+c)/2 + l) % c -// if mid == l { -// return wq.search(compareTime, l, l) -// } else if wq.items[mid].recycleTime.After(compareTime) { -// return wq.search(compareTime, l, mid-1) -// } else { -// return wq.search(compareTime, mid+1, r) -// } -//} - -func (wq *loopQueue) releaseAll() { +func (wq *loopQueue) release() { if wq.len() == 0 { return } diff --git a/worker_loop_queue_test.go b/worker_loop_queue_test.go index ded180c..db62658 100644 --- a/worker_loop_queue_test.go +++ b/worker_loop_queue_test.go @@ -7,30 +7,26 @@ import ( func TestNewLoopQueue(t *testing.T) { size := 100 - q := newLoopQueue(size) + q := newWorkerLoopQueue(size) if q.len() != 0 { t.Fatalf("Len error") } - if q.cap() != size { - t.Fatalf("Cap error") - } - if !q.isEmpty() { t.Fatalf("IsEmpty error") } - if q.dequeue() != nil { + if q.detach() != nil { t.Fatalf("Dequeue error") } } func TestLoopQueue(t *testing.T) { size := 10 - q := newLoopQueue(size) + q := newWorkerLoopQueue(size) for i := 0; i < 5; i++ { - err := q.enqueue(&goWorker{recycleTime: time.Now()}) + err := q.insert(&goWorker{recycleTime: time.Now()}) if err != nil { break } @@ -40,7 +36,7 @@ func TestLoopQueue(t *testing.T) { t.Fatalf("Len error") } - v := q.dequeue() + v := q.detach() t.Log(v) if q.len() != 4 { @@ -50,7 +46,7 @@ func TestLoopQueue(t *testing.T) { time.Sleep(time.Second) for i := 0; i < 6; i++ { - err := q.enqueue(&goWorker{recycleTime: time.Now()}) + err := q.insert(&goWorker{recycleTime: time.Now()}) if err != nil { break } @@ -60,12 +56,12 @@ func TestLoopQueue(t *testing.T) { t.Fatalf("Len error") } - err := q.enqueue(&goWorker{recycleTime: time.Now()}) + err := q.insert(&goWorker{recycleTime: time.Now()}) if err == nil { t.Fatalf("Enqueue error") } - q.releaseExpiry(time.Second) + q.findOutExpiry(time.Second) if q.len() != 6 { t.Fatalf("Len error: %d", q.len()) diff --git a/worker_queue.go b/worker_queue.go deleted file mode 100644 index 2c189d3..0000000 --- a/worker_queue.go +++ /dev/null @@ -1,39 +0,0 @@ -package ants - -import ( - "errors" - "time" -) - -var ( - ErrQueueIsFull = errors.New("the queue is full") - ErrQueueLengthIsZero = errors.New("the queue length is zero") -) - -type workerQueue interface { - len() int - cap() int - isEmpty() bool - enqueue(worker *goWorker) error - dequeue() *goWorker - releaseExpiry(duration time.Duration) chan *goWorker - releaseAll() -} - -type queueType int - -const ( - stackType queueType = 1 << iota - loopQueueType -) - -func newQueue(qType queueType, size int) workerQueue { - switch qType { - case stackType: - return newWorkerStack(size) - case loopQueueType: - return newLoopQueue(size) - default: - return newWorkerStack(size) - } -} diff --git a/worker_stack.go b/worker_stack.go index 5364816..bdf710e 100644 --- a/worker_stack.go +++ b/worker_stack.go @@ -5,6 +5,7 @@ import "time" type workerStack struct { items []*goWorker expiry []*goWorker + size int } func newWorkerStack(size int) *workerStack { @@ -13,8 +14,8 @@ func newWorkerStack(size int) *workerStack { } wq := workerStack{ - items: make([]*goWorker, 0, size), - expiry: make([]*goWorker, 0), + items: make([]*goWorker, 0, size), + size: size, } return &wq } @@ -23,20 +24,16 @@ func (wq *workerStack) len() int { return len(wq.items) } -func (wq *workerStack) cap() int { - return cap(wq.items) -} - func (wq *workerStack) isEmpty() bool { return len(wq.items) == 0 } -func (wq *workerStack) enqueue(worker *goWorker) error { +func (wq *workerStack) insert(worker *goWorker) error { wq.items = append(wq.items, worker) return nil } -func (wq *workerStack) dequeue() *goWorker { +func (wq *workerStack) detach() *goWorker { l := wq.len() if l == 0 { return nil @@ -48,13 +45,10 @@ func (wq *workerStack) dequeue() *goWorker { return w } -func (wq *workerStack) releaseExpiry(duration time.Duration) chan *goWorker { - stream := make(chan *goWorker) - +func (wq *workerStack) findOutExpiry(duration time.Duration) []*goWorker { n := wq.len() if n == 0 { - close(stream) - return stream + return nil } expiryTime := time.Now().Add(-duration) @@ -66,16 +60,7 @@ func (wq *workerStack) releaseExpiry(duration time.Duration) chan *goWorker { m := copy(wq.items, wq.items[index+1:]) wq.items = wq.items[:m] } - - go func() { - defer close(stream) - - for i := 0; i < len(wq.expiry); i++ { - stream <- wq.expiry[i] - } - }() - - return stream + return wq.expiry } func (wq *workerStack) search(l, r int, expiryTime time.Time) int { @@ -91,7 +76,7 @@ func (wq *workerStack) search(l, r int, expiryTime time.Time) int { return r } -func (wq *workerStack) releaseAll() { +func (wq *workerStack) release() { for i := 0; i < wq.len(); i++ { wq.items[i].task <- nil } diff --git a/worker_stack_test.go b/worker_stack_test.go index 5e6da62..695eba9 100644 --- a/worker_stack_test.go +++ b/worker_stack_test.go @@ -12,15 +12,11 @@ func TestNewWorkerStack(t *testing.T) { t.Fatalf("Len error") } - if q.cap() != size { - t.Fatalf("Cap error") - } - if !q.isEmpty() { t.Fatalf("IsEmpty error") } - if q.dequeue() != nil { + if q.detach() != nil { t.Fatalf("Dequeue error") } } @@ -29,7 +25,7 @@ func TestWorkerStack(t *testing.T) { q := newWorkerStack(0) for i := 0; i < 5; i++ { - err := q.enqueue(&goWorker{recycleTime: time.Now()}) + err := q.insert(&goWorker{recycleTime: time.Now()}) if err != nil { break } @@ -40,7 +36,7 @@ func TestWorkerStack(t *testing.T) { expired := time.Now() - err := q.enqueue(&goWorker{recycleTime: expired}) + err := q.insert(&goWorker{recycleTime: expired}) if err != nil { t.Fatalf("Enqueue error") } @@ -48,7 +44,7 @@ func TestWorkerStack(t *testing.T) { time.Sleep(time.Second) for i := 0; i < 6; i++ { - err := q.enqueue(&goWorker{recycleTime: time.Now()}) + err := q.insert(&goWorker{recycleTime: time.Now()}) if err != nil { t.Fatalf("Enqueue error") } @@ -58,7 +54,7 @@ func TestWorkerStack(t *testing.T) { t.Fatalf("Len error") } - q.releaseExpiry(time.Second) + q.findOutExpiry(time.Second) if q.len() != 6 { t.Fatalf("Len error") @@ -71,7 +67,7 @@ func TestSearch(t *testing.T) { // 1 expiry1 := time.Now() - _ = q.enqueue(&goWorker{recycleTime: time.Now()}) + _ = q.insert(&goWorker{recycleTime: time.Now()}) index := q.search(0, q.len()-1, time.Now()) if index != 0 { @@ -85,7 +81,7 @@ func TestSearch(t *testing.T) { // 2 expiry2 := time.Now() - _ = q.enqueue(&goWorker{recycleTime: time.Now()}) + _ = q.insert(&goWorker{recycleTime: time.Now()}) index = q.search(0, q.len()-1, expiry1) if index != -1 { @@ -104,15 +100,15 @@ func TestSearch(t *testing.T) { // more for i := 0; i < 5; i++ { - _ = q.enqueue(&goWorker{recycleTime: time.Now()}) + _ = q.insert(&goWorker{recycleTime: time.Now()}) } expiry3 := time.Now() - _ = q.enqueue(&goWorker{recycleTime: expiry3}) + _ = q.insert(&goWorker{recycleTime: expiry3}) for i := 0; i < 10; i++ { - _ = q.enqueue(&goWorker{recycleTime: time.Now()}) + _ = q.insert(&goWorker{recycleTime: time.Now()}) } index = q.search(0, q.len()-1, expiry3)