Change hooks collection type from hashmap to btree

This commit changes the collection type that holds all of the
hooks from a hashmap to a btree. This allows for better
flexibility for operations that need to perform range searches
and scanning of the collection.
This commit is contained in:
tidwall 2021-09-13 10:02:36 -07:00
parent 83094b2740
commit 8829b8ffc3
6 changed files with 86 additions and 72 deletions

View File

@ -224,11 +224,13 @@ func (s *Server) writeAOF(args []string, d *commandDetails) error {
func (s *Server) getQueueCandidates(d *commandDetails) []*Hook { func (s *Server) getQueueCandidates(d *commandDetails) []*Hook {
candidates := make(map[*Hook]bool) candidates := make(map[*Hook]bool)
// add the hooks with "outside" detection // add the hooks with "outside" detection
for _, hook := range s.hooksOut { s.hooksOut.Ascend(nil, func(v interface{}) bool {
hook := v.(*Hook)
if hook.Key == d.key { if hook.Key == d.key {
candidates[hook] = true candidates[hook] = true
} }
} return true
})
// look for candidates that might "cross" geofences // look for candidates that might "cross" geofences
if d.oldObj != nil && d.obj != nil && s.hookCross.Len() > 0 { if d.oldObj != nil && d.obj != nil && s.hookCross.Len() > 0 {
r1, r2 := d.oldObj.Rect(), d.obj.Rect() r1, r2 := d.oldObj.Rect(), d.obj.Rect()

View File

@ -3,11 +3,11 @@ package server
import ( import (
"math" "math"
"os" "os"
"sort"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/tidwall/btree"
"github.com/tidwall/geojson" "github.com/tidwall/geojson"
"github.com/tidwall/tile38/core" "github.com/tidwall/tile38/core"
"github.com/tidwall/tile38/internal/collection" "github.com/tidwall/tile38/internal/collection"
@ -169,17 +169,19 @@ func (server *Server) aofshrink() {
func() { func() {
server.mu.Lock() server.mu.Lock()
defer server.mu.Unlock() defer server.mu.Unlock()
for name := range server.hooks { hnames = make([]string, 0, server.hooks.Len())
hnames = append(hnames, name) server.hooks.Walk(func(v []interface{}) {
for _, v := range v {
hnames = append(hnames, v.(*Hook).Name)
} }
})
}() }()
// sort the names for consistency var hookHint btree.PathHint
sort.Strings(hnames)
for _, name := range hnames { for _, name := range hnames {
func() { func() {
server.mu.Lock() server.mu.Lock()
defer server.mu.Unlock() defer server.mu.Unlock()
hook := server.hooks[name] hook, _ := server.hooks.GetHint(name, &hookHint).(*Hook)
if hook == nil { if hook == nil {
return return
} }

View File

@ -462,12 +462,14 @@ func (server *Server) cmdRename(msg *Message, nx bool) (res resp.Value, d comman
err = errKeyNotFound err = errKeyNotFound
return return
} }
for _, h := range server.hooks { server.hooks.Ascend(nil, func(v interface{}) bool {
h := v.(*Hook)
if h.Key == d.key || h.Key == d.newKey { if h.Key == d.key || h.Key == d.newKey {
err = errKeyHasHooksSet err = errKeyHasHooksSet
return return false
}
} }
return true
})
d.command = "rename" d.command = "rename"
newCol := server.getCol(d.newKey) newCol := server.getCol(d.newKey)
if newCol == nil { if newCol == nil {
@ -505,14 +507,17 @@ func (server *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails
err = errInvalidNumberOfArguments err = errInvalidNumberOfArguments
return return
} }
// clear the entire database
server.cols = btree.NewNonConcurrent(byCollectionKey) server.cols = btree.NewNonConcurrent(byCollectionKey)
server.groupHooks = btree.NewNonConcurrent(byGroupHook) server.groupHooks = btree.NewNonConcurrent(byGroupHook)
server.groupObjects = btree.NewNonConcurrent(byGroupObject) server.groupObjects = btree.NewNonConcurrent(byGroupObject)
server.hookExpires = btree.NewNonConcurrent(byHookExpires) server.hookExpires = btree.NewNonConcurrent(byHookExpires)
server.hooks = make(map[string]*Hook) server.hooks = btree.NewNonConcurrent(byHookName)
server.hooksOut = make(map[string]*Hook) server.hooksOut = btree.NewNonConcurrent(byHookName)
server.hookTree = &rtree.RTree{} server.hookTree = &rtree.RTree{}
server.hookCross = &rtree.RTree{} server.hookCross = &rtree.RTree{}
d.command = "flushdb" d.command = "flushdb"
d.updated = true d.updated = true
d.timestamp = time.Now() d.timestamp = time.Now()

View File

@ -22,18 +22,8 @@ var hookLogSetDefaults = &buntdb.SetOptions{
TTL: time.Second * 30, TTL: time.Second * 30,
} }
type hooksByName []*Hook func byHookName(a, b interface{}) bool {
return a.(*Hook).Name < b.(*Hook).Name
func (a hooksByName) Len() int {
return len(a)
}
func (a hooksByName) Less(i, j int) bool {
return a[i].Name < a[j].Name
}
func (a hooksByName) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
} }
func (s *Server) cmdSetHook(msg *Message, chanCmd bool) ( func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
@ -159,7 +149,7 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
return NOMessage, d, err return NOMessage, d, err
} }
prevHook := s.hooks[name] prevHook, _ := s.hooks.Get(&Hook{Name: name}).(*Hook)
if prevHook != nil { if prevHook != nil {
if prevHook.channel != chanCmd { if prevHook.channel != chanCmd {
return NOMessage, d, return NOMessage, d,
@ -180,8 +170,8 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
} }
} }
prevHook.Close() prevHook.Close()
delete(s.hooks, name) s.hooks.Delete(prevHook)
delete(s.hooksOut, name) s.hooksOut.Delete(prevHook)
if !prevHook.expires.IsZero() { if !prevHook.expires.IsZero() {
s.hookExpires.Delete(prevHook) s.hookExpires.Delete(prevHook)
} }
@ -191,9 +181,9 @@ func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
d.updated = true d.updated = true
d.timestamp = time.Now() d.timestamp = time.Now()
s.hooks[name] = hook s.hooks.Set(hook)
if hook.Fence.detect == nil || hook.Fence.detect["outside"] { if hook.Fence.detect == nil || hook.Fence.detect["outside"] {
s.hooksOut[name] = hook s.hooksOut.Set(hook)
} }
// remove previous hook from spatial index // remove previous hook from spatial index
@ -264,11 +254,12 @@ func (s *Server) cmdDelHook(msg *Message, chanCmd bool) (
if len(vs) != 0 { if len(vs) != 0 {
return NOMessage, d, errInvalidNumberOfArguments return NOMessage, d, errInvalidNumberOfArguments
} }
if hook, ok := s.hooks[name]; ok && hook.channel == chanCmd { hook, _ := s.hooks.Get(&Hook{Name: name}).(*Hook)
if hook != nil && hook.channel == chanCmd {
hook.Close() hook.Close()
// remove hook from maps // remove hook from maps
delete(s.hooks, hook.Name) s.hooks.Delete(hook)
delete(s.hooksOut, hook.Name) s.hooksOut.Delete(hook)
if !hook.expires.IsZero() { if !hook.expires.IsZero() {
s.hookExpires.Delete(hook) s.hookExpires.Delete(hook)
} }
@ -320,18 +311,20 @@ func (s *Server) cmdPDelHook(msg *Message, channel bool) (
} }
count := 0 count := 0
for name, hook := range s.hooks { var hooks []*Hook
s.forEachHookByPattern(pattern, channel, func(hook *Hook) bool {
hooks = append(hooks, hook)
return true
})
for _, hook := range hooks {
if hook.channel != channel { if hook.channel != channel {
continue continue
} }
match, _ := glob.Match(pattern, name)
if !match {
continue
}
hook.Close() hook.Close()
// remove hook from maps // remove hook from maps
delete(s.hooks, hook.Name) s.hooks.Delete(hook)
delete(s.hooksOut, hook.Name) s.hooksOut.Delete(hook)
if !hook.expires.IsZero() { if !hook.expires.IsZero() {
s.hookExpires.Delete(hook) s.hookExpires.Delete(hook)
} }
@ -365,6 +358,26 @@ func (s *Server) cmdPDelHook(msg *Message, channel bool) (
return return
} }
func (s *Server) forEachHookByPattern(
pattern string, channel bool, iter func(hook *Hook) bool,
) {
g := glob.Parse(pattern, false)
hasUpperLimit := g.Limits[1] != ""
s.hooks.Ascend(&Hook{Name: g.Limits[0]}, func(v interface{}) bool {
hook := v.(*Hook)
if hasUpperLimit && hook.Name > g.Limits[1] {
return false
}
if hook.channel == channel {
match, _ := glob.Match(pattern, hook.Name)
if match {
return iter(hook)
}
}
return true
})
}
func (s *Server) cmdHooks(msg *Message, channel bool) ( func (s *Server) cmdHooks(msg *Message, channel bool) (
res resp.Value, err error, res resp.Value, err error,
) { ) {
@ -381,18 +394,6 @@ func (s *Server) cmdHooks(msg *Message, channel bool) (
return NOMessage, errInvalidNumberOfArguments return NOMessage, errInvalidNumberOfArguments
} }
var hooks []*Hook
for name, hook := range s.hooks {
if hook.channel != channel {
continue
}
match, _ := glob.Match(pattern, name)
if match {
hooks = append(hooks, hook)
}
}
sort.Sort(hooksByName(hooks))
switch msg.OutputType { switch msg.OutputType {
case JSON: case JSON:
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
@ -402,7 +403,8 @@ func (s *Server) cmdHooks(msg *Message, channel bool) (
} else { } else {
buf.WriteString(`"hooks":[`) buf.WriteString(`"hooks":[`)
} }
for i, hook := range hooks { var i int
s.forEachHookByPattern(pattern, channel, func(hook *Hook) bool {
var ttl = -1 var ttl = -1
if !hook.expires.IsZero() { if !hook.expires.IsZero() {
ttl = int(hook.expires.Sub(start).Seconds()) ttl = int(hook.expires.Sub(start).Seconds())
@ -444,13 +446,15 @@ func (s *Server) cmdHooks(msg *Message, channel bool) (
buf.WriteString(jsonString(meta.Value)) buf.WriteString(jsonString(meta.Value))
} }
buf.WriteString(`}}`) buf.WriteString(`}}`)
} i++
return true
})
buf.WriteString(`],"elapsed":"` + buf.WriteString(`],"elapsed":"` +
time.Since(start).String() + "\"}") time.Since(start).String() + "\"}")
return resp.StringValue(buf.String()), nil return resp.StringValue(buf.String()), nil
case RESP: case RESP:
var vals []resp.Value var vals []resp.Value
for _, hook := range hooks { s.forEachHookByPattern(pattern, channel, func(hook *Hook) bool {
var hvals []resp.Value var hvals []resp.Value
hvals = append(hvals, resp.StringValue(hook.Name)) hvals = append(hvals, resp.StringValue(hook.Name))
hvals = append(hvals, resp.StringValue(hook.Key)) hvals = append(hvals, resp.StringValue(hook.Key))
@ -471,7 +475,8 @@ func (s *Server) cmdHooks(msg *Message, channel bool) (
} }
hvals = append(hvals, resp.ArrayValue(metas)) hvals = append(hvals, resp.ArrayValue(metas))
vals = append(vals, resp.ArrayValue(hvals)) vals = append(vals, resp.ArrayValue(hvals))
} return true
})
return resp.ArrayValue(vals), nil return resp.ArrayValue(vals), nil
} }
return resp.SimpleStringValue(""), nil return resp.SimpleStringValue(""), nil

View File

@ -116,10 +116,10 @@ type Server struct {
fcuponce bool // follow caught up once fcuponce bool // follow caught up once
shrinking bool // aof shrinking flag shrinking bool // aof shrinking flag
shrinklog [][]string // aof shrinking log shrinklog [][]string // aof shrinking log
hooks map[string]*Hook // hook name hooks *btree.BTree // hook name -- [string]*Hook
hookCross *rtree.RTree // hook spatial tree for "cross" geofences hookCross *rtree.RTree // hook spatial tree for "cross" geofences
hookTree *rtree.RTree // hook spatial tree for all hookTree *rtree.RTree // hook spatial tree for all
hooksOut map[string]*Hook // hooks with "outside" detection hooksOut *btree.BTree // hooks with "outside" detection -- [string]*Hook
groupHooks *btree.BTree // hooks that are connected to objects groupHooks *btree.BTree // hooks that are connected to objects
groupObjects *btree.BTree // objects that are connected to hooks groupObjects *btree.BTree // objects that are connected to hooks
hookExpires *btree.BTree // queue of all hooks marked for expiration hookExpires *btree.BTree // queue of all hooks marked for expiration
@ -164,8 +164,8 @@ func Serve(opts Options) error {
fcond: sync.NewCond(&sync.Mutex{}), fcond: sync.NewCond(&sync.Mutex{}),
lives: make(map[*liveBuffer]bool), lives: make(map[*liveBuffer]bool),
lcond: sync.NewCond(&sync.Mutex{}), lcond: sync.NewCond(&sync.Mutex{}),
hooks: make(map[string]*Hook), hooks: btree.NewNonConcurrent(byHookName),
hooksOut: make(map[string]*Hook), hooksOut: btree.NewNonConcurrent(byHookName),
hookCross: &rtree.RTree{}, hookCross: &rtree.RTree{},
hookTree: &rtree.RTree{}, hookTree: &rtree.RTree{},
aofconnM: make(map[net.Conn]io.Closer), aofconnM: make(map[net.Conn]io.Closer),

View File

@ -157,7 +157,7 @@ func (s *Server) basicStats(m map[string]interface{}) {
m["pid"] = os.Getpid() m["pid"] = os.Getpid()
m["aof_size"] = s.aofsz m["aof_size"] = s.aofsz
m["num_collections"] = s.cols.Len() m["num_collections"] = s.cols.Len()
m["num_hooks"] = len(s.hooks) m["num_hooks"] = s.hooks.Len()
sz := 0 sz := 0
s.cols.Ascend(nil, func(v interface{}) bool { s.cols.Ascend(nil, func(v interface{}) bool {
col := v.(*collectionKeyContainer).col col := v.(*collectionKeyContainer).col
@ -337,7 +337,7 @@ func (s *Server) extStats(m map[string]interface{}) {
// Number of collections in the database // Number of collections in the database
m["tile38_num_collections"] = s.cols.Len() m["tile38_num_collections"] = s.cols.Len()
// Number of hooks in the database // Number of hooks in the database
m["tile38_num_hooks"] = len(s.hooks) m["tile38_num_hooks"] = s.hooks.Len()
// Number of hook groups in the database // Number of hook groups in the database
m["tile38_num_hook_groups"] = s.groupHooks.Len() m["tile38_num_hook_groups"] = s.groupHooks.Len()
// Number of object groups in the database // Number of object groups in the database