diff --git a/worker_loop_queue.go b/worker_loop_queue.go index 2f4a79e..cc73934 100644 --- a/worker_loop_queue.go +++ b/worker_loop_queue.go @@ -79,29 +79,74 @@ func (wq *loopQueue) detach() *goWorker { } func (wq *loopQueue) retrieveExpiry(duration time.Duration) []*goWorker { - if wq.isEmpty() { + expiryTime := time.Now().Add(-duration) + index := wq.binarySearch(expiryTime) + if index == -1 { return nil } - wq.expiry = wq.expiry[:0] - expiryTime := time.Now().Add(-duration) - for !wq.isEmpty() { - if expiryTime.Before(wq.items[wq.head].recycleTime) { - break + if wq.head <= index { + wq.expiry = append(wq.expiry, wq.items[wq.head:index+1]...) + for i := wq.head; i < index+1; i++ { + wq.items[i] = nil } - wq.expiry = append(wq.expiry, wq.items[wq.head]) - wq.items[wq.head] = nil - wq.head++ - if wq.head == wq.size { - wq.head = 0 + } else { + wq.expiry = append(wq.expiry, wq.items[0:index+1]...) + wq.expiry = append(wq.expiry, wq.items[wq.head:]...) + for i := 0; i < index+1; i++ { + wq.items[i] = nil } + for i := wq.head; i < wq.size; i++ { + wq.items[i] = nil + } + } + head := (index + 1) % wq.size + wq.head = head + if len(wq.expiry) > 0 { wq.isFull = false } return wq.expiry } +func (wq *loopQueue) binarySearch(expiryTime time.Time) int { + var mid, nlen, basel, tmid int + nlen = len(wq.items) + + // if no need to remove work, return -1 + if wq.isEmpty() || expiryTime.Before(wq.items[wq.head].recycleTime) { + return -1 + } + + // example + // size = 8, head = 7, tail = 4 + // [ 2, 3, 4, 5, nil, nil, nil, 1] true position + // 0 1 2 3 4 5 6 7 + // tail head + // + // 1 2 3 4 nil nil nil 0 mapped position + // r l + + // base algorithm is a copy from worker_stack + // map head and tail to effective left and right + r := (wq.tail - 1 - wq.head + nlen) % nlen + basel = wq.head + l := 0 + for l <= r { + mid = l + ((r - l) >> 1) + // calculate true mid position from mapped mid position + tmid = (mid + basel + nlen) % nlen + if expiryTime.Before(wq.items[tmid].recycleTime) { + r = mid - 1 + } else { + l = mid + 1 + } + } + // return true position from mapped position + return (r + basel + nlen) % nlen +} + func (wq *loopQueue) reset() { if wq.isEmpty() { return diff --git a/worker_loop_queue_test.go b/worker_loop_queue_test.go index b045c6c..f0c0b5f 100644 --- a/worker_loop_queue_test.go +++ b/worker_loop_queue_test.go @@ -1,3 +1,5 @@ +// +build !windows + package ants import ( @@ -46,3 +48,130 @@ func TestLoopQueue(t *testing.T) { q.retrieveExpiry(time.Second) assert.EqualValuesf(t, 6, q.len(), "Len error: %d", q.len()) } + +func TestRotatedArraySearch(t *testing.T) { + size := 10 + q := newWorkerLoopQueue(size) + + // 1 + expiry1 := time.Now() + + _ = q.insert(&goWorker{recycleTime: time.Now()}) + + assert.EqualValues(t, 0, q.binarySearch(time.Now()), "index should be 0") + assert.EqualValues(t, -1, q.binarySearch(expiry1), "index should be -1") + + // 2 + expiry2 := time.Now() + _ = q.insert(&goWorker{recycleTime: time.Now()}) + + assert.EqualValues(t, -1, q.binarySearch(expiry1), "index should be -1") + + assert.EqualValues(t, 0, q.binarySearch(expiry2), "index should be 0") + + assert.EqualValues(t, 1, q.binarySearch(time.Now()), "index should be 1") + + // more + for i := 0; i < 5; i++ { + _ = q.insert(&goWorker{recycleTime: time.Now()}) + } + + expiry3 := time.Now() + _ = q.insert(&goWorker{recycleTime: expiry3}) + + var err error + for err != errQueueIsFull { + err = q.insert(&goWorker{recycleTime: time.Now()}) + } + + assert.EqualValues(t, 7, q.binarySearch(expiry3), "index should be 7") + + // rotate + for i := 0; i < 6; i++ { + _ = q.detach() + } + + expiry4 := time.Now() + _ = q.insert(&goWorker{recycleTime: expiry4}) + + for i := 0; i < 4; i++ { + _ = q.insert(&goWorker{recycleTime: time.Now()}) + } + // head = 6, tail = 5, insert direction -> + // [expiry4, time, time, time, time, nil/tail, time/head, time, time, time] + assert.EqualValues(t, 0, q.binarySearch(expiry4), "index should be 0") + + for i := 0; i < 3; i++ { + _ = q.detach() + } + expiry5 := time.Now() + _ = q.insert(&goWorker{recycleTime: expiry5}) + + // head = 6, tail = 5, insert direction -> + // [expiry4, time, time, time, time, expiry5, nil/tail, nil, nil, time/head] + assert.EqualValues(t, 5, q.binarySearch(expiry5), "index should be 5") + + for i := 0; i < 3; i++ { + _ = q.insert(&goWorker{recycleTime: time.Now()}) + } + // head = 9, tail = 9, insert direction -> + // [expiry4, time, time, time, time, expiry5, time, time, time, time/head/tail] + assert.EqualValues(t, -1, q.binarySearch(expiry2), "index should be -1") + + assert.EqualValues(t, 9, q.binarySearch(q.items[9].recycleTime), "index should be 9") + assert.EqualValues(t, 8, q.binarySearch(time.Now()), "index should be 8") +} + +func TestRetrieveExpiry(t *testing.T) { + size := 10 + q := newWorkerLoopQueue(size) + expirew := make([]*goWorker, 0) + u, _ := time.ParseDuration("1s") + + // test [ time+1s, time+1s, time+1s, time+1s, time+1s, time, time, time, time, time] + for i := 0; i < size/2; i++ { + _ = q.insert(&goWorker{recycleTime: time.Now()}) + } + expirew = append(expirew, q.items[:size/2]...) + time.Sleep(u) + + for i := 0; i < size/2; i++ { + _ = q.insert(&goWorker{recycleTime: time.Now()}) + } + workers := q.retrieveExpiry(u) + + assert.EqualValues(t, expirew, workers, "expired workers aren't right") + + // test [ time, time, time, time, time, time+1s, time+1s, time+1s, time+1s, time+1s] + time.Sleep(u) + + for i := 0; i < size/2; i++ { + _ = q.insert(&goWorker{recycleTime: time.Now()}) + } + expirew = expirew[:0] + expirew = append(expirew, q.items[size/2:]...) + + workers2 := q.retrieveExpiry(u) + + assert.EqualValues(t, expirew, workers2, "expired workers aren't right") + + // test [ time+1s, time+1s, time+1s, nil, nil, time+1s, time+1s, time+1s, time+1s, time+1s] + for i := 0; i < size/2; i++ { + _ = q.insert(&goWorker{recycleTime: time.Now()}) + } + for i := 0; i < size/2; i++ { + _ = q.detach() + } + for i := 0; i < 3; i++ { + _ = q.insert(&goWorker{recycleTime: time.Now()}) + } + time.Sleep(u) + + expirew = expirew[:0] + expirew = append(expirew, q.items[0:3]...) + expirew = append(expirew, q.items[size/2:]...) + + workers3 := q.retrieveExpiry(u) + + assert.EqualValues(t, expirew, workers3, "expired workers aren't right") +}