Optimize loop queue of workers

This commit is contained in:
Andy Pan 2019-10-20 18:22:13 +08:00
parent 52b301019a
commit 7aaa4349f5
6 changed files with 79 additions and 64 deletions

View File

@ -73,7 +73,7 @@ func (p *Pool) periodicallyPurge() {
} }
p.lock.Lock() p.lock.Lock()
expiredWorkers := p.workers.findOutExpiry(p.options.ExpiryDuration) expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
p.lock.Unlock() p.lock.Unlock()
// Notify obsolete workers to stop. // Notify obsolete workers to stop.
@ -180,7 +180,7 @@ func (p *Pool) Release() {
p.once.Do(func() { p.once.Do(func() {
atomic.StoreInt32(&p.release, 1) atomic.StoreInt32(&p.release, 1)
p.lock.Lock() p.lock.Lock()
p.workers.release() p.workers.reset()
p.lock.Unlock() p.lock.Unlock()
}) })
} }

View File

@ -6,11 +6,11 @@ import (
) )
var ( var (
// ErrQueueIsFull will be returned when the worker queue is full. // errQueueIsFull will be returned when the worker queue is full.
ErrQueueIsFull = errors.New("the queue is full") errQueueIsFull = errors.New("the queue is full")
// ErrQueueLengthIsZero will be returned when trying to insert item to a released worker queue. // errQueueIsReleased will be returned when trying to insert item to a released worker queue.
ErrQueueLengthIsZero = errors.New("the queue length is zero") errQueueIsReleased = errors.New("the queue length is zero")
) )
type workerArray interface { type workerArray interface {
@ -18,8 +18,8 @@ type workerArray interface {
isEmpty() bool isEmpty() bool
insert(worker *goWorker) error insert(worker *goWorker) error
detach() *goWorker detach() *goWorker
findOutExpiry(duration time.Duration) []*goWorker retrieveExpiry(duration time.Duration) []*goWorker
release() reset()
} }
type arrayType int type arrayType int

View File

@ -3,95 +3,115 @@ package ants
import "time" import "time"
type loopQueue struct { type loopQueue struct {
items []*goWorker items []*goWorker
expiry []*goWorker expiry []*goWorker
head int head int
tail int tail int
remainder int size int
isFull bool
} }
func newWorkerLoopQueue(size int) *loopQueue { func newWorkerLoopQueue(size int) *loopQueue {
if size <= 0 { return &loopQueue{
return nil items: make([]*goWorker, size),
size: size,
} }
wq := loopQueue{
items: make([]*goWorker, size+1),
head: 0,
tail: 0,
remainder: size + 1,
}
return &wq
} }
func (wq *loopQueue) len() int { func (wq *loopQueue) len() int {
if wq.remainder == 0 { if wq.size == 0 {
return 0 return 0
} }
return (wq.tail - wq.head + wq.remainder) % wq.remainder if wq.head == wq.tail {
if wq.isFull {
return wq.size
}
return 0
}
if wq.tail > wq.head {
return wq.tail - wq.head
}
return wq.size - wq.head + wq.tail
} }
func (wq *loopQueue) isEmpty() bool { func (wq *loopQueue) isEmpty() bool {
return wq.tail == wq.head return wq.head == wq.tail && !wq.isFull
} }
func (wq *loopQueue) insert(worker *goWorker) error { func (wq *loopQueue) insert(worker *goWorker) error {
if wq.remainder == 0 { if wq.size == 0 {
return ErrQueueLengthIsZero return errQueueIsReleased
}
next := (wq.tail + 1) % wq.remainder
if next == wq.head {
return ErrQueueIsFull
} }
if wq.isFull {
return errQueueIsFull
}
wq.items[wq.tail] = worker wq.items[wq.tail] = worker
wq.tail = next wq.tail++
if wq.tail == wq.size {
wq.tail = 0
}
if wq.tail == wq.head {
wq.isFull = true
}
return nil return nil
} }
func (wq *loopQueue) detach() *goWorker { func (wq *loopQueue) detach() *goWorker {
if wq.len() == 0 { if wq.isEmpty() {
return nil return nil
} }
w := wq.items[wq.head] w := wq.items[wq.head]
wq.head = (wq.head + 1) % wq.remainder wq.head++
if wq.head == wq.size {
wq.head = 0
}
wq.isFull = false
return w return w
} }
func (wq *loopQueue) findOutExpiry(duration time.Duration) []*goWorker { func (wq *loopQueue) retrieveExpiry(duration time.Duration) []*goWorker {
if wq.len() == 0 { if wq.isEmpty() {
return nil return nil
} }
wq.expiry = wq.expiry[:0] wq.expiry = wq.expiry[:0]
expiryTime := time.Now().Add(-duration) expiryTime := time.Now().Add(-duration)
for wq.head != wq.tail { for !wq.isEmpty() {
if expiryTime.Before(wq.items[wq.head].recycleTime) { if expiryTime.Before(wq.items[wq.head].recycleTime) {
break break
} }
wq.expiry = append(wq.expiry, wq.items[wq.head]) wq.expiry = append(wq.expiry, wq.items[wq.head])
wq.head = (wq.head + 1) % wq.remainder wq.head++
if wq.head == wq.size {
wq.head = 0
}
wq.isFull = false
} }
return wq.expiry return wq.expiry
} }
func (wq *loopQueue) release() { func (wq *loopQueue) reset() {
if wq.len() == 0 { if wq.isEmpty() {
return return
} }
for wq.head != wq.tail { Releasing:
wq.items[wq.head].task <- nil if w := wq.detach(); w != nil {
wq.head = (wq.head + 1) % wq.remainder w.task <- nil
goto Releasing
} }
wq.items = wq.items[:0] wq.items = wq.items[:0]
wq.remainder = 0 wq.size = 0
wq.head = 0 wq.head = 0
wq.tail = 0 wq.tail = 0
} }

View File

@ -61,7 +61,7 @@ func TestLoopQueue(t *testing.T) {
t.Fatalf("Enqueue error") t.Fatalf("Enqueue error")
} }
q.findOutExpiry(time.Second) q.retrieveExpiry(time.Second)
if q.len() != 6 { if q.len() != 6 {
t.Fatalf("Len error: %d", q.len()) t.Fatalf("Len error: %d", q.len())

View File

@ -9,15 +9,10 @@ type workerStack struct {
} }
func newWorkerStack(size int) *workerStack { func newWorkerStack(size int) *workerStack {
if size < 0 { return &workerStack{
return nil
}
wq := workerStack{
items: make([]*goWorker, 0, size), items: make([]*goWorker, 0, size),
size: size, size: size,
} }
return &wq
} }
func (wq *workerStack) len() int { func (wq *workerStack) len() int {
@ -45,14 +40,14 @@ func (wq *workerStack) detach() *goWorker {
return w return w
} }
func (wq *workerStack) findOutExpiry(duration time.Duration) []*goWorker { func (wq *workerStack) retrieveExpiry(duration time.Duration) []*goWorker {
n := wq.len() n := wq.len()
if n == 0 { if n == 0 {
return nil return nil
} }
expiryTime := time.Now().Add(-duration) expiryTime := time.Now().Add(-duration)
index := wq.search(0, n-1, expiryTime) index := wq.binarySearch(0, n-1, expiryTime)
wq.expiry = wq.expiry[:0] wq.expiry = wq.expiry[:0]
if index != -1 { if index != -1 {
@ -63,7 +58,7 @@ func (wq *workerStack) findOutExpiry(duration time.Duration) []*goWorker {
return wq.expiry return wq.expiry
} }
func (wq *workerStack) search(l, r int, expiryTime time.Time) int { func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
var mid int var mid int
for l <= r { for l <= r {
mid = (l + r) / 2 mid = (l + r) / 2
@ -76,7 +71,7 @@ func (wq *workerStack) search(l, r int, expiryTime time.Time) int {
return r return r
} }
func (wq *workerStack) release() { func (wq *workerStack) reset() {
for i := 0; i < wq.len(); i++ { for i := 0; i < wq.len(); i++ {
wq.items[i].task <- nil wq.items[i].task <- nil
} }

View File

@ -54,7 +54,7 @@ func TestWorkerStack(t *testing.T) {
t.Fatalf("Len error") t.Fatalf("Len error")
} }
q.findOutExpiry(time.Second) q.retrieveExpiry(time.Second)
if q.len() != 6 { if q.len() != 6 {
t.Fatalf("Len error") t.Fatalf("Len error")
@ -69,12 +69,12 @@ func TestSearch(t *testing.T) {
_ = q.insert(&goWorker{recycleTime: time.Now()}) _ = q.insert(&goWorker{recycleTime: time.Now()})
index := q.search(0, q.len()-1, time.Now()) index := q.binarySearch(0, q.len()-1, time.Now())
if index != 0 { if index != 0 {
t.Fatalf("should is 0") t.Fatalf("should is 0")
} }
index = q.search(0, q.len()-1, expiry1) index = q.binarySearch(0, q.len()-1, expiry1)
if index != -1 { if index != -1 {
t.Fatalf("should is -1") t.Fatalf("should is -1")
} }
@ -83,17 +83,17 @@ func TestSearch(t *testing.T) {
expiry2 := time.Now() expiry2 := time.Now()
_ = q.insert(&goWorker{recycleTime: time.Now()}) _ = q.insert(&goWorker{recycleTime: time.Now()})
index = q.search(0, q.len()-1, expiry1) index = q.binarySearch(0, q.len()-1, expiry1)
if index != -1 { if index != -1 {
t.Fatalf("should is -1") t.Fatalf("should is -1")
} }
index = q.search(0, q.len()-1, expiry2) index = q.binarySearch(0, q.len()-1, expiry2)
if index != 0 { if index != 0 {
t.Fatalf("should is 0") t.Fatalf("should is 0")
} }
index = q.search(0, q.len()-1, time.Now()) index = q.binarySearch(0, q.len()-1, time.Now())
if index != 1 { if index != 1 {
t.Fatalf("should is 1") t.Fatalf("should is 1")
} }
@ -111,7 +111,7 @@ func TestSearch(t *testing.T) {
_ = q.insert(&goWorker{recycleTime: time.Now()}) _ = q.insert(&goWorker{recycleTime: time.Now()})
} }
index = q.search(0, q.len()-1, expiry3) index = q.binarySearch(0, q.len()-1, expiry3)
if index != 7 { if index != 7 {
t.Fatalf("should is 7") t.Fatalf("should is 7")
} }