diff --git a/pool.go b/pool.go index 3f52a86..fb2cc15 100644 --- a/pool.go +++ b/pool.go @@ -42,7 +42,7 @@ type Pool struct { expiryDuration time.Duration // workers is a slice that store the available workers. - workers []*goWorker + workers workerQueue // release is used to notice the pool to closed itself. release int32 @@ -82,35 +82,21 @@ func (p *Pool) periodicallyPurge() { heartbeat := time.NewTicker(p.expiryDuration) defer heartbeat.Stop() - var expiredWorkers []*goWorker for range heartbeat.C { if atomic.LoadInt32(&p.release) == CLOSED { break } - currentTime := time.Now() + p.lock.Lock() - idleWorkers := p.workers - n := len(idleWorkers) - var i int - for i = 0; i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration; i++ { - } - expiredWorkers = append(expiredWorkers[:0], idleWorkers[:i]...) - if i > 0 { - m := copy(idleWorkers, idleWorkers[i:]) - for i = m; i < n; i++ { - idleWorkers[i] = nil - } - p.workers = idleWorkers[:m] - } + stream := p.workers.releaseExpiry(p.expiryDuration) p.lock.Unlock() // Notify obsolete workers to stop. // This notification must be outside the p.lock, since w.task // may be blocking and may consume a lot of time if many workers // are located on non-local CPUs. - for i, w := range expiredWorkers { + for w := range stream { w.task <- nil - expiredWorkers[i] = nil } // There might be a situation that all workers have been cleaned up(no any worker is running) @@ -156,8 +142,11 @@ func NewPool(size int, options ...Option) (*Pool, error) { }, } if opts.PreAlloc { - p.workers = make([]*goWorker, 0, size) + p.workers = newQueue(loopQueueType, size) + } else { + p.workers = newQueue(stackType, 0) } + p.cond = sync.NewCond(p.lock) // Start a goroutine to clean up expired workers periodically. @@ -166,7 +155,7 @@ func NewPool(size int, options ...Option) (*Pool, error) { return p, nil } -//--------------------------------------------------------------------------- +// --------------------------------------------------------------------------- // Submit submits a task to this pool. func (p *Pool) Submit(task func()) error { @@ -209,17 +198,12 @@ func (p *Pool) Release() { p.once.Do(func() { atomic.StoreInt32(&p.release, 1) p.lock.Lock() - idleWorkers := p.workers - for i, w := range idleWorkers { - w.task <- nil - idleWorkers[i] = nil - } - p.workers = nil + p.workers.releaseAll() p.lock.Unlock() }) } -//--------------------------------------------------------------------------- +// --------------------------------------------------------------------------- // incRunning increases the number of the currently running goroutines. func (p *Pool) incRunning() { @@ -240,12 +224,9 @@ func (p *Pool) retrieveWorker() *goWorker { } p.lock.Lock() - idleWorkers := p.workers - n := len(idleWorkers) - 1 - if n >= 0 { - w = idleWorkers[n] - idleWorkers[n] = nil - p.workers = idleWorkers[:n] + + w = p.workers.dequeue() + if w != nil { p.lock.Unlock() } else if p.Running() < p.Cap() { p.lock.Unlock() @@ -268,13 +249,12 @@ func (p *Pool) retrieveWorker() *goWorker { spawnWorker() return w } - l := len(p.workers) - 1 - if l < 0 { + + w = p.workers.dequeue() + if w == nil { goto Reentry } - w = p.workers[l] - p.workers[l] = nil - p.workers = p.workers[:l] + p.lock.Unlock() } return w @@ -287,7 +267,11 @@ func (p *Pool) revertWorker(worker *goWorker) bool { } worker.recycleTime = time.Now() p.lock.Lock() - p.workers = append(p.workers, worker) + + err := p.workers.enqueue(worker) + if err != nil { + return false + } // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue. p.cond.Signal() diff --git a/worker_loop_queue.go b/worker_loop_queue.go new file mode 100644 index 0000000..508da28 --- /dev/null +++ b/worker_loop_queue.go @@ -0,0 +1,137 @@ +package ants + +import "time" + +type loopQueue struct { + items []*goWorker + expiry []*goWorker + head int + tail int + remainder int +} + +func newLoopQueue(size int) *loopQueue { + if size <= 0 { + return nil + } + + wq := loopQueue{ + items: make([]*goWorker, size+1), + expiry: make([]*goWorker, 0), + head: 0, + tail: 0, + remainder: size + 1, + } + + return &wq +} + +func (wq *loopQueue) len() int { + if wq.remainder == 0 { + return 0 + } + + return (wq.tail - wq.head + wq.remainder) % wq.remainder +} + +func (wq *loopQueue) cap() int { + if wq.remainder == 0 { + return 0 + } + return wq.remainder - 1 +} + +func (wq *loopQueue) isEmpty() bool { + return wq.tail == wq.head +} + +func (wq *loopQueue) enqueue(worker *goWorker) error { + if wq.remainder == 0 { + return ErrQueueLengthIsZero + } + if (wq.tail+1)%wq.remainder == wq.head { + return ErrQueueIsFull + } + + wq.items[wq.tail] = worker + wq.tail = (wq.tail + 1) % wq.remainder + + return nil +} + +func (wq *loopQueue) dequeue() *goWorker { + if wq.len() == 0 { + return nil + } + + w := wq.items[wq.head] + wq.head = (wq.head + 1) % wq.remainder + + return w +} + +func (wq *loopQueue) releaseExpiry(duration time.Duration) chan *goWorker { + stream := make(chan *goWorker) + + if wq.len() == 0 { + close(stream) + return stream + } + + wq.expiry = wq.expiry[:0] + expiryTime := time.Now().Add(-duration) + + for wq.head != wq.tail { + if expiryTime.After(wq.items[wq.head].recycleTime) { + wq.expiry = append(wq.expiry, wq.items[wq.head]) + wq.head = (wq.head + 1) % wq.remainder + continue + } + break + } + + go func() { + defer close(stream) + + for i := 0; i < len(wq.expiry); i++ { + stream <- wq.expiry[i] + } + }() + + return stream +} + +//func (wq *LoopQueue)search(compareTime time.Time, l, r int) int { +// if l == r { +// if wq.items[l].recycleTime.After(compareTime) { +// return -1 +// } else { +// return l +// } +// } +// +// c := cap(wq.items) +// mid := ((r-l+c)/2 + l) % c +// if mid == l { +// return wq.search(compareTime, l, l) +// } else if wq.items[mid].recycleTime.After(compareTime) { +// return wq.search(compareTime, l, mid-1) +// } else { +// return wq.search(compareTime, mid+1, r) +// } +//} + +func (wq *loopQueue) releaseAll() { + if wq.len() == 0 { + return + } + + for wq.head != wq.tail { + wq.items[wq.head].task <- nil + wq.head = (wq.head + 1) % wq.remainder + } + wq.items = wq.items[:0] + wq.remainder = 0 + wq.head = 0 + wq.tail = 0 +} diff --git a/worker_loop_queue_test.go b/worker_loop_queue_test.go new file mode 100644 index 0000000..ded180c --- /dev/null +++ b/worker_loop_queue_test.go @@ -0,0 +1,73 @@ +package ants + +import ( + "testing" + "time" +) + +func TestNewLoopQueue(t *testing.T) { + size := 100 + q := newLoopQueue(size) + if q.len() != 0 { + t.Fatalf("Len error") + } + + if q.cap() != size { + t.Fatalf("Cap error") + } + + if !q.isEmpty() { + t.Fatalf("IsEmpty error") + } + + if q.dequeue() != nil { + t.Fatalf("Dequeue error") + } +} + +func TestLoopQueue(t *testing.T) { + size := 10 + q := newLoopQueue(size) + + for i := 0; i < 5; i++ { + err := q.enqueue(&goWorker{recycleTime: time.Now()}) + if err != nil { + break + } + } + + if q.len() != 5 { + t.Fatalf("Len error") + } + + v := q.dequeue() + t.Log(v) + + if q.len() != 4 { + t.Fatalf("Len error") + } + + time.Sleep(time.Second) + + for i := 0; i < 6; i++ { + err := q.enqueue(&goWorker{recycleTime: time.Now()}) + if err != nil { + break + } + } + + if q.len() != 10 { + t.Fatalf("Len error") + } + + err := q.enqueue(&goWorker{recycleTime: time.Now()}) + if err == nil { + t.Fatalf("Enqueue error") + } + + q.releaseExpiry(time.Second) + + if q.len() != 6 { + t.Fatalf("Len error: %d", q.len()) + } +} diff --git a/worker_queue.go b/worker_queue.go new file mode 100644 index 0000000..2c189d3 --- /dev/null +++ b/worker_queue.go @@ -0,0 +1,39 @@ +package ants + +import ( + "errors" + "time" +) + +var ( + ErrQueueIsFull = errors.New("the queue is full") + ErrQueueLengthIsZero = errors.New("the queue length is zero") +) + +type workerQueue interface { + len() int + cap() int + isEmpty() bool + enqueue(worker *goWorker) error + dequeue() *goWorker + releaseExpiry(duration time.Duration) chan *goWorker + releaseAll() +} + +type queueType int + +const ( + stackType queueType = 1 << iota + loopQueueType +) + +func newQueue(qType queueType, size int) workerQueue { + switch qType { + case stackType: + return newWorkerStack(size) + case loopQueueType: + return newLoopQueue(size) + default: + return newWorkerStack(size) + } +} diff --git a/worker_stack.go b/worker_stack.go new file mode 100644 index 0000000..5364816 --- /dev/null +++ b/worker_stack.go @@ -0,0 +1,99 @@ +package ants + +import "time" + +type workerStack struct { + items []*goWorker + expiry []*goWorker +} + +func newWorkerStack(size int) *workerStack { + if size < 0 { + return nil + } + + wq := workerStack{ + items: make([]*goWorker, 0, size), + expiry: make([]*goWorker, 0), + } + return &wq +} + +func (wq *workerStack) len() int { + return len(wq.items) +} + +func (wq *workerStack) cap() int { + return cap(wq.items) +} + +func (wq *workerStack) isEmpty() bool { + return len(wq.items) == 0 +} + +func (wq *workerStack) enqueue(worker *goWorker) error { + wq.items = append(wq.items, worker) + return nil +} + +func (wq *workerStack) dequeue() *goWorker { + l := wq.len() + if l == 0 { + return nil + } + + w := wq.items[l-1] + wq.items = wq.items[:l-1] + + return w +} + +func (wq *workerStack) releaseExpiry(duration time.Duration) chan *goWorker { + stream := make(chan *goWorker) + + n := wq.len() + if n == 0 { + close(stream) + return stream + } + + expiryTime := time.Now().Add(-duration) + index := wq.search(0, n-1, expiryTime) + + wq.expiry = wq.expiry[:0] + if index != -1 { + wq.expiry = append(wq.expiry, wq.items[:index+1]...) + m := copy(wq.items, wq.items[index+1:]) + wq.items = wq.items[:m] + } + + go func() { + defer close(stream) + + for i := 0; i < len(wq.expiry); i++ { + stream <- wq.expiry[i] + } + }() + + return stream +} + +func (wq *workerStack) search(l, r int, expiryTime time.Time) int { + var mid int + for l <= r { + mid = (l + r) / 2 + if expiryTime.Before(wq.items[mid].recycleTime) { + r = mid - 1 + } else { + l = mid + 1 + } + } + return r +} + +func (wq *workerStack) releaseAll() { + for i := 0; i < wq.len(); i++ { + wq.items[i].task <- nil + } + wq.items = wq.items[:0] +} diff --git a/worker_stack_test.go b/worker_stack_test.go new file mode 100644 index 0000000..5e6da62 --- /dev/null +++ b/worker_stack_test.go @@ -0,0 +1,122 @@ +package ants + +import ( + "testing" + "time" +) + +func TestNewWorkerStack(t *testing.T) { + size := 100 + q := newWorkerStack(size) + if q.len() != 0 { + t.Fatalf("Len error") + } + + if q.cap() != size { + t.Fatalf("Cap error") + } + + if !q.isEmpty() { + t.Fatalf("IsEmpty error") + } + + if q.dequeue() != nil { + t.Fatalf("Dequeue error") + } +} + +func TestWorkerStack(t *testing.T) { + q := newWorkerStack(0) + + for i := 0; i < 5; i++ { + err := q.enqueue(&goWorker{recycleTime: time.Now()}) + if err != nil { + break + } + } + if q.len() != 5 { + t.Fatalf("Len error") + } + + expired := time.Now() + + err := q.enqueue(&goWorker{recycleTime: expired}) + if err != nil { + t.Fatalf("Enqueue error") + } + + time.Sleep(time.Second) + + for i := 0; i < 6; i++ { + err := q.enqueue(&goWorker{recycleTime: time.Now()}) + if err != nil { + t.Fatalf("Enqueue error") + } + } + + if q.len() != 12 { + t.Fatalf("Len error") + } + + q.releaseExpiry(time.Second) + + if q.len() != 6 { + t.Fatalf("Len error") + } +} + +func TestSearch(t *testing.T) { + q := newWorkerStack(0) + + // 1 + expiry1 := time.Now() + + _ = q.enqueue(&goWorker{recycleTime: time.Now()}) + + index := q.search(0, q.len()-1, time.Now()) + if index != 0 { + t.Fatalf("should is 0") + } + + index = q.search(0, q.len()-1, expiry1) + if index != -1 { + t.Fatalf("should is -1") + } + + // 2 + expiry2 := time.Now() + _ = q.enqueue(&goWorker{recycleTime: time.Now()}) + + index = q.search(0, q.len()-1, expiry1) + if index != -1 { + t.Fatalf("should is -1") + } + + index = q.search(0, q.len()-1, expiry2) + if index != 0 { + t.Fatalf("should is 0") + } + + index = q.search(0, q.len()-1, time.Now()) + if index != 1 { + t.Fatalf("should is 1") + } + + // more + for i := 0; i < 5; i++ { + _ = q.enqueue(&goWorker{recycleTime: time.Now()}) + } + + expiry3 := time.Now() + + _ = q.enqueue(&goWorker{recycleTime: expiry3}) + + for i := 0; i < 10; i++ { + _ = q.enqueue(&goWorker{recycleTime: time.Now()}) + } + + index = q.search(0, q.len()-1, expiry3) + if index != 7 { + t.Fatalf("should is 7") + } +}