From c8389fe52cad1595f99c84eb09a9a8eaf8c3df6d Mon Sep 17 00:00:00 2001 From: tidwall Date: Fri, 20 Aug 2021 05:00:14 -0700 Subject: [PATCH] Fix memory leak with group id This commit fixes a memory leak that was being caused by hooks hanging on to the geofence group ids past the life of the object. --- internal/server/crud.go | 9 ++- internal/server/fence.go | 19 ++--- internal/server/group.go | 150 ++++++++++++++++++++++++++++++++++++++ internal/server/hooks.go | 5 ++ internal/server/search.go | 7 +- internal/server/server.go | 69 ++++++++++-------- internal/server/stats.go | 4 + 7 files changed, 213 insertions(+), 50 deletions(-) create mode 100644 internal/server/group.go diff --git a/internal/server/crud.go b/internal/server/crud.go index fd4d8b0c..5578b316 100644 --- a/internal/server/crud.go +++ b/internal/server/crud.go @@ -308,6 +308,7 @@ func (server *Server) cmdDel(msg *Message) (res resp.Value, d commandDetails, er found = true } } + server.groupDisconnectObject(d.key, d.id) d.command = "del" d.updated = found d.timestamp = time.Now() @@ -372,6 +373,7 @@ func (server *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, e } else { d.children[i] = dc } + server.groupDisconnectObject(dc.key, dc.id) } if atLeastOneNotDeleted { var nchildren []*commandDetails @@ -423,6 +425,7 @@ func (server *Server) cmdDrop(msg *Message) (res resp.Value, d commandDetails, e d.key = "" // ignore the details d.updated = false } + server.groupDisconnectCollection(d.key) d.command = "drop" d.timestamp = time.Now() switch msg.OutputType { @@ -503,10 +506,12 @@ func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails return } server.cols = btree.NewNonConcurrent(byCollectionKey) + server.groupHooks = btree.NewNonConcurrent(byGroupHook) + server.groupObjects = btree.NewNonConcurrent(byGroupObject) server.hooks = make(map[string]*Hook) server.hooksOut = make(map[string]*Hook) - server.hookTree = rtree.RTree{} - server.hookCross = rtree.RTree{} + server.hookTree = &rtree.RTree{} + server.hookCross = &rtree.RTree{} d.command = "flushdb" d.updated = true d.timestamp = time.Now() diff --git a/internal/server/fence.go b/internal/server/fence.go index d0596dcf..eb0de389 100644 --- a/internal/server/fence.go +++ b/internal/server/fence.go @@ -195,23 +195,16 @@ func fenceMatch( } sw.mu.Unlock() - if fence.groups == nil { - fence.groups = make(map[string]string) - } - groupkey := details.key + ":" + details.id var group string - var ok bool if detect == "enter" { - group = bsonID() - fence.groups[groupkey] = group + group = sw.s.groupConnect(hookName, details.key, details.id) } else if detect == "cross" { - group = bsonID() - delete(fence.groups, groupkey) + sw.s.groupDisconnect(hookName, details.key, details.id) + group = sw.s.groupConnect(hookName, details.key, details.id) } else { - group, ok = fence.groups[groupkey] - if !ok { - group = bsonID() - fence.groups[groupkey] = group + group = sw.s.groupGet(hookName, details.key, details.id) + if group == "" { + group = sw.s.groupConnect(hookName, details.key, details.id) } } var msgs []string diff --git a/internal/server/group.go b/internal/server/group.go new file mode 100644 index 00000000..67e28fd9 --- /dev/null +++ b/internal/server/group.go @@ -0,0 +1,150 @@ +package server + +import ( + "github.com/tidwall/btree" +) + +func byGroupHook(va, vb interface{}) bool { + a, b := va.(*groupItem), vb.(*groupItem) + if a.hookName < b.hookName { + return true + } + if a.hookName > b.hookName { + return false + } + if a.colKey < b.colKey { + return true + } + if a.colKey > b.colKey { + return false + } + return a.objID < b.objID +} + +func byGroupObject(va, vb interface{}) bool { + a, b := va.(*groupItem), vb.(*groupItem) + if a.colKey < b.colKey { + return true + } + if a.colKey > b.colKey { + return false + } + if a.objID < b.objID { + return true + } + if a.objID > b.objID { + return false + } + return a.hookName < b.hookName +} + +type groupItem struct { + hookName string + colKey string + objID string + groupID string +} + +func newGroupItem(hookName, colKey, objID string) *groupItem { + groupID := bsonID() + g := &groupItem{} + // create a single string allocation + ustr := hookName + colKey + objID + groupID + var pos int + g.hookName = ustr[pos : pos+len(hookName)] + pos += len(hookName) + g.colKey = ustr[pos : pos+len(colKey)] + pos += len(colKey) + g.objID = ustr[pos : pos+len(objID)] + pos += len(objID) + g.groupID = ustr[pos : pos+len(groupID)] + pos += len(groupID) + return g +} + +func (s *Server) groupConnect(hookName, colKey, objID string) (groupID string) { + g := newGroupItem(hookName, colKey, objID) + s.groupHooks.Set(g) + s.groupObjects.Set(g) + return g.groupID +} + +func (s *Server) groupDisconnect(hookName, colKey, objID string) { + g := &groupItem{ + hookName: hookName, + colKey: colKey, + objID: objID, + } + s.groupHooks.Delete(g) + s.groupObjects.Delete(g) +} + +func (s *Server) groupGet(hookName, colKey, objID string) (groupID string) { + v := s.groupHooks.Get(&groupItem{ + hookName: hookName, + colKey: colKey, + objID: objID, + }) + if v != nil { + return v.(*groupItem).groupID + } + return "" +} + +func deleteGroups(s *Server, groups []*groupItem) { + var hhint btree.PathHint + var ohint btree.PathHint + for _, g := range groups { + s.groupHooks.DeleteHint(g, &hhint) + s.groupObjects.DeleteHint(g, &ohint) + } +} + +// groupDisconnectObject disconnects all hooks from provide object +func (s *Server) groupDisconnectObject(colKey, objID string) { + var groups []*groupItem + s.groupObjects.Ascend(&groupItem{colKey: colKey, objID: objID}, + func(v interface{}) bool { + g := v.(*groupItem) + if g.colKey != colKey || g.objID != objID { + return false + } + groups = append(groups, g) + return true + }, + ) + deleteGroups(s, groups) +} + +// groupDisconnectCollection disconnects all hooks from objects in provided +// collection. +func (s *Server) groupDisconnectCollection(colKey string) { + var groups []*groupItem + s.groupObjects.Ascend(&groupItem{colKey: colKey}, + func(v interface{}) bool { + g := v.(*groupItem) + if g.colKey != colKey { + return false + } + groups = append(groups, g) + return true + }, + ) + deleteGroups(s, groups) +} + +// groupDisconnectHook disconnects all objects from provided hook. +func (s *Server) groupDisconnectHook(hookName string) { + var groups []*groupItem + s.groupHooks.Ascend(&groupItem{hookName: hookName}, + func(v interface{}) bool { + g := v.(*groupItem) + if g.hookName != hookName { + return false + } + groups = append(groups, g) + return true + }, + ) + deleteGroups(s, groups) +} diff --git a/internal/server/hooks.go b/internal/server/hooks.go index bfbbbf8f..20e4ded2 100644 --- a/internal/server/hooks.go +++ b/internal/server/hooks.go @@ -182,6 +182,7 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) ( prevHook.Close() delete(s.hooks, name) delete(s.hooksOut, name) + s.groupDisconnectHook(name) } d.updated = true @@ -253,6 +254,8 @@ func (s *Server) cmdDelHook(msg *Message, chanCmd bool) ( // remove hook from maps delete(s.hooks, hook.Name) delete(s.hooksOut, hook.Name) + // remove any hook / object connections + s.groupDisconnectHook(hook.Name) // remove hook from spatial index if hook.Fence != nil && hook.Fence.obj != nil { rect := hook.Fence.obj.Rect() @@ -311,6 +314,8 @@ func (s *Server) cmdPDelHook(msg *Message, channel bool) ( // remove hook from maps delete(s.hooks, hook.Name) delete(s.hooksOut, hook.Name) + // remove any hook / object connections + s.groupDisconnectHook(hook.Name) // remove hook from spatial index if hook.Fence != nil && hook.Fence.obj != nil { rect := hook.Fence.obj.Rect() diff --git a/internal/server/search.go b/internal/server/search.go index c217636f..c662915d 100644 --- a/internal/server/search.go +++ b/internal/server/search.go @@ -20,10 +20,9 @@ const defaultCircleSteps = 64 type liveFenceSwitches struct { searchScanBaseTokens - obj geojson.Object - cmd string - roam roamSwitches - groups map[string]string + obj geojson.Object + cmd string + roam roamSwitches } type roamSwitches struct { diff --git a/internal/server/server.go b/internal/server/server.go index 81b5c897..d4e44c4f 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -23,6 +23,7 @@ import ( "sync/atomic" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/tidwall/btree" "github.com/tidwall/buntdb" "github.com/tidwall/geojson" @@ -37,8 +38,6 @@ import ( "github.com/tidwall/tile38/internal/endpoint" "github.com/tidwall/tile38/internal/expire" "github.com/tidwall/tile38/internal/log" - - "github.com/prometheus/client_golang/prometheus" ) var errOOM = errors.New("OOM command not allowed when used memory > 'maxmemory'") @@ -108,19 +107,22 @@ type Server struct { qidx uint64 // hook queue log last idx cols *btree.BTree // data collections - follows map[*bytes.Buffer]bool - fcond *sync.Cond - lstack []*commandDetails - lives map[*liveBuffer]bool - lcond *sync.Cond - fcup bool // follow caught up - fcuponce bool // follow caught up once - shrinking bool // aof shrinking flag - shrinklog [][]string // aof shrinking log - hooks map[string]*Hook // hook name - hookCross rtree.RTree // hook spatial tree for "cross" geofences - hookTree rtree.RTree // hook spatial tree for all - hooksOut map[string]*Hook // hooks with "outside" detection + follows map[*bytes.Buffer]bool + fcond *sync.Cond + lstack []*commandDetails + lives map[*liveBuffer]bool + lcond *sync.Cond + fcup bool // follow caught up + fcuponce bool // follow caught up once + shrinking bool // aof shrinking flag + shrinklog [][]string // aof shrinking log + hooks map[string]*Hook // hook name + hookCross *rtree.RTree // hook spatial tree for "cross" geofences + hookTree *rtree.RTree // hook spatial tree for all + hooksOut map[string]*Hook // hooks with "outside" detection + groupHooks *btree.BTree // hooks that are connected to objects + groupObjects *btree.BTree // objects that are connected to hooks + aofconnM map[net.Conn]io.Closer luascripts *lScriptMap luapool *lStatePool @@ -144,22 +146,27 @@ func Serve(host string, port int, dir string, useHTTP bool, metricsAddr string) // Initialize the server server := &Server{ - host: host, - port: port, - dir: dir, - follows: make(map[*bytes.Buffer]bool), - fcond: sync.NewCond(&sync.Mutex{}), - lives: make(map[*liveBuffer]bool), - lcond: sync.NewCond(&sync.Mutex{}), - hooks: make(map[string]*Hook), - hooksOut: make(map[string]*Hook), - aofconnM: make(map[net.Conn]io.Closer), - started: time.Now(), - conns: make(map[int]*Client), - http: useHTTP, - pubsub: newPubsub(), - monconns: make(map[net.Conn]bool), - cols: btree.NewNonConcurrent(byCollectionKey), + host: host, + port: port, + dir: dir, + follows: make(map[*bytes.Buffer]bool), + fcond: sync.NewCond(&sync.Mutex{}), + lives: make(map[*liveBuffer]bool), + lcond: sync.NewCond(&sync.Mutex{}), + hooks: make(map[string]*Hook), + hooksOut: make(map[string]*Hook), + hookCross: &rtree.RTree{}, + hookTree: &rtree.RTree{}, + aofconnM: make(map[net.Conn]io.Closer), + started: time.Now(), + conns: make(map[int]*Client), + http: useHTTP, + pubsub: newPubsub(), + monconns: make(map[net.Conn]bool), + cols: btree.NewNonConcurrent(byCollectionKey), + + groupHooks: btree.NewNonConcurrent(byGroupHook), + groupObjects: btree.NewNonConcurrent(byGroupObject), } server.hookex.Expired = func(item expire.Item) { diff --git a/internal/server/stats.go b/internal/server/stats.go index f37e0b06..0e0ca699 100644 --- a/internal/server/stats.go +++ b/internal/server/stats.go @@ -338,6 +338,10 @@ func (s *Server) extStats(m map[string]interface{}) { m["tile38_num_collections"] = s.cols.Len() // Number of hooks in the database m["tile38_num_hooks"] = len(s.hooks) + // Number of hook groups in the database + m["tile38_num_hook_groups"] = s.groupHooks.Len() + // Number of object groups in the database + m["tile38_num_object_groups"] = s.groupObjects.Len() avgsz := 0 if points != 0 {