From f0e23928f4388661a8e0f6f5ff93f8f9514a1762 Mon Sep 17 00:00:00 2001 From: Kevin Bai Date: Thu, 10 Oct 2019 00:59:19 +0800 Subject: [PATCH] add loop queue (#53) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add loop queue * add loop queue * fix the bugs add loop queue move the worker queue to directory 按照新的接口实现 lifo 队列 添加新接口的环形队列实现 rename the slice queue 修复了 unlock 使用 queue 管理 goWorkerWithFunc 使用 dequeue 判断队列 add remainder 增加测试文件 循环队列需要一个空闲位 * remove interface{} * Refine the logic of sync.Pool * Add flowcharts of ants into READMEs * Add the installation about ants v2 * Renew the functional options in READMEs * Renew English and Chinese flowcharts * rename package name 移动 worker queue 位置 worker queue 都修改为私有接口 考虑到性能问题,把 interface{} 改回到 *goworker * 修改 releaseExpiry 和 releaseAll * remove files * fix some bug --- pool.go | 62 +++++++---------- worker_loop_queue.go | 137 ++++++++++++++++++++++++++++++++++++++ worker_loop_queue_test.go | 73 ++++++++++++++++++++ worker_queue.go | 39 +++++++++++ worker_stack.go | 99 +++++++++++++++++++++++++++ worker_stack_test.go | 122 +++++++++++++++++++++++++++++++++ 6 files changed, 493 insertions(+), 39 deletions(-) create mode 100644 worker_loop_queue.go create mode 100644 worker_loop_queue_test.go create mode 100644 worker_queue.go create mode 100644 worker_stack.go create mode 100644 worker_stack_test.go diff --git a/pool.go b/pool.go index 3f52a86..fb2cc15 100644 --- a/pool.go +++ b/pool.go @@ -42,7 +42,7 @@ type Pool struct { expiryDuration time.Duration // workers is a slice that store the available workers. - workers []*goWorker + workers workerQueue // release is used to notice the pool to closed itself. release int32 @@ -82,35 +82,21 @@ func (p *Pool) periodicallyPurge() { heartbeat := time.NewTicker(p.expiryDuration) defer heartbeat.Stop() - var expiredWorkers []*goWorker for range heartbeat.C { if atomic.LoadInt32(&p.release) == CLOSED { break } - currentTime := time.Now() + p.lock.Lock() - idleWorkers := p.workers - n := len(idleWorkers) - var i int - for i = 0; i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration; i++ { - } - expiredWorkers = append(expiredWorkers[:0], idleWorkers[:i]...) - if i > 0 { - m := copy(idleWorkers, idleWorkers[i:]) - for i = m; i < n; i++ { - idleWorkers[i] = nil - } - p.workers = idleWorkers[:m] - } + stream := p.workers.releaseExpiry(p.expiryDuration) p.lock.Unlock() // Notify obsolete workers to stop. // This notification must be outside the p.lock, since w.task // may be blocking and may consume a lot of time if many workers // are located on non-local CPUs. - for i, w := range expiredWorkers { + for w := range stream { w.task <- nil - expiredWorkers[i] = nil } // There might be a situation that all workers have been cleaned up(no any worker is running) @@ -156,8 +142,11 @@ func NewPool(size int, options ...Option) (*Pool, error) { }, } if opts.PreAlloc { - p.workers = make([]*goWorker, 0, size) + p.workers = newQueue(loopQueueType, size) + } else { + p.workers = newQueue(stackType, 0) } + p.cond = sync.NewCond(p.lock) // Start a goroutine to clean up expired workers periodically. @@ -166,7 +155,7 @@ func NewPool(size int, options ...Option) (*Pool, error) { return p, nil } -//--------------------------------------------------------------------------- +// --------------------------------------------------------------------------- // Submit submits a task to this pool. func (p *Pool) Submit(task func()) error { @@ -209,17 +198,12 @@ func (p *Pool) Release() { p.once.Do(func() { atomic.StoreInt32(&p.release, 1) p.lock.Lock() - idleWorkers := p.workers - for i, w := range idleWorkers { - w.task <- nil - idleWorkers[i] = nil - } - p.workers = nil + p.workers.releaseAll() p.lock.Unlock() }) } -//--------------------------------------------------------------------------- +// --------------------------------------------------------------------------- // incRunning increases the number of the currently running goroutines. func (p *Pool) incRunning() { @@ -240,12 +224,9 @@ func (p *Pool) retrieveWorker() *goWorker { } p.lock.Lock() - idleWorkers := p.workers - n := len(idleWorkers) - 1 - if n >= 0 { - w = idleWorkers[n] - idleWorkers[n] = nil - p.workers = idleWorkers[:n] + + w = p.workers.dequeue() + if w != nil { p.lock.Unlock() } else if p.Running() < p.Cap() { p.lock.Unlock() @@ -268,13 +249,12 @@ func (p *Pool) retrieveWorker() *goWorker { spawnWorker() return w } - l := len(p.workers) - 1 - if l < 0 { + + w = p.workers.dequeue() + if w == nil { goto Reentry } - w = p.workers[l] - p.workers[l] = nil - p.workers = p.workers[:l] + p.lock.Unlock() } return w @@ -287,7 +267,11 @@ func (p *Pool) revertWorker(worker *goWorker) bool { } worker.recycleTime = time.Now() p.lock.Lock() - p.workers = append(p.workers, worker) + + err := p.workers.enqueue(worker) + if err != nil { + return false + } // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue. p.cond.Signal() diff --git a/worker_loop_queue.go b/worker_loop_queue.go new file mode 100644 index 0000000..508da28 --- /dev/null +++ b/worker_loop_queue.go @@ -0,0 +1,137 @@ +package ants + +import "time" + +type loopQueue struct { + items []*goWorker + expiry []*goWorker + head int + tail int + remainder int +} + +func newLoopQueue(size int) *loopQueue { + if size <= 0 { + return nil + } + + wq := loopQueue{ + items: make([]*goWorker, size+1), + expiry: make([]*goWorker, 0), + head: 0, + tail: 0, + remainder: size + 1, + } + + return &wq +} + +func (wq *loopQueue) len() int { + if wq.remainder == 0 { + return 0 + } + + return (wq.tail - wq.head + wq.remainder) % wq.remainder +} + +func (wq *loopQueue) cap() int { + if wq.remainder == 0 { + return 0 + } + return wq.remainder - 1 +} + +func (wq *loopQueue) isEmpty() bool { + return wq.tail == wq.head +} + +func (wq *loopQueue) enqueue(worker *goWorker) error { + if wq.remainder == 0 { + return ErrQueueLengthIsZero + } + if (wq.tail+1)%wq.remainder == wq.head { + return ErrQueueIsFull + } + + wq.items[wq.tail] = worker + wq.tail = (wq.tail + 1) % wq.remainder + + return nil +} + +func (wq *loopQueue) dequeue() *goWorker { + if wq.len() == 0 { + return nil + } + + w := wq.items[wq.head] + wq.head = (wq.head + 1) % wq.remainder + + return w +} + +func (wq *loopQueue) releaseExpiry(duration time.Duration) chan *goWorker { + stream := make(chan *goWorker) + + if wq.len() == 0 { + close(stream) + return stream + } + + wq.expiry = wq.expiry[:0] + expiryTime := time.Now().Add(-duration) + + for wq.head != wq.tail { + if expiryTime.After(wq.items[wq.head].recycleTime) { + wq.expiry = append(wq.expiry, wq.items[wq.head]) + wq.head = (wq.head + 1) % wq.remainder + continue + } + break + } + + go func() { + defer close(stream) + + for i := 0; i < len(wq.expiry); i++ { + stream <- wq.expiry[i] + } + }() + + return stream +} + +//func (wq *LoopQueue)search(compareTime time.Time, l, r int) int { +// if l == r { +// if wq.items[l].recycleTime.After(compareTime) { +// return -1 +// } else { +// return l +// } +// } +// +// c := cap(wq.items) +// mid := ((r-l+c)/2 + l) % c +// if mid == l { +// return wq.search(compareTime, l, l) +// } else if wq.items[mid].recycleTime.After(compareTime) { +// return wq.search(compareTime, l, mid-1) +// } else { +// return wq.search(compareTime, mid+1, r) +// } +//} + +func (wq *loopQueue) releaseAll() { + if wq.len() == 0 { + return + } + + for wq.head != wq.tail { + wq.items[wq.head].task <- nil + wq.head = (wq.head + 1) % wq.remainder + } + wq.items = wq.items[:0] + wq.remainder = 0 + wq.head = 0 + wq.tail = 0 +} diff --git a/worker_loop_queue_test.go b/worker_loop_queue_test.go new file mode 100644 index 0000000..ded180c --- /dev/null +++ b/worker_loop_queue_test.go @@ -0,0 +1,73 @@ +package ants + +import ( + "testing" + "time" +) + +func TestNewLoopQueue(t *testing.T) { + size := 100 + q := newLoopQueue(size) + if q.len() != 0 { + t.Fatalf("Len error") + } + + if q.cap() != size { + t.Fatalf("Cap error") + } + + if !q.isEmpty() { + t.Fatalf("IsEmpty error") + } + + if q.dequeue() != nil { + t.Fatalf("Dequeue error") + } +} + +func TestLoopQueue(t *testing.T) { + size := 10 + q := newLoopQueue(size) + + for i := 0; i < 5; i++ { + err := q.enqueue(&goWorker{recycleTime: time.Now()}) + if err != nil { + break + } + } + + if q.len() != 5 { + t.Fatalf("Len error") + } + + v := q.dequeue() + t.Log(v) + + if q.len() != 4 { + t.Fatalf("Len error") + } + + time.Sleep(time.Second) + + for i := 0; i < 6; i++ { + err := q.enqueue(&goWorker{recycleTime: time.Now()}) + if err != nil { + break + } + } + + if q.len() != 10 { + t.Fatalf("Len error") + } + + err := q.enqueue(&goWorker{recycleTime: time.Now()}) + if err == nil { + t.Fatalf("Enqueue error") + } + + q.releaseExpiry(time.Second) + + if q.len() != 6 { + t.Fatalf("Len error: %d", q.len()) + } +} diff --git a/worker_queue.go b/worker_queue.go new file mode 100644 index 0000000..2c189d3 --- /dev/null +++ b/worker_queue.go @@ -0,0 +1,39 @@ +package ants + +import ( + "errors" + "time" +) + +var ( + ErrQueueIsFull = errors.New("the queue is full") + ErrQueueLengthIsZero = errors.New("the queue length is zero") +) + +type workerQueue interface { + len() int + cap() int + isEmpty() bool + enqueue(worker *goWorker) error + dequeue() *goWorker + releaseExpiry(duration time.Duration) chan *goWorker + releaseAll() +} + +type queueType int + +const ( + stackType queueType = 1 << iota + loopQueueType +) + +func newQueue(qType queueType, size int) workerQueue { + switch qType { + case stackType: + return newWorkerStack(size) + case loopQueueType: + return newLoopQueue(size) + default: + return newWorkerStack(size) + } +} diff --git a/worker_stack.go b/worker_stack.go new file mode 100644 index 0000000..5364816 --- /dev/null +++ b/worker_stack.go @@ -0,0 +1,99 @@ +package ants + +import "time" + +type workerStack struct { + items []*goWorker + expiry []*goWorker +} + +func newWorkerStack(size int) *workerStack { + if size < 0 { + return nil + } + + wq := workerStack{ + items: make([]*goWorker, 0, size), + expiry: make([]*goWorker, 0), + } + return &wq +} + +func (wq *workerStack) len() int { + return len(wq.items) +} + +func (wq *workerStack) cap() int { + return cap(wq.items) +} + +func (wq *workerStack) isEmpty() bool { + return len(wq.items) == 0 +} + +func (wq *workerStack) enqueue(worker *goWorker) error { + wq.items = append(wq.items, worker) + return nil +} + +func (wq *workerStack) dequeue() *goWorker { + l := wq.len() + if l == 0 { + return nil + } + + w := wq.items[l-1] + wq.items = wq.items[:l-1] + + return w +} + +func (wq *workerStack) releaseExpiry(duration time.Duration) chan *goWorker { + stream := make(chan *goWorker) + + n := wq.len() + if n == 0 { + close(stream) + return stream + } + + expiryTime := time.Now().Add(-duration) + index := wq.search(0, n-1, expiryTime) + + wq.expiry = wq.expiry[:0] + if index != -1 { + wq.expiry = append(wq.expiry, wq.items[:index+1]...) + m := copy(wq.items, wq.items[index+1:]) + wq.items = wq.items[:m] + } + + go func() { + defer close(stream) + + for i := 0; i < len(wq.expiry); i++ { + stream <- wq.expiry[i] + } + }() + + return stream +} + +func (wq *workerStack) search(l, r int, expiryTime time.Time) int { + var mid int + for l <= r { + mid = (l + r) / 2 + if expiryTime.Before(wq.items[mid].recycleTime) { + r = mid - 1 + } else { + l = mid + 1 + } + } + return r +} + +func (wq *workerStack) releaseAll() { + for i := 0; i < wq.len(); i++ { + wq.items[i].task <- nil + } + wq.items = wq.items[:0] +} diff --git a/worker_stack_test.go b/worker_stack_test.go new file mode 100644 index 0000000..5e6da62 --- /dev/null +++ b/worker_stack_test.go @@ -0,0 +1,122 @@ +package ants + +import ( + "testing" + "time" +) + +func TestNewWorkerStack(t *testing.T) { + size := 100 + q := newWorkerStack(size) + if q.len() != 0 { + t.Fatalf("Len error") + } + + if q.cap() != size { + t.Fatalf("Cap error") + } + + if !q.isEmpty() { + t.Fatalf("IsEmpty error") + } + + if q.dequeue() != nil { + t.Fatalf("Dequeue error") + } +} + +func TestWorkerStack(t *testing.T) { + q := newWorkerStack(0) + + for i := 0; i < 5; i++ { + err := q.enqueue(&goWorker{recycleTime: time.Now()}) + if err != nil { + break + } + } + if q.len() != 5 { + t.Fatalf("Len error") + } + + expired := time.Now() + + err := q.enqueue(&goWorker{recycleTime: expired}) + if err != nil { + t.Fatalf("Enqueue error") + } + + time.Sleep(time.Second) + + for i := 0; i < 6; i++ { + err := q.enqueue(&goWorker{recycleTime: time.Now()}) + if err != nil { + t.Fatalf("Enqueue error") + } + } + + if q.len() != 12 { + t.Fatalf("Len error") + } + + q.releaseExpiry(time.Second) + + if q.len() != 6 { + t.Fatalf("Len error") + } +} + +func TestSearch(t *testing.T) { + q := newWorkerStack(0) + + // 1 + expiry1 := time.Now() + + _ = q.enqueue(&goWorker{recycleTime: time.Now()}) + + index := q.search(0, q.len()-1, time.Now()) + if index != 0 { + t.Fatalf("should is 0") + } + + index = q.search(0, q.len()-1, expiry1) + if index != -1 { + t.Fatalf("should is -1") + } + + // 2 + expiry2 := time.Now() + _ = q.enqueue(&goWorker{recycleTime: time.Now()}) + + index = q.search(0, q.len()-1, expiry1) + if index != -1 { + t.Fatalf("should is -1") + } + + index = q.search(0, q.len()-1, expiry2) + if index != 0 { + t.Fatalf("should is 0") + } + + index = q.search(0, q.len()-1, time.Now()) + if index != 1 { + t.Fatalf("should is 1") + } + + // more + for i := 0; i < 5; i++ { + _ = q.enqueue(&goWorker{recycleTime: time.Now()}) + } + + expiry3 := time.Now() + + _ = q.enqueue(&goWorker{recycleTime: expiry3}) + + for i := 0; i < 10; i++ { + _ = q.enqueue(&goWorker{recycleTime: time.Now()}) + } + + index = q.search(0, q.len()-1, expiry3) + if index != 7 { + t.Fatalf("should is 7") + } +}