diff --git a/pkg/expire/expire.go b/pkg/expire/expire.go new file mode 100644 index 00000000..ae5577e3 --- /dev/null +++ b/pkg/expire/expire.go @@ -0,0 +1,115 @@ +package expire + +import ( + "sync" + "time" +) + +// Item is a something that can expire +type Item interface { + Expires() time.Time +} + +// List of expireable items +type List struct { + mu sync.Mutex + queue queue + bgrun bool + arr []Item + Expired func(item Item) +} + +// Push an item onto the queue +func (list *List) Push(item Item) { + unix := item.Expires().UnixNano() + list.mu.Lock() + if !list.bgrun { + list.bgrun = true + go list.bg() + } + list.queue.push(unix, item) + list.mu.Unlock() +} + +func (list *List) bg() { + unix := time.Now().UnixNano() + for { + list.mu.Lock() + if list.queue.len == 0 { + list.bgrun = false + list.mu.Unlock() + break + } + if unix > list.queue.peek().unix { + n := list.queue.pop() + list.mu.Unlock() + if list.Expired != nil { + list.Expired(n.item) + } + } else { + list.mu.Unlock() + time.Sleep(time.Second / 10) + unix = time.Now().UnixNano() + } + } +} + +type qnode struct { + unix int64 + item Item +} + +type queue struct { + nodes []qnode + len int + size int +} + +func (q *queue) push(unix int64, item Item) { + if q.nodes == nil { + q.nodes = make([]qnode, 2) + } else { + q.nodes = append(q.nodes, qnode{}) + } + i := q.len + 1 + j := i / 2 + for i > 1 && q.nodes[j].unix > unix { + q.nodes[i] = q.nodes[j] + i = j + j = j / 2 + } + q.nodes[i].unix = unix + q.nodes[i].item = item + q.len++ +} + +func (q *queue) peek() qnode { + if q.len == 0 { + return qnode{} + } + return q.nodes[1] +} + +func (q *queue) pop() qnode { + if q.len == 0 { + return qnode{} + } + n := q.nodes[1] + q.nodes[1] = q.nodes[q.len] + q.len-- + var j, k int + i := 1 + for i != q.len+1 { + k = q.len + 1 + j = 2 * i + if j <= q.len && q.nodes[j].unix < q.nodes[k].unix { + k = j + } + if j+1 <= q.len && q.nodes[j+1].unix < q.nodes[k].unix { + k = j + 1 + } + q.nodes[i] = q.nodes[k] + i = k + } + return n +} diff --git a/pkg/expire/expire_test.go b/pkg/expire/expire_test.go new file mode 100644 index 00000000..0bbc2c17 --- /dev/null +++ b/pkg/expire/expire_test.go @@ -0,0 +1,98 @@ +package expire + +import ( + "fmt" + "math/rand" + "sort" + "sync" + "testing" + "time" +) + +type testItem struct { + str string + exp time.Time +} + +func (item *testItem) Expires() time.Time { + return item.exp +} + +func TestBasic(t *testing.T) { + var list List + now := time.Now() + list.Push(&testItem{"13", now.Add(13)}) + list.Push(&testItem{"11", now.Add(11)}) + list.Push(&testItem{"14", now.Add(14)}) + list.Push(&testItem{"10", now.Add(10)}) + list.Push(&testItem{"15", now.Add(15)}) + list.Push(&testItem{"12", now.Add(12)}) + + var lunix int64 + for list.queue.len > 0 { + n2 := list.queue.pop() + if n2.unix < lunix { + t.Fatal("out of order") + } + } +} + +func TestRandomQueue(t *testing.T) { + N := 100000 + now := time.Now() + var list List + for i := 0; i < N; i++ { + list.Push(&testItem{fmt.Sprintf("%d", i), + now.Add(time.Duration(rand.Float64() * float64(time.Second)))}) + } + // var wg sync.WaitGroup + // wg.Add(N) + // var items []Item + // list.Expired = func(item Item) { + // items = append(items, item) + // wg.Done() + // } + + // wg.Wait() + + var items []Item + for list.queue.len > 0 { + n1 := list.queue.peek() + n2 := list.queue.pop() + if n1 != n2 { + t.Fatal("mismatch") + } + if n1.unix > n2.unix { + t.Fatal("out of order") + } + items = append(items, n2.item) + } + + if !sort.SliceIsSorted(items, func(i, j int) bool { + return items[i].Expires().Before(items[j].Expires()) + }) { + t.Fatal("out of order") + } + +} + +func TestExpires(t *testing.T) { + N := 100000 + now := time.Now() + var list List + for i := 0; i < N; i++ { + list.Push(&testItem{fmt.Sprintf("%d", i), + now.Add(time.Duration(rand.Float64() * float64(time.Second)))}) + } + var wg sync.WaitGroup + wg.Add(N) + var items []Item + list.Expired = func(item Item) { + items = append(items, item) + wg.Done() + } + wg.Wait() + if len(items) != N { + t.Fatal("wrong result") + } +}