From 8829b8ffc3204847a6bd77b8769afc4b15edfe13 Mon Sep 17 00:00:00 2001 From: tidwall Date: Mon, 13 Sep 2021 10:02:36 -0700 Subject: [PATCH] Change hooks collection type from hashmap to btree This commit changes the collection type that holds all of the hooks from a hashmap to a btree. This allows for better flexibility for operations that need to perform range searches and scanning of the collection. --- internal/server/aof.go | 6 ++- internal/server/aofshrink.go | 16 ++++--- internal/server/crud.go | 15 ++++-- internal/server/hooks.go | 91 +++++++++++++++++++----------------- internal/server/server.go | 26 +++++------ internal/server/stats.go | 4 +- 6 files changed, 86 insertions(+), 72 deletions(-) diff --git a/internal/server/aof.go b/internal/server/aof.go index 73cdfcf3..6b440e21 100644 --- a/internal/server/aof.go +++ b/internal/server/aof.go @@ -224,11 +224,13 @@ func (s *Server) writeAOF(args []string, d *commandDetails) error { func (s *Server) getQueueCandidates(d *commandDetails) []*Hook { candidates := make(map[*Hook]bool) // add the hooks with "outside" detection - for _, hook := range s.hooksOut { + s.hooksOut.Ascend(nil, func(v interface{}) bool { + hook := v.(*Hook) if hook.Key == d.key { candidates[hook] = true } - } + return true + }) // look for candidates that might "cross" geofences if d.oldObj != nil && d.obj != nil && s.hookCross.Len() > 0 { r1, r2 := d.oldObj.Rect(), d.obj.Rect() diff --git a/internal/server/aofshrink.go b/internal/server/aofshrink.go index f53db56e..bd1c5f08 100644 --- a/internal/server/aofshrink.go +++ b/internal/server/aofshrink.go @@ -3,11 +3,11 @@ package server import ( "math" "os" - "sort" "strconv" "strings" "time" + "github.com/tidwall/btree" "github.com/tidwall/geojson" "github.com/tidwall/tile38/core" "github.com/tidwall/tile38/internal/collection" @@ -169,17 +169,19 @@ func (server *Server) aofshrink() { func() { server.mu.Lock() defer server.mu.Unlock() - for name := range server.hooks { - hnames = append(hnames, name) - } + hnames = make([]string, 0, server.hooks.Len()) + server.hooks.Walk(func(v []interface{}) { + for _, v := range v { + hnames = append(hnames, v.(*Hook).Name) + } + }) }() - // sort the names for consistency - sort.Strings(hnames) + var hookHint btree.PathHint for _, name := range hnames { func() { server.mu.Lock() defer server.mu.Unlock() - hook := server.hooks[name] + hook, _ := server.hooks.GetHint(name, &hookHint).(*Hook) if hook == nil { return } diff --git a/internal/server/crud.go b/internal/server/crud.go index 10be2793..5c0e1c03 100644 --- a/internal/server/crud.go +++ b/internal/server/crud.go @@ -462,12 +462,14 @@ func (server *Server) cmdRename(msg *Message, nx bool) (res resp.Value, d comman err = errKeyNotFound return } - for _, h := range server.hooks { + server.hooks.Ascend(nil, func(v interface{}) bool { + h := v.(*Hook) if h.Key == d.key || h.Key == d.newKey { err = errKeyHasHooksSet - return + return false } - } + return true + }) d.command = "rename" newCol := server.getCol(d.newKey) if newCol == nil { @@ -505,14 +507,17 @@ func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails err = errInvalidNumberOfArguments return } + + // clear the entire database server.cols = btree.NewNonConcurrent(byCollectionKey) server.groupHooks = btree.NewNonConcurrent(byGroupHook) server.groupObjects = btree.NewNonConcurrent(byGroupObject) server.hookExpires = btree.NewNonConcurrent(byHookExpires) - server.hooks = make(map[string]*Hook) - server.hooksOut = make(map[string]*Hook) + server.hooks = btree.NewNonConcurrent(byHookName) + server.hooksOut = btree.NewNonConcurrent(byHookName) server.hookTree = &rtree.RTree{} server.hookCross = &rtree.RTree{} + d.command = "flushdb" d.updated = true d.timestamp = time.Now() diff --git a/internal/server/hooks.go b/internal/server/hooks.go index 0a4e63d3..71e5dd58 100644 --- a/internal/server/hooks.go +++ b/internal/server/hooks.go @@ -22,18 +22,8 @@ var hookLogSetDefaults = &buntdb.SetOptions{ TTL: time.Second * 30, } -type hooksByName []*Hook - -func (a hooksByName) Len() int { - return len(a) -} - -func (a hooksByName) Less(i, j int) bool { - return a[i].Name < a[j].Name -} - -func (a hooksByName) Swap(i, j int) { - a[i], a[j] = a[j], a[i] +func byHookName(a, b interface{}) bool { + return a.(*Hook).Name < b.(*Hook).Name } func (s *Server) cmdSetHook(msg *Message, chanCmd bool) ( @@ -159,7 +149,7 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) ( return NOMessage, d, err } - prevHook := s.hooks[name] + prevHook, _ := s.hooks.Get(&Hook{Name: name}).(*Hook) if prevHook != nil { if prevHook.channel != chanCmd { return NOMessage, d, @@ -180,8 +170,8 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) ( } } prevHook.Close() - delete(s.hooks, name) - delete(s.hooksOut, name) + s.hooks.Delete(prevHook) + s.hooksOut.Delete(prevHook) if !prevHook.expires.IsZero() { s.hookExpires.Delete(prevHook) } @@ -191,9 +181,9 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) ( d.updated = true d.timestamp = time.Now() - s.hooks[name] = hook + s.hooks.Set(hook) if hook.Fence.detect == nil || hook.Fence.detect["outside"] { - s.hooksOut[name] = hook + s.hooksOut.Set(hook) } // remove previous hook from spatial index @@ -264,11 +254,12 @@ func (s *Server) cmdDelHook(msg *Message, chanCmd bool) ( if len(vs) != 0 { return NOMessage, d, errInvalidNumberOfArguments } - if hook, ok := s.hooks[name]; ok && hook.channel == chanCmd { + hook, _ := s.hooks.Get(&Hook{Name: name}).(*Hook) + if hook != nil && hook.channel == chanCmd { hook.Close() // remove hook from maps - delete(s.hooks, hook.Name) - delete(s.hooksOut, hook.Name) + s.hooks.Delete(hook) + s.hooksOut.Delete(hook) if !hook.expires.IsZero() { s.hookExpires.Delete(hook) } @@ -320,18 +311,20 @@ func (s *Server) cmdPDelHook(msg *Message, channel bool) ( } count := 0 - for name, hook := range s.hooks { + var hooks []*Hook + s.forEachHookByPattern(pattern, channel, func(hook *Hook) bool { + hooks = append(hooks, hook) + return true + }) + + for _, hook := range hooks { if hook.channel != channel { continue } - match, _ := glob.Match(pattern, name) - if !match { - continue - } hook.Close() // remove hook from maps - delete(s.hooks, hook.Name) - delete(s.hooksOut, hook.Name) + s.hooks.Delete(hook) + s.hooksOut.Delete(hook) if !hook.expires.IsZero() { s.hookExpires.Delete(hook) } @@ -365,6 +358,26 @@ func (s *Server) cmdPDelHook(msg *Message, channel bool) ( return } +func (s *Server) forEachHookByPattern( + pattern string, channel bool, iter func(hook *Hook) bool, +) { + g := glob.Parse(pattern, false) + hasUpperLimit := g.Limits[1] != "" + s.hooks.Ascend(&Hook{Name: g.Limits[0]}, func(v interface{}) bool { + hook := v.(*Hook) + if hasUpperLimit && hook.Name > g.Limits[1] { + return false + } + if hook.channel == channel { + match, _ := glob.Match(pattern, hook.Name) + if match { + return iter(hook) + } + } + return true + }) +} + func (s *Server) cmdHooks(msg *Message, channel bool) ( res resp.Value, err error, ) { @@ -381,18 +394,6 @@ func (s *Server) cmdHooks(msg *Message, channel bool) ( return NOMessage, errInvalidNumberOfArguments } - var hooks []*Hook - for name, hook := range s.hooks { - if hook.channel != channel { - continue - } - match, _ := glob.Match(pattern, name) - if match { - hooks = append(hooks, hook) - } - } - sort.Sort(hooksByName(hooks)) - switch msg.OutputType { case JSON: buf := &bytes.Buffer{} @@ -402,7 +403,8 @@ func (s *Server) cmdHooks(msg *Message, channel bool) ( } else { buf.WriteString(`"hooks":[`) } - for i, hook := range hooks { + var i int + s.forEachHookByPattern(pattern, channel, func(hook *Hook) bool { var ttl = -1 if !hook.expires.IsZero() { ttl = int(hook.expires.Sub(start).Seconds()) @@ -444,13 +446,15 @@ func (s *Server) cmdHooks(msg *Message, channel bool) ( buf.WriteString(jsonString(meta.Value)) } buf.WriteString(`}}`) - } + i++ + return true + }) buf.WriteString(`],"elapsed":"` + time.Since(start).String() + "\"}") return resp.StringValue(buf.String()), nil case RESP: var vals []resp.Value - for _, hook := range hooks { + s.forEachHookByPattern(pattern, channel, func(hook *Hook) bool { var hvals []resp.Value hvals = append(hvals, resp.StringValue(hook.Name)) hvals = append(hvals, resp.StringValue(hook.Key)) @@ -471,7 +475,8 @@ func (s *Server) cmdHooks(msg *Message, channel bool) ( } hvals = append(hvals, resp.ArrayValue(metas)) vals = append(vals, resp.ArrayValue(hvals)) - } + return true + }) return resp.ArrayValue(vals), nil } return resp.SimpleStringValue(""), nil diff --git a/internal/server/server.go b/internal/server/server.go index 659157ef..831589ce 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -112,17 +112,17 @@ type Server struct { lstack []*commandDetails lives map[*liveBuffer]bool lcond *sync.Cond - fcup bool // follow caught up - fcuponce bool // follow caught up once - shrinking bool // aof shrinking flag - shrinklog [][]string // aof shrinking log - hooks map[string]*Hook // hook name - hookCross *rtree.RTree // hook spatial tree for "cross" geofences - hookTree *rtree.RTree // hook spatial tree for all - hooksOut map[string]*Hook // hooks with "outside" detection - groupHooks *btree.BTree // hooks that are connected to objects - groupObjects *btree.BTree // objects that are connected to hooks - hookExpires *btree.BTree // queue of all hooks marked for expiration + fcup bool // follow caught up + fcuponce bool // follow caught up once + shrinking bool // aof shrinking flag + shrinklog [][]string // aof shrinking log + hooks *btree.BTree // hook name -- [string]*Hook + hookCross *rtree.RTree // hook spatial tree for "cross" geofences + hookTree *rtree.RTree // hook spatial tree for all + hooksOut *btree.BTree // hooks with "outside" detection -- [string]*Hook + groupHooks *btree.BTree // hooks that are connected to objects + groupObjects *btree.BTree // objects that are connected to hooks + hookExpires *btree.BTree // queue of all hooks marked for expiration aofconnM map[net.Conn]io.Closer luascripts *lScriptMap @@ -164,8 +164,8 @@ func Serve(opts Options) error { fcond: sync.NewCond(&sync.Mutex{}), lives: make(map[*liveBuffer]bool), lcond: sync.NewCond(&sync.Mutex{}), - hooks: make(map[string]*Hook), - hooksOut: make(map[string]*Hook), + hooks: btree.NewNonConcurrent(byHookName), + hooksOut: btree.NewNonConcurrent(byHookName), hookCross: &rtree.RTree{}, hookTree: &rtree.RTree{}, aofconnM: make(map[net.Conn]io.Closer), diff --git a/internal/server/stats.go b/internal/server/stats.go index 0e0ca699..503beafe 100644 --- a/internal/server/stats.go +++ b/internal/server/stats.go @@ -157,7 +157,7 @@ func (s *Server) basicStats(m map[string]interface{}) { m["pid"] = os.Getpid() m["aof_size"] = s.aofsz m["num_collections"] = s.cols.Len() - m["num_hooks"] = len(s.hooks) + m["num_hooks"] = s.hooks.Len() sz := 0 s.cols.Ascend(nil, func(v interface{}) bool { col := v.(*collectionKeyContainer).col @@ -337,7 +337,7 @@ func (s *Server) extStats(m map[string]interface{}) { // Number of collections in the database m["tile38_num_collections"] = s.cols.Len() // Number of hooks in the database - m["tile38_num_hooks"] = len(s.hooks) + m["tile38_num_hooks"] = s.hooks.Len() // Number of hook groups in the database m["tile38_num_hook_groups"] = s.groupHooks.Len() // Number of object groups in the database