diff --git a/internal/expire/expire.go b/internal/expire/expire.go deleted file mode 100644 index a501f28a..00000000 --- a/internal/expire/expire.go +++ /dev/null @@ -1,114 +0,0 @@ -package expire - -import ( - "sync" - "time" -) - -// Item is a something that can expire -type Item interface { - Expires() time.Time -} - -// List of expireable items -type List struct { - mu sync.Mutex - queue queue - bgrun bool - Expired func(item Item) -} - -// Push an item onto the queue -func (list *List) Push(item Item) { - unix := item.Expires().UnixNano() - list.mu.Lock() - if !list.bgrun { - list.bgrun = true - go list.bg() - } - list.queue.push(unix, item) - list.mu.Unlock() -} - -func (list *List) bg() { - now := time.Now().UnixNano() - for { - list.mu.Lock() - if list.queue.len == 0 { - list.bgrun = false - list.mu.Unlock() - break - } - if now > list.queue.peek().unix { // now.After(list.queue.peek().unix) - n := list.queue.pop() - exfn := list.Expired - list.mu.Unlock() - if exfn != nil { - exfn(n.item) - } - } else { - list.mu.Unlock() - time.Sleep(time.Second / 10) - now = time.Now().UnixNano() - } - } -} - -type qnode struct { - unix int64 - item Item -} - -type queue struct { - nodes []qnode - len int -} - -func (q *queue) push(unix int64, item Item) { - if q.nodes == nil { - q.nodes = make([]qnode, 2) - } else { - q.nodes = append(q.nodes, qnode{}) - } - i := q.len + 1 - j := i / 2 - for i > 1 && q.nodes[j].unix > unix { - q.nodes[i] = q.nodes[j] - i = j - j = j / 2 - } - q.nodes[i].unix = unix - q.nodes[i].item = item - q.len++ -} - -func (q *queue) peek() qnode { - if q.len == 0 { - return qnode{} - } - return q.nodes[1] -} - -func (q *queue) pop() qnode { - if q.len == 0 { - return qnode{} - } - n := q.nodes[1] - q.nodes[1] = q.nodes[q.len] - q.len-- - var j, k int - i := 1 - for i != q.len+1 { - k = q.len + 1 - j = 2 * i - if j <= q.len && q.nodes[j].unix < q.nodes[k].unix { - k = j - } - if j+1 <= q.len && q.nodes[j+1].unix < q.nodes[k].unix { - k = j + 1 - } - q.nodes[i] = q.nodes[k] - i = k - } - return n -} diff --git a/internal/expire/expire_test.go b/internal/expire/expire_test.go deleted file mode 100644 index 1707b45b..00000000 --- a/internal/expire/expire_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package expire - -import ( - "fmt" - "math/rand" - "sort" - "sync" - "testing" - "time" -) - -type testItem struct { - str string - exp time.Time -} - -func (item *testItem) Expires() time.Time { - return item.exp -} - -func TestBasic(t *testing.T) { - var list List - now := time.Now() - list.Push(&testItem{"13", now.Add(13)}) - list.Push(&testItem{"11", now.Add(11)}) - list.Push(&testItem{"14", now.Add(14)}) - list.Push(&testItem{"10", now.Add(10)}) - list.Push(&testItem{"15", now.Add(15)}) - list.Push(&testItem{"12", now.Add(12)}) - - var lunix int64 - for list.queue.len > 0 { - n2 := list.queue.pop() - if n2.unix < lunix { - t.Fatal("out of order") - } - } -} - -func TestRandomQueue(t *testing.T) { - N := 1000 - now := time.Now() - var list List - for i := 0; i < N; i++ { - list.Push(&testItem{fmt.Sprintf("%d", i), - now.Add(time.Duration(rand.Float64() * float64(time.Second)))}) - } - var items []Item - for list.queue.len > 0 { - n1 := list.queue.peek() - n2 := list.queue.pop() - if n1 != n2 { - t.Fatal("mismatch") - } - if n1.unix > n2.unix { - t.Fatal("out of order") - } - items = append(items, n2.item) - } - - if !sort.SliceIsSorted(items, func(i, j int) bool { - return items[i].Expires().Before(items[j].Expires()) - }) { - t.Fatal("out of order") - } - -} - -func TestExpires(t *testing.T) { - N := 1000 - now := time.Now() - var list List - for i := 0; i < N; i++ { - list.Push(&testItem{fmt.Sprintf("%d", i), - now.Add(time.Duration(rand.Float64() * float64(time.Second)))}) - } - var wg sync.WaitGroup - wg.Add(N) - var items []Item - list.Expired = func(item Item) { - items = append(items, item) - wg.Done() - } - wg.Wait() - if len(items) != N { - t.Fatal("wrong result") - } -} diff --git a/internal/server/crud.go b/internal/server/crud.go index 5578b316..10be2793 100644 --- a/internal/server/crud.go +++ b/internal/server/crud.go @@ -508,6 +508,7 @@ func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails server.cols = btree.NewNonConcurrent(byCollectionKey) server.groupHooks = btree.NewNonConcurrent(byGroupHook) server.groupObjects = btree.NewNonConcurrent(byGroupObject) + server.hookExpires = btree.NewNonConcurrent(byHookExpires) server.hooks = make(map[string]*Hook) server.hooksOut = make(map[string]*Hook) server.hookTree = &rtree.RTree{} diff --git a/internal/server/expire.go b/internal/server/expire.go index 216cece3..16030c65 100644 --- a/internal/server/expire.go +++ b/internal/server/expire.go @@ -18,32 +18,70 @@ func (s *Server) backgroundExpiring() { func() { s.mu.Lock() defer s.mu.Unlock() - now := time.Now().UnixNano() - var ids []string - var msgs []*Message - s.cols.Ascend(nil, func(v interface{}) bool { - col := v.(*collectionKeyContainer) - ids = col.col.Expired(now, ids[:0]) - for _, id := range ids { - msgs = append(msgs, &Message{ - Args: []string{"del", col.key, id}, - }) - } - return true - }) - for _, msg := range msgs { - _, d, err := s.cmdDel(msg) - if err != nil { - log.Fatal(err) - } - if err := s.writeAOF(msg.Args, &d); err != nil { - log.Fatal(err) - } - } - if len(msgs) > 0 { - log.Debugf("Expired %d items\n", len(msgs)) - } + now := time.Now() + s.backgroundExpireObjects(now) + s.backgroundExpireHooks(now) }() time.Sleep(bgExpireDelay) } } + +func (s *Server) backgroundExpireObjects(now time.Time) { + nano := now.UnixNano() + var ids []string + var msgs []*Message + s.cols.Ascend(nil, func(v interface{}) bool { + col := v.(*collectionKeyContainer) + ids = col.col.Expired(nano, ids[:0]) + for _, id := range ids { + msgs = append(msgs, &Message{ + Args: []string{"del", col.key, id}, + }) + } + return true + }) + for _, msg := range msgs { + _, d, err := s.cmdDel(msg) + if err != nil { + log.Fatal(err) + } + if err := s.writeAOF(msg.Args, &d); err != nil { + log.Fatal(err) + } + } + if len(msgs) > 0 { + log.Debugf("Expired %d objects\n", len(msgs)) + } + +} + +func (s *Server) backgroundExpireHooks(now time.Time) { + var msgs []*Message + s.hookExpires.Ascend(nil, func(v interface{}) bool { + h := v.(*Hook) + if h.expires.After(now) { + return false + } + msg := &Message{} + if h.channel { + msg.Args = []string{"delchan", h.Name} + } else { + msg.Args = []string{"delhook", h.Name} + } + msgs = append(msgs, msg) + return true + }) + + for _, msg := range msgs { + _, d, err := s.cmdDelHook(msg, msg.Args[0] == "delchan") + if err != nil { + log.Fatal(err) + } + if err := s.writeAOF(msg.Args, &d); err != nil { + log.Fatal(err) + } + } + if len(msgs) > 0 { + log.Debugf("Expired %d hooks\n", len(msgs)) + } +} diff --git a/internal/server/hooks.go b/internal/server/hooks.go index 11d18db9..0a4e63d3 100644 --- a/internal/server/hooks.go +++ b/internal/server/hooks.go @@ -170,7 +170,7 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) ( // for good measure. prevHook.Signal() if !hook.expires.IsZero() { - s.hookex.Push(hook) + s.hookExpires.Set(hook) } switch msg.OutputType { case JSON: @@ -182,6 +182,9 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) ( prevHook.Close() delete(s.hooks, name) delete(s.hooksOut, name) + if !prevHook.expires.IsZero() { + s.hookExpires.Delete(prevHook) + } s.groupDisconnectHook(name) } @@ -224,7 +227,7 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) ( hook.Open() // Opens a goroutine to notify the hook if !hook.expires.IsZero() { - s.hookex.Push(hook) + s.hookExpires.Set(hook) } switch msg.OutputType { case JSON: @@ -235,6 +238,18 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) ( return NOMessage, d, nil } +func byHookExpires(a, b interface{}) bool { + ha := a.(*Hook) + hb := b.(*Hook) + if ha.expires.Before(hb.expires) { + return true + } + if ha.expires.After(hb.expires) { + return false + } + return ha.Name < hb.Name +} + func (s *Server) cmdDelHook(msg *Message, chanCmd bool) ( res resp.Value, d commandDetails, err error, ) { @@ -254,6 +269,9 @@ func (s *Server) cmdDelHook(msg *Message, chanCmd bool) ( // remove hook from maps delete(s.hooks, hook.Name) delete(s.hooksOut, hook.Name) + if !hook.expires.IsZero() { + s.hookExpires.Delete(hook) + } // remove any hook / object connections s.groupDisconnectHook(hook.Name) // remove hook from spatial index @@ -314,6 +332,9 @@ func (s *Server) cmdPDelHook(msg *Message, channel bool) ( // remove hook from maps delete(s.hooks, hook.Name) delete(s.hooksOut, hook.Name) + if !hook.expires.IsZero() { + s.hookExpires.Delete(hook) + } // remove any hook / object connections s.groupDisconnectHook(hook.Name) // remove hook from spatial index @@ -344,35 +365,6 @@ func (s *Server) cmdPDelHook(msg *Message, channel bool) ( return } -// possiblyExpireHook will evaluate a hook by it's name for expiration and -// purge it from the database if needed. This operation is called from an -// independent goroutine -func (s *Server) possiblyExpireHook(name string) { - s.mu.Lock() - if h, ok := s.hooks[name]; ok { - if !h.expires.IsZero() && time.Now().After(h.expires) { - // purge from database - msg := &Message{} - if h.channel { - msg.Args = []string{"delchan", h.Name} - } else { - msg.Args = []string{"delhook", h.Name} - } - _, d, err := s.cmdDelHook(msg, h.channel) - if err != nil { - s.mu.Unlock() - panic(err) - } - if err := s.writeAOF(msg.Args, &d); err != nil { - s.mu.Unlock() - panic(err) - } - log.Debugf("purged hook %v", h.Name) - } - } - s.mu.Unlock() -} - func (s *Server) cmdHooks(msg *Message, channel bool) ( res resp.Value, err error, ) { diff --git a/internal/server/server.go b/internal/server/server.go index 56f84a61..659157ef 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -36,7 +36,6 @@ import ( "github.com/tidwall/tile38/internal/collection" "github.com/tidwall/tile38/internal/deadline" "github.com/tidwall/tile38/internal/endpoint" - "github.com/tidwall/tile38/internal/expire" "github.com/tidwall/tile38/internal/log" ) @@ -123,13 +122,13 @@ type Server struct { hooksOut map[string]*Hook // hooks with "outside" detection groupHooks *btree.BTree // hooks that are connected to objects groupObjects *btree.BTree // objects that are connected to hooks + hookExpires *btree.BTree // queue of all hooks marked for expiration aofconnM map[net.Conn]io.Closer luascripts *lScriptMap luapool *lStatePool pubsub *pubsub - hookex expire.List monconnsMu sync.RWMutex monconns map[net.Conn]bool // monitor connections @@ -179,14 +178,9 @@ func Serve(opts Options) error { groupHooks: btree.NewNonConcurrent(byGroupHook), groupObjects: btree.NewNonConcurrent(byGroupObject), + hookExpires: btree.NewNonConcurrent(byHookExpires), } - server.hookex.Expired = func(item expire.Item) { - switch v := item.(type) { - case *Hook: - server.possiblyExpireHook(v.Name) - } - } server.epc = endpoint.NewManager(server) server.luascripts = server.newScriptMap() server.luapool = server.newPool()