Refactoring to the interface and implementations of worker-array

This commit is contained in:
Andy Pan 2019-10-10 03:02:04 +08:00
parent f0e23928f4
commit 566511ec5f
10 changed files with 171 additions and 233 deletions

View File

@ -20,26 +20,24 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package ants_test
package ants
import (
"runtime"
"sync"
"testing"
"time"
"github.com/panjf2000/ants/v2"
)
const (
RunTimes = 1000000
benchParam = 10
benchAntsSize = 200000
RunTimes = 1000000
BenchParam = 10
BenchAntsSize = 200000
DefaultExpiredTime = 10 * time.Second
)
func demoFunc() {
n := 10
time.Sleep(time.Duration(n) * time.Millisecond)
time.Sleep(time.Duration(BenchParam) * time.Millisecond)
}
func demoPoolFunc(args interface{}) {
@ -63,13 +61,13 @@ func longRunningPoolFunc(arg interface{}) {
}
}
func BenchmarkGoroutineWithFunc(b *testing.B) {
func BenchmarkGoroutines(b *testing.B) {
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(RunTimes)
for j := 0; j < RunTimes; j++ {
go func() {
demoPoolFunc(benchParam)
demoFunc()
wg.Done()
}()
}
@ -77,16 +75,16 @@ func BenchmarkGoroutineWithFunc(b *testing.B) {
}
}
func BenchmarkSemaphoreWithFunc(b *testing.B) {
func BenchmarkSemaphore(b *testing.B) {
var wg sync.WaitGroup
sema := make(chan struct{}, benchAntsSize)
sema := make(chan struct{}, BenchAntsSize)
for i := 0; i < b.N; i++ {
wg.Add(RunTimes)
for j := 0; j < RunTimes; j++ {
sema <- struct{}{}
go func() {
demoPoolFunc(benchParam)
demoFunc()
<-sema
wg.Done()
}()
@ -95,40 +93,40 @@ func BenchmarkSemaphoreWithFunc(b *testing.B) {
}
}
func BenchmarkAntsPoolWithFunc(b *testing.B) {
func BenchmarkAntsPool(b *testing.B) {
var wg sync.WaitGroup
p, _ := ants.NewPoolWithFunc(benchAntsSize, func(i interface{}) {
demoPoolFunc(i)
wg.Done()
})
p, _ := NewPool(BenchAntsSize, WithExpiryDuration(DefaultExpiredTime))
defer p.Release()
b.StartTimer()
for i := 0; i < b.N; i++ {
wg.Add(RunTimes)
for j := 0; j < RunTimes; j++ {
_ = p.Invoke(benchParam)
_ = p.Submit(func() {
demoFunc()
wg.Done()
})
}
wg.Wait()
}
b.StopTimer()
}
func BenchmarkGoroutineThroughput(b *testing.B) {
func BenchmarkGoroutinesThroughput(b *testing.B) {
for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ {
go demoPoolFunc(benchParam)
go demoFunc()
}
}
}
func BenchmarkSemaphoreThroughput(b *testing.B) {
sema := make(chan struct{}, benchAntsSize)
sema := make(chan struct{}, BenchAntsSize)
for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ {
sema <- struct{}{}
go func() {
demoPoolFunc(benchParam)
demoFunc()
<-sema
}()
}
@ -136,12 +134,12 @@ func BenchmarkSemaphoreThroughput(b *testing.B) {
}
func BenchmarkAntsPoolThroughput(b *testing.B) {
p, _ := ants.NewPoolWithFunc(benchAntsSize, demoPoolFunc)
p, _ := NewPool(BenchAntsSize, WithExpiryDuration(DefaultExpiredTime))
defer p.Release()
b.StartTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ {
_ = p.Invoke(benchParam)
_ = p.Submit(demoFunc)
}
}
b.StopTimer()

View File

@ -20,7 +20,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package ants_test
package ants
import (
"runtime"
@ -28,8 +28,6 @@ import (
"sync/atomic"
"testing"
"time"
"github.com/panjf2000/ants/v2"
)
const (
@ -56,7 +54,7 @@ var curMem uint64
// TestAntsPoolWaitToGetWorker is used to test waiting to get worker.
func TestAntsPoolWaitToGetWorker(t *testing.T) {
var wg sync.WaitGroup
p, _ := ants.NewPool(AntsSize)
p, _ := NewPool(AntsSize)
defer p.Release()
for i := 0; i < n; i++ {
@ -76,7 +74,7 @@ func TestAntsPoolWaitToGetWorker(t *testing.T) {
func TestAntsPoolWaitToGetWorkerPreMalloc(t *testing.T) {
var wg sync.WaitGroup
p, _ := ants.NewPool(AntsSize, ants.WithPreAlloc(true))
p, _ := NewPool(AntsSize, WithPreAlloc(true))
defer p.Release()
for i := 0; i < n; i++ {
@ -97,7 +95,7 @@ func TestAntsPoolWaitToGetWorkerPreMalloc(t *testing.T) {
// TestAntsPoolWithFuncWaitToGetWorker is used to test waiting to get worker.
func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
var wg sync.WaitGroup
p, _ := ants.NewPoolWithFunc(AntsSize, func(i interface{}) {
p, _ := NewPoolWithFunc(AntsSize, func(i interface{}) {
demoPoolFunc(i)
wg.Done()
})
@ -117,10 +115,10 @@ func TestAntsPoolWithFuncWaitToGetWorker(t *testing.T) {
func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) {
var wg sync.WaitGroup
p, _ := ants.NewPoolWithFunc(AntsSize, func(i interface{}) {
p, _ := NewPoolWithFunc(AntsSize, func(i interface{}) {
demoPoolFunc(i)
wg.Done()
}, ants.WithPreAlloc(true))
}, WithPreAlloc(true))
defer p.Release()
for i := 0; i < n; i++ {
@ -137,13 +135,13 @@ func TestAntsPoolWithFuncWaitToGetWorkerPreMalloc(t *testing.T) {
// TestAntsPoolGetWorkerFromCache is used to test getting worker from sync.Pool.
func TestAntsPoolGetWorkerFromCache(t *testing.T) {
p, _ := ants.NewPool(TestSize)
p, _ := NewPool(TestSize)
defer p.Release()
for i := 0; i < AntsSize; i++ {
_ = p.Submit(demoFunc)
}
time.Sleep(2 * ants.DefaultCleanIntervalTime)
time.Sleep(2 * DefaultCleanIntervalTime)
_ = p.Submit(demoFunc)
t.Logf("pool, running workers number:%d", p.Running())
mem := runtime.MemStats{}
@ -155,13 +153,13 @@ func TestAntsPoolGetWorkerFromCache(t *testing.T) {
// TestAntsPoolWithFuncGetWorkerFromCache is used to test getting worker from sync.Pool.
func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) {
dur := 10
p, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc)
p, _ := NewPoolWithFunc(TestSize, demoPoolFunc)
defer p.Release()
for i := 0; i < AntsSize; i++ {
_ = p.Invoke(dur)
}
time.Sleep(2 * ants.DefaultCleanIntervalTime)
time.Sleep(2 * DefaultCleanIntervalTime)
_ = p.Invoke(dur)
t.Logf("pool with func, running workers number:%d", p.Running())
mem := runtime.MemStats{}
@ -172,13 +170,13 @@ func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) {
func TestAntsPoolWithFuncGetWorkerFromCachePreMalloc(t *testing.T) {
dur := 10
p, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc, ants.WithPreAlloc(true))
p, _ := NewPoolWithFunc(TestSize, demoPoolFunc, WithPreAlloc(true))
defer p.Release()
for i := 0; i < AntsSize; i++ {
_ = p.Invoke(dur)
}
time.Sleep(2 * ants.DefaultCleanIntervalTime)
time.Sleep(2 * DefaultCleanIntervalTime)
_ = p.Invoke(dur)
t.Logf("pool with func, running workers number:%d", p.Running())
mem := runtime.MemStats{}
@ -208,20 +206,20 @@ func TestNoPool(t *testing.T) {
}
func TestAntsPool(t *testing.T) {
defer ants.Release()
defer Release()
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
_ = ants.Submit(func() {
_ = Submit(func() {
demoFunc()
wg.Done()
})
}
wg.Wait()
t.Logf("pool, capacity:%d", ants.Cap())
t.Logf("pool, running workers number:%d", ants.Running())
t.Logf("pool, free workers number:%d", ants.Free())
t.Logf("pool, capacity:%d", Cap())
t.Logf("pool, running workers number:%d", Running())
t.Logf("pool, free workers number:%d", Free())
mem := runtime.MemStats{}
runtime.ReadMemStats(&mem)
@ -235,7 +233,7 @@ func TestAntsPool(t *testing.T) {
func TestPanicHandler(t *testing.T) {
var panicCounter int64
var wg sync.WaitGroup
p0, err := ants.NewPool(10, ants.WithPanicHandler(func(p interface{}) {
p0, err := NewPool(10, WithPanicHandler(func(p interface{}) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
t.Logf("catch panic with PanicHandler: %v", p)
@ -257,7 +255,7 @@ func TestPanicHandler(t *testing.T) {
t.Errorf("pool should be empty after panic")
}
p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { panic(p) }, ants.WithPanicHandler(func(p interface{}) {
p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(p interface{}) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
}))
@ -280,7 +278,7 @@ func TestPanicHandler(t *testing.T) {
func TestPanicHandlerPreMalloc(t *testing.T) {
var panicCounter int64
var wg sync.WaitGroup
p0, err := ants.NewPool(10, ants.WithPreAlloc(true), ants.WithPanicHandler(func(p interface{}) {
p0, err := NewPool(10, WithPreAlloc(true), WithPanicHandler(func(p interface{}) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
t.Logf("catch panic with PanicHandler: %v", p)
@ -302,7 +300,7 @@ func TestPanicHandlerPreMalloc(t *testing.T) {
t.Errorf("pool should be empty after panic")
}
p1, err := ants.NewPoolWithFunc(10, func(p interface{}) { panic(p) }, ants.WithPanicHandler(func(p interface{}) {
p1, err := NewPoolWithFunc(10, func(p interface{}) { panic(p) }, WithPanicHandler(func(p interface{}) {
defer wg.Done()
atomic.AddInt64(&panicCounter, 1)
}))
@ -323,7 +321,7 @@ func TestPanicHandlerPreMalloc(t *testing.T) {
}
func TestPoolPanicWithoutHandler(t *testing.T) {
p0, err := ants.NewPool(10)
p0, err := NewPool(10)
if err != nil {
t.Fatalf("create new pool failed: %s", err.Error())
}
@ -332,7 +330,7 @@ func TestPoolPanicWithoutHandler(t *testing.T) {
panic("Oops!")
})
p1, err := ants.NewPoolWithFunc(10, func(p interface{}) {
p1, err := NewPoolWithFunc(10, func(p interface{}) {
panic(p)
})
if err != nil {
@ -343,7 +341,7 @@ func TestPoolPanicWithoutHandler(t *testing.T) {
}
func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {
p0, err := ants.NewPool(10, ants.WithPreAlloc(true))
p0, err := NewPool(10, WithPreAlloc(true))
if err != nil {
t.Fatalf("create new pool failed: %s", err.Error())
}
@ -352,7 +350,7 @@ func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {
panic("Oops!")
})
p1, err := ants.NewPoolWithFunc(10, func(p interface{}) {
p1, err := NewPoolWithFunc(10, func(p interface{}) {
panic(p)
})
if err != nil {
@ -363,46 +361,46 @@ func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {
}
func TestPurge(t *testing.T) {
p, err := ants.NewPool(10)
p, err := NewPool(10)
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
defer p.Release()
_ = p.Submit(demoFunc)
time.Sleep(3 * ants.DefaultCleanIntervalTime)
time.Sleep(3 * DefaultCleanIntervalTime)
if p.Running() != 0 {
t.Error("all p should be purged")
}
p1, err := ants.NewPoolWithFunc(10, demoPoolFunc)
p1, err := NewPoolWithFunc(10, demoPoolFunc)
if err != nil {
t.Fatalf("create TimingPoolWithFunc failed: %s", err.Error())
}
defer p1.Release()
_ = p1.Invoke(1)
time.Sleep(3 * ants.DefaultCleanIntervalTime)
time.Sleep(3 * DefaultCleanIntervalTime)
if p.Running() != 0 {
t.Error("all p should be purged")
}
}
func TestPurgePreMalloc(t *testing.T) {
p, err := ants.NewPool(10, ants.WithPreAlloc(true))
p, err := NewPool(10, WithPreAlloc(true))
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
defer p.Release()
_ = p.Submit(demoFunc)
time.Sleep(3 * ants.DefaultCleanIntervalTime)
time.Sleep(3 * DefaultCleanIntervalTime)
if p.Running() != 0 {
t.Error("all p should be purged")
}
p1, err := ants.NewPoolWithFunc(10, demoPoolFunc)
p1, err := NewPoolWithFunc(10, demoPoolFunc)
if err != nil {
t.Fatalf("create TimingPoolWithFunc failed: %s", err.Error())
}
defer p1.Release()
_ = p1.Invoke(1)
time.Sleep(3 * ants.DefaultCleanIntervalTime)
time.Sleep(3 * DefaultCleanIntervalTime)
if p.Running() != 0 {
t.Error("all p should be purged")
}
@ -410,7 +408,7 @@ func TestPurgePreMalloc(t *testing.T) {
func TestNonblockingSubmit(t *testing.T) {
poolSize := 10
p, err := ants.NewPool(poolSize, ants.WithNonblocking(true))
p, err := NewPool(poolSize, WithNonblocking(true))
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
@ -430,7 +428,7 @@ func TestNonblockingSubmit(t *testing.T) {
if err := p.Submit(f); err != nil {
t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
}
if err := p.Submit(demoFunc); err == nil || err != ants.ErrPoolOverload {
if err := p.Submit(demoFunc); err == nil || err != ErrPoolOverload {
t.Fatalf("nonblocking submit when pool is full should get an ErrPoolOverload")
}
// interrupt f to get an available worker
@ -443,7 +441,7 @@ func TestNonblockingSubmit(t *testing.T) {
func TestMaxBlockingSubmit(t *testing.T) {
poolSize := 10
p, err := ants.NewPool(poolSize, ants.WithMaxBlockingTasks(1))
p, err := NewPool(poolSize, WithMaxBlockingTasks(1))
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
@ -473,7 +471,7 @@ func TestMaxBlockingSubmit(t *testing.T) {
}()
time.Sleep(1 * time.Second)
// already reached max blocking limit
if err := p.Submit(demoFunc); err != ants.ErrPoolOverload {
if err := p.Submit(demoFunc); err != ErrPoolOverload {
t.Fatalf("blocking submit when pool reach max blocking submit should return ErrPoolOverload")
}
// interrupt f to make blocking submit successful.
@ -488,7 +486,7 @@ func TestMaxBlockingSubmit(t *testing.T) {
func TestNonblockingSubmitWithFunc(t *testing.T) {
poolSize := 10
p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc, ants.WithNonblocking(true))
p, err := NewPoolWithFunc(poolSize, longRunningPoolFunc, WithNonblocking(true))
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
@ -503,7 +501,7 @@ func TestNonblockingSubmitWithFunc(t *testing.T) {
if err := p.Invoke(ch); err != nil {
t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
}
if err := p.Invoke(nil); err == nil || err != ants.ErrPoolOverload {
if err := p.Invoke(nil); err == nil || err != ErrPoolOverload {
t.Fatalf("nonblocking submit when pool is full should get an ErrPoolOverload")
}
// interrupt f to get an available worker
@ -516,7 +514,7 @@ func TestNonblockingSubmitWithFunc(t *testing.T) {
func TestMaxBlockingSubmitWithFunc(t *testing.T) {
poolSize := 10
p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc, ants.WithMaxBlockingTasks(1))
p, err := NewPoolWithFunc(poolSize, longRunningPoolFunc, WithMaxBlockingTasks(1))
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
@ -543,7 +541,7 @@ func TestMaxBlockingSubmitWithFunc(t *testing.T) {
}()
time.Sleep(1 * time.Second)
// already reached max blocking limit
if err := p.Invoke(Param); err != ants.ErrPoolOverload {
if err := p.Invoke(Param); err != ErrPoolOverload {
t.Fatalf("blocking submit when pool reach max blocking submit should return ErrPoolOverload: %v", err)
}
// interrupt one func to make blocking submit successful.
@ -556,23 +554,23 @@ func TestMaxBlockingSubmitWithFunc(t *testing.T) {
}
}
func TestRestCodeCoverage(t *testing.T) {
_, err := ants.NewPool(-1, ants.WithExpiryDuration(-1))
_, err := NewPool(-1, WithExpiryDuration(-1))
t.Log(err)
_, err = ants.NewPool(1, ants.WithExpiryDuration(-1))
_, err = NewPool(1, WithExpiryDuration(-1))
t.Log(err)
_, err = ants.NewPoolWithFunc(-1, demoPoolFunc, ants.WithExpiryDuration(-1))
_, err = NewPoolWithFunc(-1, demoPoolFunc, WithExpiryDuration(-1))
t.Log(err)
_, err = ants.NewPoolWithFunc(1, demoPoolFunc, ants.WithExpiryDuration(-1))
_, err = NewPoolWithFunc(1, demoPoolFunc, WithExpiryDuration(-1))
t.Log(err)
options := ants.Options{}
options := Options{}
options.ExpiryDuration = time.Duration(10) * time.Second
options.Nonblocking = true
options.PreAlloc = true
poolOpts, _ := ants.NewPool(1, ants.WithOptions(options))
poolOpts, _ := NewPool(1, WithOptions(options))
t.Logf("Pool with options, capacity: %d", poolOpts.Cap())
p0, _ := ants.NewPool(TestSize)
p0, _ := NewPool(TestSize)
defer func() {
_ = p0.Submit(demoFunc)
}()
@ -587,7 +585,7 @@ func TestRestCodeCoverage(t *testing.T) {
p0.Tune(TestSize / 10)
t.Logf("pool, after tuning capacity, capacity:%d, running:%d", p0.Cap(), p0.Running())
pprem, _ := ants.NewPool(TestSize, ants.WithPreAlloc(true))
pprem, _ := NewPool(TestSize, WithPreAlloc(true))
defer func() {
_ = pprem.Submit(demoFunc)
}()
@ -602,7 +600,7 @@ func TestRestCodeCoverage(t *testing.T) {
pprem.Tune(TestSize / 10)
t.Logf("pre-malloc pool, after tuning capacity, capacity:%d, running:%d", pprem.Cap(), pprem.Running())
p, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc)
p, _ := NewPoolWithFunc(TestSize, demoPoolFunc)
defer func() {
_ = p.Invoke(Param)
}()
@ -610,7 +608,7 @@ func TestRestCodeCoverage(t *testing.T) {
for i := 0; i < n; i++ {
_ = p.Invoke(Param)
}
time.Sleep(ants.DefaultCleanIntervalTime)
time.Sleep(DefaultCleanIntervalTime)
t.Logf("pool with func, capacity:%d", p.Cap())
t.Logf("pool with func, running workers number:%d", p.Running())
t.Logf("pool with func, free workers number:%d", p.Free())
@ -618,7 +616,7 @@ func TestRestCodeCoverage(t *testing.T) {
p.Tune(TestSize / 10)
t.Logf("pool with func, after tuning capacity, capacity:%d, running:%d", p.Cap(), p.Running())
ppremWithFunc, _ := ants.NewPoolWithFunc(TestSize, demoPoolFunc, ants.WithPreAlloc(true))
ppremWithFunc, _ := NewPoolWithFunc(TestSize, demoPoolFunc, WithPreAlloc(true))
defer func() {
_ = ppremWithFunc.Invoke(Param)
}()
@ -626,7 +624,7 @@ func TestRestCodeCoverage(t *testing.T) {
for i := 0; i < n; i++ {
_ = ppremWithFunc.Invoke(Param)
}
time.Sleep(ants.DefaultCleanIntervalTime)
time.Sleep(DefaultCleanIntervalTime)
t.Logf("pre-malloc pool with func, capacity:%d", ppremWithFunc.Cap())
t.Logf("pre-malloc pool with func, running workers number:%d", ppremWithFunc.Running())
t.Logf("pre-malloc pool with func, free workers number:%d", ppremWithFunc.Free())

View File

@ -78,4 +78,7 @@ func main() {
wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
if sum != 499500 {
panic("the final result is wrong!!!")
}
}

20
pool.go
View File

@ -42,7 +42,7 @@ type Pool struct {
expiryDuration time.Duration
// workers is a slice that store the available workers.
workers workerQueue
workers workerArray
// release is used to notice the pool to closed itself.
release int32
@ -88,15 +88,15 @@ func (p *Pool) periodicallyPurge() {
}
p.lock.Lock()
stream := p.workers.releaseExpiry(p.expiryDuration)
expiredWorkers := p.workers.findOutExpiry(p.expiryDuration)
p.lock.Unlock()
// Notify obsolete workers to stop.
// This notification must be outside the p.lock, since w.task
// may be blocking and may consume a lot of time if many workers
// are located on non-local CPUs.
for w := range stream {
w.task <- nil
for i := range expiredWorkers {
expiredWorkers[i].task <- nil
}
// There might be a situation that all workers have been cleaned up(no any worker is running)
@ -142,9 +142,9 @@ func NewPool(size int, options ...Option) (*Pool, error) {
},
}
if opts.PreAlloc {
p.workers = newQueue(loopQueueType, size)
p.workers = newWorkerArray(loopQueueType, size)
} else {
p.workers = newQueue(stackType, 0)
p.workers = newWorkerArray(stackType, 0)
}
p.cond = sync.NewCond(p.lock)
@ -198,7 +198,7 @@ func (p *Pool) Release() {
p.once.Do(func() {
atomic.StoreInt32(&p.release, 1)
p.lock.Lock()
p.workers.releaseAll()
p.workers.release()
p.lock.Unlock()
})
}
@ -225,7 +225,7 @@ func (p *Pool) retrieveWorker() *goWorker {
p.lock.Lock()
w = p.workers.dequeue()
w = p.workers.detach()
if w != nil {
p.lock.Unlock()
} else if p.Running() < p.Cap() {
@ -250,7 +250,7 @@ func (p *Pool) retrieveWorker() *goWorker {
return w
}
w = p.workers.dequeue()
w = p.workers.detach()
if w == nil {
goto Reentry
}
@ -268,7 +268,7 @@ func (p *Pool) revertWorker(worker *goWorker) bool {
worker.recycleTime = time.Now()
p.lock.Lock()
err := p.workers.enqueue(worker)
err := p.workers.insert(worker)
if err != nil {
return false
}

41
worker_array.go Normal file
View File

@ -0,0 +1,41 @@
package ants
import (
"errors"
"time"
)
var (
// ErrQueueIsFull ...
ErrQueueIsFull = errors.New("the queue is full")
// ErrQueueLengthIsZero ...
ErrQueueLengthIsZero = errors.New("the queue length is zero")
)
type workerArray interface {
len() int
isEmpty() bool
insert(worker *goWorker) error
detach() *goWorker
findOutExpiry(duration time.Duration) []*goWorker
release()
}
type arrayType int
const (
stackType arrayType = 1 << iota
loopQueueType
)
func newWorkerArray(aType arrayType, size int) workerArray {
switch aType {
case stackType:
return newWorkerStack(size)
case loopQueueType:
return newWorkerLoopQueue(size)
default:
return newWorkerStack(size)
}
}

View File

@ -10,14 +10,13 @@ type loopQueue struct {
remainder int
}
func newLoopQueue(size int) *loopQueue {
func newWorkerLoopQueue(size int) *loopQueue {
if size <= 0 {
return nil
}
wq := loopQueue{
items: make([]*goWorker, size+1),
expiry: make([]*goWorker, 0),
head: 0,
tail: 0,
remainder: size + 1,
@ -34,32 +33,26 @@ func (wq *loopQueue) len() int {
return (wq.tail - wq.head + wq.remainder) % wq.remainder
}
func (wq *loopQueue) cap() int {
if wq.remainder == 0 {
return 0
}
return wq.remainder - 1
}
func (wq *loopQueue) isEmpty() bool {
return wq.tail == wq.head
}
func (wq *loopQueue) enqueue(worker *goWorker) error {
func (wq *loopQueue) insert(worker *goWorker) error {
if wq.remainder == 0 {
return ErrQueueLengthIsZero
}
if (wq.tail+1)%wq.remainder == wq.head {
next := (wq.tail + 1) % wq.remainder
if next == wq.head {
return ErrQueueIsFull
}
wq.items[wq.tail] = worker
wq.tail = (wq.tail + 1) % wq.remainder
wq.tail = next
return nil
}
func (wq *loopQueue) dequeue() *goWorker {
func (wq *loopQueue) detach() *goWorker {
if wq.len() == 0 {
return nil
}
@ -70,58 +63,25 @@ func (wq *loopQueue) dequeue() *goWorker {
return w
}
func (wq *loopQueue) releaseExpiry(duration time.Duration) chan *goWorker {
stream := make(chan *goWorker)
func (wq *loopQueue) findOutExpiry(duration time.Duration) []*goWorker {
if wq.len() == 0 {
close(stream)
return stream
return nil
}
wq.expiry = wq.expiry[:0]
expiryTime := time.Now().Add(-duration)
for wq.head != wq.tail {
if expiryTime.After(wq.items[wq.head].recycleTime) {
wq.expiry = append(wq.expiry, wq.items[wq.head])
wq.head = (wq.head + 1) % wq.remainder
continue
if expiryTime.Before(wq.items[wq.head].recycleTime) {
break
}
break
wq.expiry = append(wq.expiry, wq.items[wq.head])
wq.head = (wq.head + 1) % wq.remainder
}
go func() {
defer close(stream)
for i := 0; i < len(wq.expiry); i++ {
stream <- wq.expiry[i]
}
}()
return stream
return wq.expiry
}
//func (wq *LoopQueue)search(compareTime time.Time, l, r int) int {
// if l == r {
// if wq.items[l].recycleTime.After(compareTime) {
// return -1
// } else {
// return l
// }
// }
//
// c := cap(wq.items)
// mid := ((r-l+c)/2 + l) % c
// if mid == l {
// return wq.search(compareTime, l, l)
// } else if wq.items[mid].recycleTime.After(compareTime) {
// return wq.search(compareTime, l, mid-1)
// } else {
// return wq.search(compareTime, mid+1, r)
// }
//}
func (wq *loopQueue) releaseAll() {
func (wq *loopQueue) release() {
if wq.len() == 0 {
return
}

View File

@ -7,30 +7,26 @@ import (
func TestNewLoopQueue(t *testing.T) {
size := 100
q := newLoopQueue(size)
q := newWorkerLoopQueue(size)
if q.len() != 0 {
t.Fatalf("Len error")
}
if q.cap() != size {
t.Fatalf("Cap error")
}
if !q.isEmpty() {
t.Fatalf("IsEmpty error")
}
if q.dequeue() != nil {
if q.detach() != nil {
t.Fatalf("Dequeue error")
}
}
func TestLoopQueue(t *testing.T) {
size := 10
q := newLoopQueue(size)
q := newWorkerLoopQueue(size)
for i := 0; i < 5; i++ {
err := q.enqueue(&goWorker{recycleTime: time.Now()})
err := q.insert(&goWorker{recycleTime: time.Now()})
if err != nil {
break
}
@ -40,7 +36,7 @@ func TestLoopQueue(t *testing.T) {
t.Fatalf("Len error")
}
v := q.dequeue()
v := q.detach()
t.Log(v)
if q.len() != 4 {
@ -50,7 +46,7 @@ func TestLoopQueue(t *testing.T) {
time.Sleep(time.Second)
for i := 0; i < 6; i++ {
err := q.enqueue(&goWorker{recycleTime: time.Now()})
err := q.insert(&goWorker{recycleTime: time.Now()})
if err != nil {
break
}
@ -60,12 +56,12 @@ func TestLoopQueue(t *testing.T) {
t.Fatalf("Len error")
}
err := q.enqueue(&goWorker{recycleTime: time.Now()})
err := q.insert(&goWorker{recycleTime: time.Now()})
if err == nil {
t.Fatalf("Enqueue error")
}
q.releaseExpiry(time.Second)
q.findOutExpiry(time.Second)
if q.len() != 6 {
t.Fatalf("Len error: %d", q.len())

View File

@ -1,39 +0,0 @@
package ants
import (
"errors"
"time"
)
var (
ErrQueueIsFull = errors.New("the queue is full")
ErrQueueLengthIsZero = errors.New("the queue length is zero")
)
type workerQueue interface {
len() int
cap() int
isEmpty() bool
enqueue(worker *goWorker) error
dequeue() *goWorker
releaseExpiry(duration time.Duration) chan *goWorker
releaseAll()
}
type queueType int
const (
stackType queueType = 1 << iota
loopQueueType
)
func newQueue(qType queueType, size int) workerQueue {
switch qType {
case stackType:
return newWorkerStack(size)
case loopQueueType:
return newLoopQueue(size)
default:
return newWorkerStack(size)
}
}

View File

@ -5,6 +5,7 @@ import "time"
type workerStack struct {
items []*goWorker
expiry []*goWorker
size int
}
func newWorkerStack(size int) *workerStack {
@ -13,8 +14,8 @@ func newWorkerStack(size int) *workerStack {
}
wq := workerStack{
items: make([]*goWorker, 0, size),
expiry: make([]*goWorker, 0),
items: make([]*goWorker, 0, size),
size: size,
}
return &wq
}
@ -23,20 +24,16 @@ func (wq *workerStack) len() int {
return len(wq.items)
}
func (wq *workerStack) cap() int {
return cap(wq.items)
}
func (wq *workerStack) isEmpty() bool {
return len(wq.items) == 0
}
func (wq *workerStack) enqueue(worker *goWorker) error {
func (wq *workerStack) insert(worker *goWorker) error {
wq.items = append(wq.items, worker)
return nil
}
func (wq *workerStack) dequeue() *goWorker {
func (wq *workerStack) detach() *goWorker {
l := wq.len()
if l == 0 {
return nil
@ -48,13 +45,10 @@ func (wq *workerStack) dequeue() *goWorker {
return w
}
func (wq *workerStack) releaseExpiry(duration time.Duration) chan *goWorker {
stream := make(chan *goWorker)
func (wq *workerStack) findOutExpiry(duration time.Duration) []*goWorker {
n := wq.len()
if n == 0 {
close(stream)
return stream
return nil
}
expiryTime := time.Now().Add(-duration)
@ -66,16 +60,7 @@ func (wq *workerStack) releaseExpiry(duration time.Duration) chan *goWorker {
m := copy(wq.items, wq.items[index+1:])
wq.items = wq.items[:m]
}
go func() {
defer close(stream)
for i := 0; i < len(wq.expiry); i++ {
stream <- wq.expiry[i]
}
}()
return stream
return wq.expiry
}
func (wq *workerStack) search(l, r int, expiryTime time.Time) int {
@ -91,7 +76,7 @@ func (wq *workerStack) search(l, r int, expiryTime time.Time) int {
return r
}
func (wq *workerStack) releaseAll() {
func (wq *workerStack) release() {
for i := 0; i < wq.len(); i++ {
wq.items[i].task <- nil
}

View File

@ -12,15 +12,11 @@ func TestNewWorkerStack(t *testing.T) {
t.Fatalf("Len error")
}
if q.cap() != size {
t.Fatalf("Cap error")
}
if !q.isEmpty() {
t.Fatalf("IsEmpty error")
}
if q.dequeue() != nil {
if q.detach() != nil {
t.Fatalf("Dequeue error")
}
}
@ -29,7 +25,7 @@ func TestWorkerStack(t *testing.T) {
q := newWorkerStack(0)
for i := 0; i < 5; i++ {
err := q.enqueue(&goWorker{recycleTime: time.Now()})
err := q.insert(&goWorker{recycleTime: time.Now()})
if err != nil {
break
}
@ -40,7 +36,7 @@ func TestWorkerStack(t *testing.T) {
expired := time.Now()
err := q.enqueue(&goWorker{recycleTime: expired})
err := q.insert(&goWorker{recycleTime: expired})
if err != nil {
t.Fatalf("Enqueue error")
}
@ -48,7 +44,7 @@ func TestWorkerStack(t *testing.T) {
time.Sleep(time.Second)
for i := 0; i < 6; i++ {
err := q.enqueue(&goWorker{recycleTime: time.Now()})
err := q.insert(&goWorker{recycleTime: time.Now()})
if err != nil {
t.Fatalf("Enqueue error")
}
@ -58,7 +54,7 @@ func TestWorkerStack(t *testing.T) {
t.Fatalf("Len error")
}
q.releaseExpiry(time.Second)
q.findOutExpiry(time.Second)
if q.len() != 6 {
t.Fatalf("Len error")
@ -71,7 +67,7 @@ func TestSearch(t *testing.T) {
// 1
expiry1 := time.Now()
_ = q.enqueue(&goWorker{recycleTime: time.Now()})
_ = q.insert(&goWorker{recycleTime: time.Now()})
index := q.search(0, q.len()-1, time.Now())
if index != 0 {
@ -85,7 +81,7 @@ func TestSearch(t *testing.T) {
// 2
expiry2 := time.Now()
_ = q.enqueue(&goWorker{recycleTime: time.Now()})
_ = q.insert(&goWorker{recycleTime: time.Now()})
index = q.search(0, q.len()-1, expiry1)
if index != -1 {
@ -104,15 +100,15 @@ func TestSearch(t *testing.T) {
// more
for i := 0; i < 5; i++ {
_ = q.enqueue(&goWorker{recycleTime: time.Now()})
_ = q.insert(&goWorker{recycleTime: time.Now()})
}
expiry3 := time.Now()
_ = q.enqueue(&goWorker{recycleTime: expiry3})
_ = q.insert(&goWorker{recycleTime: expiry3})
for i := 0; i < 10; i++ {
_ = q.enqueue(&goWorker{recycleTime: time.Now()})
_ = q.insert(&goWorker{recycleTime: time.Now()})
}
index = q.search(0, q.len()-1, expiry3)