Update hook expiration logic

This commit is contained in:
tidwall 2021-09-12 09:55:58 -07:00
parent c686b87dc2
commit 83094b2740
6 changed files with 89 additions and 266 deletions

View File

@ -1,114 +0,0 @@
package expire
import (
"sync"
"time"
)
// Item is a something that can expire
type Item interface {
Expires() time.Time
}
// List of expireable items
type List struct {
mu sync.Mutex
queue queue
bgrun bool
Expired func(item Item)
}
// Push an item onto the queue
func (list *List) Push(item Item) {
unix := item.Expires().UnixNano()
list.mu.Lock()
if !list.bgrun {
list.bgrun = true
go list.bg()
}
list.queue.push(unix, item)
list.mu.Unlock()
}
func (list *List) bg() {
now := time.Now().UnixNano()
for {
list.mu.Lock()
if list.queue.len == 0 {
list.bgrun = false
list.mu.Unlock()
break
}
if now > list.queue.peek().unix { // now.After(list.queue.peek().unix)
n := list.queue.pop()
exfn := list.Expired
list.mu.Unlock()
if exfn != nil {
exfn(n.item)
}
} else {
list.mu.Unlock()
time.Sleep(time.Second / 10)
now = time.Now().UnixNano()
}
}
}
type qnode struct {
unix int64
item Item
}
type queue struct {
nodes []qnode
len int
}
func (q *queue) push(unix int64, item Item) {
if q.nodes == nil {
q.nodes = make([]qnode, 2)
} else {
q.nodes = append(q.nodes, qnode{})
}
i := q.len + 1
j := i / 2
for i > 1 && q.nodes[j].unix > unix {
q.nodes[i] = q.nodes[j]
i = j
j = j / 2
}
q.nodes[i].unix = unix
q.nodes[i].item = item
q.len++
}
func (q *queue) peek() qnode {
if q.len == 0 {
return qnode{}
}
return q.nodes[1]
}
func (q *queue) pop() qnode {
if q.len == 0 {
return qnode{}
}
n := q.nodes[1]
q.nodes[1] = q.nodes[q.len]
q.len--
var j, k int
i := 1
for i != q.len+1 {
k = q.len + 1
j = 2 * i
if j <= q.len && q.nodes[j].unix < q.nodes[k].unix {
k = j
}
if j+1 <= q.len && q.nodes[j+1].unix < q.nodes[k].unix {
k = j + 1
}
q.nodes[i] = q.nodes[k]
i = k
}
return n
}

View File

@ -1,88 +0,0 @@
package expire
import (
"fmt"
"math/rand"
"sort"
"sync"
"testing"
"time"
)
type testItem struct {
str string
exp time.Time
}
func (item *testItem) Expires() time.Time {
return item.exp
}
func TestBasic(t *testing.T) {
var list List
now := time.Now()
list.Push(&testItem{"13", now.Add(13)})
list.Push(&testItem{"11", now.Add(11)})
list.Push(&testItem{"14", now.Add(14)})
list.Push(&testItem{"10", now.Add(10)})
list.Push(&testItem{"15", now.Add(15)})
list.Push(&testItem{"12", now.Add(12)})
var lunix int64
for list.queue.len > 0 {
n2 := list.queue.pop()
if n2.unix < lunix {
t.Fatal("out of order")
}
}
}
func TestRandomQueue(t *testing.T) {
N := 1000
now := time.Now()
var list List
for i := 0; i < N; i++ {
list.Push(&testItem{fmt.Sprintf("%d", i),
now.Add(time.Duration(rand.Float64() * float64(time.Second)))})
}
var items []Item
for list.queue.len > 0 {
n1 := list.queue.peek()
n2 := list.queue.pop()
if n1 != n2 {
t.Fatal("mismatch")
}
if n1.unix > n2.unix {
t.Fatal("out of order")
}
items = append(items, n2.item)
}
if !sort.SliceIsSorted(items, func(i, j int) bool {
return items[i].Expires().Before(items[j].Expires())
}) {
t.Fatal("out of order")
}
}
func TestExpires(t *testing.T) {
N := 1000
now := time.Now()
var list List
for i := 0; i < N; i++ {
list.Push(&testItem{fmt.Sprintf("%d", i),
now.Add(time.Duration(rand.Float64() * float64(time.Second)))})
}
var wg sync.WaitGroup
wg.Add(N)
var items []Item
list.Expired = func(item Item) {
items = append(items, item)
wg.Done()
}
wg.Wait()
if len(items) != N {
t.Fatal("wrong result")
}
}

View File

@ -508,6 +508,7 @@ func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails
server.cols = btree.NewNonConcurrent(byCollectionKey)
server.groupHooks = btree.NewNonConcurrent(byGroupHook)
server.groupObjects = btree.NewNonConcurrent(byGroupObject)
server.hookExpires = btree.NewNonConcurrent(byHookExpires)
server.hooks = make(map[string]*Hook)
server.hooksOut = make(map[string]*Hook)
server.hookTree = &rtree.RTree{}

View File

@ -18,32 +18,70 @@ func (s *Server) backgroundExpiring() {
func() {
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now().UnixNano()
var ids []string
var msgs []*Message
s.cols.Ascend(nil, func(v interface{}) bool {
col := v.(*collectionKeyContainer)
ids = col.col.Expired(now, ids[:0])
for _, id := range ids {
msgs = append(msgs, &Message{
Args: []string{"del", col.key, id},
})
}
return true
})
for _, msg := range msgs {
_, d, err := s.cmdDel(msg)
if err != nil {
log.Fatal(err)
}
if err := s.writeAOF(msg.Args, &d); err != nil {
log.Fatal(err)
}
}
if len(msgs) > 0 {
log.Debugf("Expired %d items\n", len(msgs))
}
now := time.Now()
s.backgroundExpireObjects(now)
s.backgroundExpireHooks(now)
}()
time.Sleep(bgExpireDelay)
}
}
func (s *Server) backgroundExpireObjects(now time.Time) {
nano := now.UnixNano()
var ids []string
var msgs []*Message
s.cols.Ascend(nil, func(v interface{}) bool {
col := v.(*collectionKeyContainer)
ids = col.col.Expired(nano, ids[:0])
for _, id := range ids {
msgs = append(msgs, &Message{
Args: []string{"del", col.key, id},
})
}
return true
})
for _, msg := range msgs {
_, d, err := s.cmdDel(msg)
if err != nil {
log.Fatal(err)
}
if err := s.writeAOF(msg.Args, &d); err != nil {
log.Fatal(err)
}
}
if len(msgs) > 0 {
log.Debugf("Expired %d objects\n", len(msgs))
}
}
func (s *Server) backgroundExpireHooks(now time.Time) {
var msgs []*Message
s.hookExpires.Ascend(nil, func(v interface{}) bool {
h := v.(*Hook)
if h.expires.After(now) {
return false
}
msg := &Message{}
if h.channel {
msg.Args = []string{"delchan", h.Name}
} else {
msg.Args = []string{"delhook", h.Name}
}
msgs = append(msgs, msg)
return true
})
for _, msg := range msgs {
_, d, err := s.cmdDelHook(msg, msg.Args[0] == "delchan")
if err != nil {
log.Fatal(err)
}
if err := s.writeAOF(msg.Args, &d); err != nil {
log.Fatal(err)
}
}
if len(msgs) > 0 {
log.Debugf("Expired %d hooks\n", len(msgs))
}
}

View File

@ -170,7 +170,7 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
// for good measure.
prevHook.Signal()
if !hook.expires.IsZero() {
s.hookex.Push(hook)
s.hookExpires.Set(hook)
}
switch msg.OutputType {
case JSON:
@ -182,6 +182,9 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
prevHook.Close()
delete(s.hooks, name)
delete(s.hooksOut, name)
if !prevHook.expires.IsZero() {
s.hookExpires.Delete(prevHook)
}
s.groupDisconnectHook(name)
}
@ -224,7 +227,7 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
hook.Open() // Opens a goroutine to notify the hook
if !hook.expires.IsZero() {
s.hookex.Push(hook)
s.hookExpires.Set(hook)
}
switch msg.OutputType {
case JSON:
@ -235,6 +238,18 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
return NOMessage, d, nil
}
func byHookExpires(a, b interface{}) bool {
ha := a.(*Hook)
hb := b.(*Hook)
if ha.expires.Before(hb.expires) {
return true
}
if ha.expires.After(hb.expires) {
return false
}
return ha.Name < hb.Name
}
func (s *Server) cmdDelHook(msg *Message, chanCmd bool) (
res resp.Value, d commandDetails, err error,
) {
@ -254,6 +269,9 @@ func (s *Server) cmdDelHook(msg *Message, chanCmd bool) (
// remove hook from maps
delete(s.hooks, hook.Name)
delete(s.hooksOut, hook.Name)
if !hook.expires.IsZero() {
s.hookExpires.Delete(hook)
}
// remove any hook / object connections
s.groupDisconnectHook(hook.Name)
// remove hook from spatial index
@ -314,6 +332,9 @@ func (s *Server) cmdPDelHook(msg *Message, channel bool) (
// remove hook from maps
delete(s.hooks, hook.Name)
delete(s.hooksOut, hook.Name)
if !hook.expires.IsZero() {
s.hookExpires.Delete(hook)
}
// remove any hook / object connections
s.groupDisconnectHook(hook.Name)
// remove hook from spatial index
@ -344,35 +365,6 @@ func (s *Server) cmdPDelHook(msg *Message, channel bool) (
return
}
// possiblyExpireHook will evaluate a hook by it's name for expiration and
// purge it from the database if needed. This operation is called from an
// independent goroutine
func (s *Server) possiblyExpireHook(name string) {
s.mu.Lock()
if h, ok := s.hooks[name]; ok {
if !h.expires.IsZero() && time.Now().After(h.expires) {
// purge from database
msg := &Message{}
if h.channel {
msg.Args = []string{"delchan", h.Name}
} else {
msg.Args = []string{"delhook", h.Name}
}
_, d, err := s.cmdDelHook(msg, h.channel)
if err != nil {
s.mu.Unlock()
panic(err)
}
if err := s.writeAOF(msg.Args, &d); err != nil {
s.mu.Unlock()
panic(err)
}
log.Debugf("purged hook %v", h.Name)
}
}
s.mu.Unlock()
}
func (s *Server) cmdHooks(msg *Message, channel bool) (
res resp.Value, err error,
) {

View File

@ -36,7 +36,6 @@ import (
"github.com/tidwall/tile38/internal/collection"
"github.com/tidwall/tile38/internal/deadline"
"github.com/tidwall/tile38/internal/endpoint"
"github.com/tidwall/tile38/internal/expire"
"github.com/tidwall/tile38/internal/log"
)
@ -123,13 +122,13 @@ type Server struct {
hooksOut map[string]*Hook // hooks with "outside" detection
groupHooks *btree.BTree // hooks that are connected to objects
groupObjects *btree.BTree // objects that are connected to hooks
hookExpires *btree.BTree // queue of all hooks marked for expiration
aofconnM map[net.Conn]io.Closer
luascripts *lScriptMap
luapool *lStatePool
pubsub *pubsub
hookex expire.List
monconnsMu sync.RWMutex
monconns map[net.Conn]bool // monitor connections
@ -179,14 +178,9 @@ func Serve(opts Options) error {
groupHooks: btree.NewNonConcurrent(byGroupHook),
groupObjects: btree.NewNonConcurrent(byGroupObject),
hookExpires: btree.NewNonConcurrent(byHookExpires),
}
server.hookex.Expired = func(item expire.Item) {
switch v := item.(type) {
case *Hook:
server.possiblyExpireHook(v.Name)
}
}
server.epc = endpoint.NewManager(server)
server.luascripts = server.newScriptMap()
server.luapool = server.newPool()