diff --git a/internal/server/aof.go b/internal/server/aof.go index c8e8382d..73a70845 100644 --- a/internal/server/aof.go +++ b/internal/server/aof.go @@ -13,6 +13,7 @@ import ( "time" "github.com/tidwall/buntdb" + "github.com/tidwall/geojson" "github.com/tidwall/redcon" "github.com/tidwall/resp" "github.com/tidwall/tile38/internal/log" @@ -189,23 +190,60 @@ func (server *Server) writeAOF(args []string, d *commandDetails) error { return nil } +func (server *Server) getQueueCandidates(d *commandDetails) []*Hook { + var candidates []*Hook + // add the hooks with "outside" detection + if len(server.hooksOut) > 0 { + for _, hook := range server.hooksOut { + if hook.Key == d.key { + candidates = append(candidates, hook) + } + } + } + // search the hook spatial tree + for _, obj := range []geojson.Object{d.obj, d.oldObj} { + if obj == nil { + continue + } + rect := obj.Rect() + server.hookTree.Search( + []float64{rect.Min.X, rect.Min.Y}, + []float64{rect.Max.X, rect.Max.Y}, + func(_, _ []float64, value interface{}) bool { + hook := value.(*Hook) + var found bool + for _, candidate := range candidates { + if candidate == hook { + found = true + break + } + } + if !found { + candidates = append(candidates, hook) + } + return true + }, + ) + } + return candidates +} + func (server *Server) queueHooks(d *commandDetails) error { // big list of all of the messages var hmsgs []string var hooks []*Hook - // find the hook by the key - if hm, ok := server.hookcols[d.key]; ok { - for _, hook := range hm { - // match the fence - msgs := FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, hook.Metas, d) - if len(msgs) > 0 { - if hook.channel { - server.Publish(hook.Name, msgs...) - } else { - // append each msg to the big list - hmsgs = append(hmsgs, msgs...) - hooks = append(hooks, hook) - } + + candidates := server.getQueueCandidates(d) + for _, hook := range candidates { + // match the fence + msgs := FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, hook.Metas, d) + if len(msgs) > 0 { + if hook.channel { + server.Publish(hook.Name, msgs...) + } else { + // append each msg to the big list + hmsgs = append(hmsgs, msgs...) + hooks = append(hooks, hook) } } } diff --git a/internal/server/crud.go b/internal/server/crud.go index e2a17933..b25a594d 100644 --- a/internal/server/crud.go +++ b/internal/server/crud.go @@ -8,6 +8,7 @@ import ( "time" "github.com/mmcloughlin/geohash" + "github.com/tidwall/boxtree/d2" "github.com/tidwall/geojson" "github.com/tidwall/geojson/geometry" "github.com/tidwall/resp" @@ -466,7 +467,8 @@ func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails server.exlistmu.Unlock() server.expires = make(map[string]map[string]time.Time) server.hooks = make(map[string]*Hook) - server.hookcols = make(map[string]map[string]*Hook) + server.hooksOut = make(map[string]*Hook) + server.hookTree = d2.BoxTree{} d.command = "flushdb" d.updated = true d.timestamp = time.Now() diff --git a/internal/server/hooks.go b/internal/server/hooks.go index 29ac5a7b..7b9d22c3 100644 --- a/internal/server/hooks.go +++ b/internal/server/hooks.go @@ -154,15 +154,16 @@ func (c *Server) cmdSetHook(msg *Message, chanCmd bool) ( return NOMessage, d, err } - if h, ok := c.hooks[name]; ok { - if h.channel != chanCmd { + prevHook := c.hooks[name] + if prevHook != nil { + if prevHook.channel != chanCmd { return NOMessage, d, errors.New("hooks and channels cannot share the same name") } - if h.Equals(hook) { + if prevHook.Equals(hook) { // it was a match so we do nothing. But let's signal just // for good measure. - h.Signal() + prevHook.Signal() if !hook.expires.IsZero() { c.hookex.Push(hook) } @@ -173,23 +174,36 @@ func (c *Server) cmdSetHook(msg *Message, chanCmd bool) ( return resp.IntegerValue(0), d, nil } } - h.Close() - // delete the previous hook - if hm, ok := c.hookcols[h.Key]; ok { - delete(hm, h.Name) - } - delete(c.hooks, h.Name) + prevHook.Close() + delete(c.hooks, name) + delete(c.hooksOut, name) } d.updated = true d.timestamp = time.Now() + c.hooks[name] = hook - hm, ok := c.hookcols[hook.Key] - if !ok { - hm = make(map[string]*Hook) - c.hookcols[hook.Key] = hm + if hook.Fence.detect == nil || hook.Fence.detect["outside"] { + c.hooksOut[name] = hook } - hm[name] = hook + + // remove previous hook from spatial index + if prevHook != nil && prevHook.Fence != nil && prevHook.Fence.obj != nil { + rect := prevHook.Fence.obj.Rect() + c.hookTree.Delete( + []float64{rect.Min.X, rect.Min.Y}, + []float64{rect.Max.X, rect.Max.Y}, + prevHook) + } + // add hook to spatial index + if hook != nil && hook.Fence != nil && hook.Fence.obj != nil { + rect := hook.Fence.obj.Rect() + c.hookTree.Insert( + []float64{rect.Min.X, rect.Min.Y}, + []float64{rect.Max.X, rect.Max.Y}, + hook) + } + hook.Open() if !hook.expires.IsZero() { c.hookex.Push(hook) @@ -217,13 +231,21 @@ func (c *Server) cmdDelHook(msg *Message, chanCmd bool) ( if len(vs) != 0 { return NOMessage, d, errInvalidNumberOfArguments } - if h, ok := c.hooks[name]; ok && h.channel == chanCmd { - h.Close() - if hm, ok := c.hookcols[h.Key]; ok { - delete(hm, h.Name) - } - delete(c.hooks, h.Name) + if hook, ok := c.hooks[name]; ok && hook.channel == chanCmd { + hook.Close() + delete(c.hooks, hook.Name) + delete(c.hooksOut, hook.Name) + d.updated = true + + // remove hook from spatial index + if hook != nil && hook.Fence != nil && hook.Fence.obj != nil { + rect := hook.Fence.obj.Rect() + c.hookTree.Delete( + []float64{rect.Min.X, rect.Min.Y}, + []float64{rect.Max.X, rect.Max.Y}, + hook) + } } d.timestamp = time.Now() @@ -264,9 +286,6 @@ func (c *Server) cmdPDelHook(msg *Message, channel bool) ( continue } h.Close() - if hm, ok := c.hookcols[h.Key]; ok { - delete(hm, h.Name) - } delete(c.hooks, h.Name) d.updated = true count++ diff --git a/internal/server/server.go b/internal/server/server.go index 6707e03f..75b37106 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -22,6 +22,7 @@ import ( "sync/atomic" "time" + "github.com/tidwall/boxtree/d2" "github.com/tidwall/buntdb" "github.com/tidwall/evio" "github.com/tidwall/geojson" @@ -104,12 +105,13 @@ type Server struct { lstack []*commandDetails lives map[*liveBuffer]bool lcond *sync.Cond - fcup bool // follow caught up - fcuponce bool // follow caught up once - shrinking bool // aof shrinking flag - shrinklog [][]string // aof shrinking log - hooks map[string]*Hook // hook name - hookcols map[string]map[string]*Hook // col key + fcup bool // follow caught up + fcuponce bool // follow caught up once + shrinking bool // aof shrinking flag + shrinklog [][]string // aof shrinking log + hooks map[string]*Hook // hook name + hookTree d2.BoxTree // hook spatial tree containing all + hooksOut map[string]*Hook // hooks with "outside" detection aofconnM map[net.Conn]bool luascripts *lScriptMap luapool *lStatePool @@ -138,7 +140,7 @@ func Serve(host string, port int, dir string, http bool) error { lives: make(map[*liveBuffer]bool), lcond: sync.NewCond(&sync.Mutex{}), hooks: make(map[string]*Hook), - hookcols: make(map[string]map[string]*Hook), + hooksOut: make(map[string]*Hook), aofconnM: make(map[net.Conn]bool), expires: make(map[string]map[string]time.Time), started: time.Now(),