expire package

This commit is contained in:
tidwall 2018-08-13 17:05:20 -07:00
parent 4e8a7ccfcd
commit 3aa394219d
2 changed files with 213 additions and 0 deletions

115
pkg/expire/expire.go Normal file
View File

@ -0,0 +1,115 @@
package expire
import (
"sync"
"time"
)
// Item is a something that can expire
type Item interface {
Expires() time.Time
}
// List of expireable items
type List struct {
mu sync.Mutex
queue queue
bgrun bool
arr []Item
Expired func(item Item)
}
// Push an item onto the queue
func (list *List) Push(item Item) {
unix := item.Expires().UnixNano()
list.mu.Lock()
if !list.bgrun {
list.bgrun = true
go list.bg()
}
list.queue.push(unix, item)
list.mu.Unlock()
}
func (list *List) bg() {
unix := time.Now().UnixNano()
for {
list.mu.Lock()
if list.queue.len == 0 {
list.bgrun = false
list.mu.Unlock()
break
}
if unix > list.queue.peek().unix {
n := list.queue.pop()
list.mu.Unlock()
if list.Expired != nil {
list.Expired(n.item)
}
} else {
list.mu.Unlock()
time.Sleep(time.Second / 10)
unix = time.Now().UnixNano()
}
}
}
type qnode struct {
unix int64
item Item
}
type queue struct {
nodes []qnode
len int
size int
}
func (q *queue) push(unix int64, item Item) {
if q.nodes == nil {
q.nodes = make([]qnode, 2)
} else {
q.nodes = append(q.nodes, qnode{})
}
i := q.len + 1
j := i / 2
for i > 1 && q.nodes[j].unix > unix {
q.nodes[i] = q.nodes[j]
i = j
j = j / 2
}
q.nodes[i].unix = unix
q.nodes[i].item = item
q.len++
}
func (q *queue) peek() qnode {
if q.len == 0 {
return qnode{}
}
return q.nodes[1]
}
func (q *queue) pop() qnode {
if q.len == 0 {
return qnode{}
}
n := q.nodes[1]
q.nodes[1] = q.nodes[q.len]
q.len--
var j, k int
i := 1
for i != q.len+1 {
k = q.len + 1
j = 2 * i
if j <= q.len && q.nodes[j].unix < q.nodes[k].unix {
k = j
}
if j+1 <= q.len && q.nodes[j+1].unix < q.nodes[k].unix {
k = j + 1
}
q.nodes[i] = q.nodes[k]
i = k
}
return n
}

98
pkg/expire/expire_test.go Normal file
View File

@ -0,0 +1,98 @@
package expire
import (
"fmt"
"math/rand"
"sort"
"sync"
"testing"
"time"
)
type testItem struct {
str string
exp time.Time
}
func (item *testItem) Expires() time.Time {
return item.exp
}
func TestBasic(t *testing.T) {
var list List
now := time.Now()
list.Push(&testItem{"13", now.Add(13)})
list.Push(&testItem{"11", now.Add(11)})
list.Push(&testItem{"14", now.Add(14)})
list.Push(&testItem{"10", now.Add(10)})
list.Push(&testItem{"15", now.Add(15)})
list.Push(&testItem{"12", now.Add(12)})
var lunix int64
for list.queue.len > 0 {
n2 := list.queue.pop()
if n2.unix < lunix {
t.Fatal("out of order")
}
}
}
func TestRandomQueue(t *testing.T) {
N := 100000
now := time.Now()
var list List
for i := 0; i < N; i++ {
list.Push(&testItem{fmt.Sprintf("%d", i),
now.Add(time.Duration(rand.Float64() * float64(time.Second)))})
}
// var wg sync.WaitGroup
// wg.Add(N)
// var items []Item
// list.Expired = func(item Item) {
// items = append(items, item)
// wg.Done()
// }
// wg.Wait()
var items []Item
for list.queue.len > 0 {
n1 := list.queue.peek()
n2 := list.queue.pop()
if n1 != n2 {
t.Fatal("mismatch")
}
if n1.unix > n2.unix {
t.Fatal("out of order")
}
items = append(items, n2.item)
}
if !sort.SliceIsSorted(items, func(i, j int) bool {
return items[i].Expires().Before(items[j].Expires())
}) {
t.Fatal("out of order")
}
}
func TestExpires(t *testing.T) {
N := 100000
now := time.Now()
var list List
for i := 0; i < N; i++ {
list.Push(&testItem{fmt.Sprintf("%d", i),
now.Add(time.Duration(rand.Float64() * float64(time.Second)))})
}
var wg sync.WaitGroup
wg.Add(N)
var items []Item
list.Expired = func(item Item) {
items = append(items, item)
wg.Done()
}
wg.Wait()
if len(items) != N {
t.Fatal("wrong result")
}
}