mirror of https://github.com/tidwall/tile38.git
Moved root collection keys into generic btree.
Also updated the background expires logic to remove an extra allocation.
This commit is contained in:
parent
dd11eded5c
commit
1177bbb80c
4
go.mod
4
go.mod
|
@ -16,7 +16,7 @@ require (
|
||||||
github.com/peterh/liner v1.2.1
|
github.com/peterh/liner v1.2.1
|
||||||
github.com/prometheus/client_golang v1.12.1
|
github.com/prometheus/client_golang v1.12.1
|
||||||
github.com/streadway/amqp v1.0.0
|
github.com/streadway/amqp v1.0.0
|
||||||
github.com/tidwall/btree v1.4.2
|
github.com/tidwall/btree v1.4.3
|
||||||
github.com/tidwall/buntdb v1.2.9
|
github.com/tidwall/buntdb v1.2.9
|
||||||
github.com/tidwall/geojson v1.3.4
|
github.com/tidwall/geojson v1.3.4
|
||||||
github.com/tidwall/gjson v1.12.1
|
github.com/tidwall/gjson v1.12.1
|
||||||
|
@ -25,7 +25,7 @@ require (
|
||||||
github.com/tidwall/redbench v0.1.0
|
github.com/tidwall/redbench v0.1.0
|
||||||
github.com/tidwall/redcon v1.4.4
|
github.com/tidwall/redcon v1.4.4
|
||||||
github.com/tidwall/resp v0.1.0
|
github.com/tidwall/resp v0.1.0
|
||||||
github.com/tidwall/rtree v1.8.0
|
github.com/tidwall/rtree v1.8.1
|
||||||
github.com/tidwall/sjson v1.2.4
|
github.com/tidwall/sjson v1.2.4
|
||||||
github.com/xdg/scram v1.0.5
|
github.com/xdg/scram v1.0.5
|
||||||
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da
|
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da
|
||||||
|
|
8
go.sum
8
go.sum
|
@ -350,8 +350,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
|
||||||
github.com/tidwall/assert v0.1.0 h1:aWcKyRBUAdLoVebxo95N7+YZVTFF/ASTr7BN4sLP6XI=
|
github.com/tidwall/assert v0.1.0 h1:aWcKyRBUAdLoVebxo95N7+YZVTFF/ASTr7BN4sLP6XI=
|
||||||
github.com/tidwall/assert v0.1.0/go.mod h1:QLYtGyeqse53vuELQheYl9dngGCJQ+mTtlxcktb+Kj8=
|
github.com/tidwall/assert v0.1.0/go.mod h1:QLYtGyeqse53vuELQheYl9dngGCJQ+mTtlxcktb+Kj8=
|
||||||
github.com/tidwall/btree v1.1.0/go.mod h1:TzIRzen6yHbibdSfK6t8QimqbUnoxUSrZfeW7Uob0q4=
|
github.com/tidwall/btree v1.1.0/go.mod h1:TzIRzen6yHbibdSfK6t8QimqbUnoxUSrZfeW7Uob0q4=
|
||||||
github.com/tidwall/btree v1.4.2 h1:PpkaieETJMUxYNADsjgtNRcERX7mGc/GP2zp/r5FM3g=
|
github.com/tidwall/btree v1.4.3 h1:Lf5U/66bk0ftNppOBjVoy/AIPBrLMkheBp4NnSNiYOo=
|
||||||
github.com/tidwall/btree v1.4.2/go.mod h1:LGm8L/DZjPLmeWGjv5kFrY8dL4uVhMmzmmLYmsObdKE=
|
github.com/tidwall/btree v1.4.3/go.mod h1:LGm8L/DZjPLmeWGjv5kFrY8dL4uVhMmzmmLYmsObdKE=
|
||||||
github.com/tidwall/buntdb v1.2.9 h1:XVz684P7X6HCTrdr385yDZWB1zt/n20ZNG3M1iGyFm4=
|
github.com/tidwall/buntdb v1.2.9 h1:XVz684P7X6HCTrdr385yDZWB1zt/n20ZNG3M1iGyFm4=
|
||||||
github.com/tidwall/buntdb v1.2.9/go.mod h1:IwyGSvvDg6hnKSIhtdZ0AqhCZGH8ukdtCAzaP8fI1X4=
|
github.com/tidwall/buntdb v1.2.9/go.mod h1:IwyGSvvDg6hnKSIhtdZ0AqhCZGH8ukdtCAzaP8fI1X4=
|
||||||
github.com/tidwall/cities v0.1.0 h1:CVNkmMf7NEC9Bvokf5GoSsArHCKRMTgLuubRTHnH0mE=
|
github.com/tidwall/cities v0.1.0 h1:CVNkmMf7NEC9Bvokf5GoSsArHCKRMTgLuubRTHnH0mE=
|
||||||
|
@ -380,8 +380,8 @@ github.com/tidwall/resp v0.1.0/go.mod h1:18xEj855iMY2bK6tNF2A4x+nZy5gWO1iO7OOl3j
|
||||||
github.com/tidwall/rtred v0.1.2 h1:exmoQtOLvDoO8ud++6LwVsAMTu0KPzLTUrMln8u1yu8=
|
github.com/tidwall/rtred v0.1.2 h1:exmoQtOLvDoO8ud++6LwVsAMTu0KPzLTUrMln8u1yu8=
|
||||||
github.com/tidwall/rtred v0.1.2/go.mod h1:hd69WNXQ5RP9vHd7dqekAz+RIdtfBogmglkZSRxCHFQ=
|
github.com/tidwall/rtred v0.1.2/go.mod h1:hd69WNXQ5RP9vHd7dqekAz+RIdtfBogmglkZSRxCHFQ=
|
||||||
github.com/tidwall/rtree v1.3.1/go.mod h1:S+JSsqPTI8LfWA4xHBo5eXzie8WJLVFeppAutSegl6M=
|
github.com/tidwall/rtree v1.3.1/go.mod h1:S+JSsqPTI8LfWA4xHBo5eXzie8WJLVFeppAutSegl6M=
|
||||||
github.com/tidwall/rtree v1.8.0 h1:nYVLh9UKJrd4CZCNawD3WbHNxmI9LYR4j3E2hqO3tjQ=
|
github.com/tidwall/rtree v1.8.1 h1:Hv0gvvznkDI5YwBkJp9pYh8ZEU1L2A9puLwwGPcQ9j4=
|
||||||
github.com/tidwall/rtree v1.8.0/go.mod h1:iDJQ9NBRtbfKkzZu02za+mIlaP+bjYPnunbSNidpbCQ=
|
github.com/tidwall/rtree v1.8.1/go.mod h1:iDJQ9NBRtbfKkzZu02za+mIlaP+bjYPnunbSNidpbCQ=
|
||||||
github.com/tidwall/sjson v1.2.4 h1:cuiLzLnaMeBhRmEv00Lpk3tkYrcxpmbU81tAY4Dw0tc=
|
github.com/tidwall/sjson v1.2.4 h1:cuiLzLnaMeBhRmEv00Lpk3tkYrcxpmbU81tAY4Dw0tc=
|
||||||
github.com/tidwall/sjson v1.2.4/go.mod h1:098SZ494YoMWPmMO6ct4dcFnqxwj9r/gF0Etp19pSNM=
|
github.com/tidwall/sjson v1.2.4/go.mod h1:098SZ494YoMWPmMO6ct4dcFnqxwj9r/gF0Etp19pSNM=
|
||||||
github.com/tidwall/tinyqueue v0.1.1 h1:SpNEvEggbpyN5DIReaJ2/1ndroY8iyEGxPYxoSaymYE=
|
github.com/tidwall/tinyqueue v0.1.1 h1:SpNEvEggbpyN5DIReaJ2/1ndroY8iyEGxPYxoSaymYE=
|
||||||
|
|
|
@ -789,15 +789,9 @@ func nextStep(step uint64, cursor Cursor, deadline *deadline.Deadline) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Expired returns a list of all objects that have expired.
|
// ScanExpires returns a list of all objects that have expired.
|
||||||
func (c *Collection) Expired(now int64, buffer []string) (ids []string) {
|
func (c *Collection) ScanExpires(iter func(id string, expires int64) bool) {
|
||||||
ids = buffer[:0]
|
|
||||||
c.expires.Scan(func(item *itemT) bool {
|
c.expires.Scan(func(item *itemT) bool {
|
||||||
if now < item.expires {
|
return iter(item.id, item.expires)
|
||||||
return false
|
|
||||||
}
|
|
||||||
ids = append(ids, item.id)
|
|
||||||
return true
|
|
||||||
})
|
})
|
||||||
return ids
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,7 +61,8 @@ func (s *Server) aofshrink() {
|
||||||
func() {
|
func() {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
s.scanGreaterOrEqual(nextkey, func(key string, col *collection.Collection) bool {
|
s.cols.Ascend(nextkey,
|
||||||
|
func(key string, col *collection.Collection) bool {
|
||||||
if len(keys) == maxkeys {
|
if len(keys) == maxkeys {
|
||||||
keysdone = false
|
keysdone = false
|
||||||
nextkey = key
|
nextkey = key
|
||||||
|
@ -69,7 +70,8 @@ func (s *Server) aofshrink() {
|
||||||
}
|
}
|
||||||
keys = append(keys, key)
|
keys = append(keys, key)
|
||||||
return true
|
return true
|
||||||
})
|
},
|
||||||
|
)
|
||||||
}()
|
}()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -87,8 +89,8 @@ func (s *Server) aofshrink() {
|
||||||
idsdone = true
|
idsdone = true
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
col := s.getCol(keys[0])
|
col, ok := s.cols.Get(keys[0])
|
||||||
if col == nil {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var fnames = col.FieldArr() // reload an array of field names to match each object
|
var fnames = col.FieldArr() // reload an array of field names to match each object
|
||||||
|
|
|
@ -7,11 +7,9 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/mmcloughlin/geohash"
|
"github.com/mmcloughlin/geohash"
|
||||||
"github.com/tidwall/btree"
|
|
||||||
"github.com/tidwall/geojson"
|
"github.com/tidwall/geojson"
|
||||||
"github.com/tidwall/geojson/geometry"
|
"github.com/tidwall/geojson/geometry"
|
||||||
"github.com/tidwall/resp"
|
"github.com/tidwall/resp"
|
||||||
"github.com/tidwall/rtree"
|
|
||||||
"github.com/tidwall/tile38/internal/collection"
|
"github.com/tidwall/tile38/internal/collection"
|
||||||
"github.com/tidwall/tile38/internal/glob"
|
"github.com/tidwall/tile38/internal/glob"
|
||||||
)
|
)
|
||||||
|
@ -50,7 +48,7 @@ func (s *Server) cmdBounds(msg *Message) (resp.Value, error) {
|
||||||
return NOMessage, errInvalidNumberOfArguments
|
return NOMessage, errInvalidNumberOfArguments
|
||||||
}
|
}
|
||||||
|
|
||||||
col := s.getCol(key)
|
col, _ := s.cols.Get(key)
|
||||||
if col == nil {
|
if col == nil {
|
||||||
if msg.OutputType == RESP {
|
if msg.OutputType == RESP {
|
||||||
return resp.NullValue(), nil
|
return resp.NullValue(), nil
|
||||||
|
@ -104,7 +102,7 @@ func (s *Server) cmdType(msg *Message) (resp.Value, error) {
|
||||||
return NOMessage, errInvalidNumberOfArguments
|
return NOMessage, errInvalidNumberOfArguments
|
||||||
}
|
}
|
||||||
|
|
||||||
col := s.getCol(key)
|
col, _ := s.cols.Get(key)
|
||||||
if col == nil {
|
if col == nil {
|
||||||
if msg.OutputType == RESP {
|
if msg.OutputType == RESP {
|
||||||
return resp.SimpleStringValue("none"), nil
|
return resp.SimpleStringValue("none"), nil
|
||||||
|
@ -142,7 +140,7 @@ func (s *Server) cmdGet(msg *Message) (resp.Value, error) {
|
||||||
vs = vs[1:]
|
vs = vs[1:]
|
||||||
}
|
}
|
||||||
|
|
||||||
col := s.getCol(key)
|
col, _ := s.cols.Get(key)
|
||||||
if col == nil {
|
if col == nil {
|
||||||
if msg.OutputType == RESP {
|
if msg.OutputType == RESP {
|
||||||
return resp.NullValue(), nil
|
return resp.NullValue(), nil
|
||||||
|
@ -306,12 +304,12 @@ func (s *Server) cmdDel(msg *Message) (res resp.Value, d commandDetails, err err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
found := false
|
found := false
|
||||||
col := s.getCol(d.key)
|
col, _ := s.cols.Get(d.key)
|
||||||
if col != nil {
|
if col != nil {
|
||||||
d.obj, d.fields, ok = col.Delete(d.id)
|
d.obj, d.fields, ok = col.Delete(d.id)
|
||||||
if ok {
|
if ok {
|
||||||
if col.Count() == 0 {
|
if col.Count() == 0 {
|
||||||
s.deleteCol(d.key)
|
s.cols.Delete(d.key)
|
||||||
}
|
}
|
||||||
found = true
|
found = true
|
||||||
} else if erron404 {
|
} else if erron404 {
|
||||||
|
@ -370,7 +368,7 @@ func (s *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, err er
|
||||||
}
|
}
|
||||||
|
|
||||||
var expired int
|
var expired int
|
||||||
col := s.getCol(d.key)
|
col, _ := s.cols.Get(d.key)
|
||||||
if col != nil {
|
if col != nil {
|
||||||
g := glob.Parse(d.pattern, false)
|
g := glob.Parse(d.pattern, false)
|
||||||
if g.Limits[0] == "" && g.Limits[1] == "" {
|
if g.Limits[0] == "" && g.Limits[1] == "" {
|
||||||
|
@ -399,7 +397,7 @@ func (s *Server) cmdPdel(msg *Message) (res resp.Value, d commandDetails, err er
|
||||||
d.children = nchildren
|
d.children = nchildren
|
||||||
}
|
}
|
||||||
if col.Count() == 0 {
|
if col.Count() == 0 {
|
||||||
s.deleteCol(d.key)
|
s.cols.Delete(d.key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
d.command = "pdel"
|
d.command = "pdel"
|
||||||
|
@ -431,9 +429,9 @@ func (s *Server) cmdDrop(msg *Message) (res resp.Value, d commandDetails, err er
|
||||||
err = errInvalidNumberOfArguments
|
err = errInvalidNumberOfArguments
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
col := s.getCol(d.key)
|
col, _ := s.cols.Get(d.key)
|
||||||
if col != nil {
|
if col != nil {
|
||||||
s.deleteCol(d.key)
|
s.cols.Delete(d.key)
|
||||||
d.updated = true
|
d.updated = true
|
||||||
} else {
|
} else {
|
||||||
d.key = "" // ignore the details
|
d.key = "" // ignore the details
|
||||||
|
@ -472,7 +470,7 @@ func (s *Server) cmdRename(msg *Message) (res resp.Value, d commandDetails, err
|
||||||
err = errInvalidNumberOfArguments
|
err = errInvalidNumberOfArguments
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
col := s.getCol(d.key)
|
col, _ := s.cols.Get(d.key)
|
||||||
if col == nil {
|
if col == nil {
|
||||||
err = errKeyNotFound
|
err = errKeyNotFound
|
||||||
return
|
return
|
||||||
|
@ -486,18 +484,18 @@ func (s *Server) cmdRename(msg *Message) (res resp.Value, d commandDetails, err
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
d.command = "rename"
|
d.command = "rename"
|
||||||
newCol := s.getCol(d.newKey)
|
newCol, _ := s.cols.Get(d.newKey)
|
||||||
if newCol == nil {
|
if newCol == nil {
|
||||||
d.updated = true
|
d.updated = true
|
||||||
} else if nx {
|
} else if nx {
|
||||||
d.updated = false
|
d.updated = false
|
||||||
} else {
|
} else {
|
||||||
s.deleteCol(d.newKey)
|
s.cols.Delete(d.newKey)
|
||||||
d.updated = true
|
d.updated = true
|
||||||
}
|
}
|
||||||
if d.updated {
|
if d.updated {
|
||||||
s.deleteCol(d.key)
|
s.cols.Delete(d.key)
|
||||||
s.setCol(d.newKey, col)
|
s.cols.Set(d.newKey, col)
|
||||||
}
|
}
|
||||||
d.timestamp = time.Now()
|
d.timestamp = time.Now()
|
||||||
switch msg.OutputType {
|
switch msg.OutputType {
|
||||||
|
@ -524,14 +522,14 @@ func (s *Server) cmdFlushDB(msg *Message) (res resp.Value, d commandDetails, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// clear the entire database
|
// clear the entire database
|
||||||
s.cols = btree.NewNonConcurrent(byCollectionKey)
|
s.cols.Clear()
|
||||||
s.groupHooks = btree.NewNonConcurrent(byGroupHook)
|
s.groupHooks.Clear()
|
||||||
s.groupObjects = btree.NewNonConcurrent(byGroupObject)
|
s.groupObjects.Clear()
|
||||||
s.hookExpires = btree.NewNonConcurrent(byHookExpires)
|
s.hookExpires.Clear()
|
||||||
s.hooks = btree.NewNonConcurrent(byHookName)
|
s.hooks.Clear()
|
||||||
s.hooksOut = btree.NewNonConcurrent(byHookName)
|
s.hooksOut.Clear()
|
||||||
s.hookTree = &rtree.RTree{}
|
s.hookTree.Clear()
|
||||||
s.hookCross = &rtree.RTree{}
|
s.hookCross.Clear()
|
||||||
|
|
||||||
d.command = "flushdb"
|
d.command = "flushdb"
|
||||||
d.updated = true
|
d.updated = true
|
||||||
|
@ -781,13 +779,13 @@ func (s *Server) cmdSet(msg *Message) (res resp.Value, d commandDetails, err err
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
col := s.getCol(d.key)
|
col, _ := s.cols.Get(d.key)
|
||||||
if col == nil {
|
if col == nil {
|
||||||
if xx {
|
if xx {
|
||||||
goto notok
|
goto notok
|
||||||
}
|
}
|
||||||
col = collection.New()
|
col = collection.New()
|
||||||
s.setCol(d.key, col)
|
s.cols.Set(d.key, col)
|
||||||
}
|
}
|
||||||
if xx || nx {
|
if xx || nx {
|
||||||
_, _, _, ok := col.Get(d.id)
|
_, _, _, ok := col.Get(d.id)
|
||||||
|
@ -890,7 +888,7 @@ func (s *Server) cmdFset(msg *Message) (res resp.Value, d commandDetails, err er
|
||||||
var updateCount int
|
var updateCount int
|
||||||
d, fields, values, xx, err = s.parseFSetArgs(vs)
|
d, fields, values, xx, err = s.parseFSetArgs(vs)
|
||||||
|
|
||||||
col := s.getCol(d.key)
|
col, _ := s.cols.Get(d.key)
|
||||||
if col == nil {
|
if col == nil {
|
||||||
err = errKeyNotFound
|
err = errKeyNotFound
|
||||||
return
|
return
|
||||||
|
@ -949,7 +947,7 @@ func (s *Server) cmdExpire(msg *Message) (res resp.Value, d commandDetails, err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ok = false
|
ok = false
|
||||||
col := s.getCol(key)
|
col, _ := s.cols.Get(key)
|
||||||
if col != nil {
|
if col != nil {
|
||||||
ex := time.Now().Add(time.Duration(float64(time.Second) * value)).UnixNano()
|
ex := time.Now().Add(time.Duration(float64(time.Second) * value)).UnixNano()
|
||||||
ok = col.SetExpires(id, ex)
|
ok = col.SetExpires(id, ex)
|
||||||
|
@ -993,7 +991,7 @@ func (s *Server) cmdPersist(msg *Message) (res resp.Value, d commandDetails, err
|
||||||
}
|
}
|
||||||
var cleared bool
|
var cleared bool
|
||||||
ok = false
|
ok = false
|
||||||
col := s.getCol(key)
|
col, _ := s.cols.Get(key)
|
||||||
if col != nil {
|
if col != nil {
|
||||||
var ex int64
|
var ex int64
|
||||||
_, _, ex, ok = col.Get(id)
|
_, _, ex, ok = col.Get(id)
|
||||||
|
@ -1046,7 +1044,7 @@ func (s *Server) cmdTTL(msg *Message) (res resp.Value, err error) {
|
||||||
var v float64
|
var v float64
|
||||||
ok = false
|
ok = false
|
||||||
var ok2 bool
|
var ok2 bool
|
||||||
col := s.getCol(key)
|
col, _ := s.cols.Get(key)
|
||||||
if col != nil {
|
if col != nil {
|
||||||
var ex int64
|
var ex int64
|
||||||
_, _, ex, ok = col.Get(id)
|
_, _, ex, ok = col.Get(id)
|
||||||
|
|
|
@ -3,6 +3,7 @@ package server
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/tidwall/tile38/internal/collection"
|
||||||
"github.com/tidwall/tile38/internal/log"
|
"github.com/tidwall/tile38/internal/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -28,16 +29,15 @@ func (s *Server) backgroundExpiring() {
|
||||||
|
|
||||||
func (s *Server) backgroundExpireObjects(now time.Time) {
|
func (s *Server) backgroundExpireObjects(now time.Time) {
|
||||||
nano := now.UnixNano()
|
nano := now.UnixNano()
|
||||||
var ids []string
|
|
||||||
var msgs []*Message
|
var msgs []*Message
|
||||||
s.cols.Ascend(nil, func(v interface{}) bool {
|
s.cols.Scan(func(key string, col *collection.Collection) bool {
|
||||||
col := v.(*collectionKeyContainer)
|
col.ScanExpires(func(id string, expires int64) bool {
|
||||||
ids = col.col.Expired(nano, ids[:0])
|
if nano < expires {
|
||||||
for _, id := range ids {
|
return false
|
||||||
msgs = append(msgs, &Message{
|
|
||||||
Args: []string{"del", col.key, id},
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
msgs = append(msgs, &Message{Args: []string{"del", key, id}})
|
||||||
|
return true
|
||||||
|
})
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
|
@ -52,7 +52,6 @@ func (s *Server) backgroundExpireObjects(now time.Time) {
|
||||||
if len(msgs) > 0 {
|
if len(msgs) > 0 {
|
||||||
log.Debugf("Expired %d objects\n", len(msgs))
|
log.Debugf("Expired %d objects\n", len(msgs))
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) backgroundExpireHooks(now time.Time) {
|
func (s *Server) backgroundExpireHooks(now time.Time) {
|
||||||
|
|
|
@ -288,7 +288,7 @@ func extendRoamMessage(
|
||||||
math.Floor(match.meters*1000)/1000, 'f', -1, 64)
|
math.Floor(match.meters*1000)/1000, 'f', -1, 64)
|
||||||
if fence.roam.scan != "" {
|
if fence.roam.scan != "" {
|
||||||
nmsg = append(nmsg, `,"scan":[`...)
|
nmsg = append(nmsg, `,"scan":[`...)
|
||||||
col := sw.s.getCol(fence.roam.key)
|
col, _ := sw.s.cols.Get(fence.roam.key)
|
||||||
if col != nil {
|
if col != nil {
|
||||||
obj, _, _, ok := col.Get(match.id)
|
obj, _, _, ok := col.Get(match.id)
|
||||||
if ok {
|
if ok {
|
||||||
|
@ -375,7 +375,7 @@ func fenceMatchNearbys(
|
||||||
if obj == nil {
|
if obj == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
col := s.getCol(fence.roam.key)
|
col, _ := s.cols.Get(fence.roam.key)
|
||||||
if col == nil {
|
if col == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -184,7 +184,7 @@ func (s *Server) cmdJget(msg *Message) (resp.Value, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
col := s.getCol(key)
|
col, _ := s.cols.Get(key)
|
||||||
if col == nil {
|
if col == nil {
|
||||||
if msg.OutputType == RESP {
|
if msg.OutputType == RESP {
|
||||||
return resp.NullValue(), nil
|
return resp.NullValue(), nil
|
||||||
|
@ -262,7 +262,7 @@ func (s *Server) cmdJset(msg *Message) (res resp.Value, d commandDetails, err er
|
||||||
raw = true
|
raw = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
col := s.getCol(key)
|
col, _ := s.cols.Get(key)
|
||||||
var createcol bool
|
var createcol bool
|
||||||
if col == nil {
|
if col == nil {
|
||||||
col = collection.New()
|
col = collection.New()
|
||||||
|
@ -293,7 +293,7 @@ func (s *Server) cmdJset(msg *Message) (res resp.Value, d commandDetails, err er
|
||||||
return s.cmdSet(&nmsg)
|
return s.cmdSet(&nmsg)
|
||||||
}
|
}
|
||||||
if createcol {
|
if createcol {
|
||||||
s.setCol(key, col)
|
s.cols.Set(key, col)
|
||||||
}
|
}
|
||||||
|
|
||||||
d.key = key
|
d.key = key
|
||||||
|
@ -325,7 +325,7 @@ func (s *Server) cmdJdel(msg *Message) (res resp.Value, d commandDetails, err er
|
||||||
id := msg.Args[2]
|
id := msg.Args[2]
|
||||||
path := msg.Args[3]
|
path := msg.Args[3]
|
||||||
|
|
||||||
col := s.getCol(key)
|
col, _ := s.cols.Get(key)
|
||||||
if col == nil {
|
if col == nil {
|
||||||
if msg.OutputType == RESP {
|
if msg.OutputType == RESP {
|
||||||
return resp.IntegerValue(0), d, nil
|
return resp.IntegerValue(0), d, nil
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tidwall/resp"
|
"github.com/tidwall/resp"
|
||||||
|
"github.com/tidwall/tile38/internal/collection"
|
||||||
"github.com/tidwall/tile38/internal/glob"
|
"github.com/tidwall/tile38/internal/glob"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -36,18 +37,17 @@ func (s *Server) cmdKeys(msg *Message) (res resp.Value, err error) {
|
||||||
var greaterPivot string
|
var greaterPivot string
|
||||||
var vals []resp.Value
|
var vals []resp.Value
|
||||||
|
|
||||||
iterator := func(v interface{}) bool {
|
iter := func(key string, col *collection.Collection) bool {
|
||||||
vcol := v.(*collectionKeyContainer)
|
|
||||||
var match bool
|
var match bool
|
||||||
if everything {
|
if everything {
|
||||||
match = true
|
match = true
|
||||||
} else if greater {
|
} else if greater {
|
||||||
if !strings.HasPrefix(vcol.key, greaterPivot) {
|
if !strings.HasPrefix(key, greaterPivot) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
match = true
|
match = true
|
||||||
} else {
|
} else {
|
||||||
match, _ = glob.Match(pattern, vcol.key)
|
match, _ = glob.Match(pattern, key)
|
||||||
}
|
}
|
||||||
if match {
|
if match {
|
||||||
if once {
|
if once {
|
||||||
|
@ -59,9 +59,9 @@ func (s *Server) cmdKeys(msg *Message) (res resp.Value, err error) {
|
||||||
}
|
}
|
||||||
switch msg.OutputType {
|
switch msg.OutputType {
|
||||||
case JSON:
|
case JSON:
|
||||||
wr.WriteString(jsonString(vcol.key))
|
wr.WriteString(jsonString(key))
|
||||||
case RESP:
|
case RESP:
|
||||||
vals = append(vals, resp.StringValue(vcol.key))
|
vals = append(vals, resp.StringValue(key))
|
||||||
}
|
}
|
||||||
|
|
||||||
// If no more than one match is expected, stop searching
|
// If no more than one match is expected, stop searching
|
||||||
|
@ -75,17 +75,17 @@ func (s *Server) cmdKeys(msg *Message) (res resp.Value, err error) {
|
||||||
// TODO: This can be further optimized by using glob.Parse and limits
|
// TODO: This can be further optimized by using glob.Parse and limits
|
||||||
if pattern == "*" {
|
if pattern == "*" {
|
||||||
everything = true
|
everything = true
|
||||||
s.cols.Ascend(nil, iterator)
|
s.cols.Scan(iter)
|
||||||
} else if strings.HasSuffix(pattern, "*") {
|
} else if strings.HasSuffix(pattern, "*") {
|
||||||
greaterPivot = pattern[:len(pattern)-1]
|
greaterPivot = pattern[:len(pattern)-1]
|
||||||
if glob.IsGlob(greaterPivot) {
|
if glob.IsGlob(greaterPivot) {
|
||||||
s.cols.Ascend(nil, iterator)
|
s.cols.Scan(iter)
|
||||||
} else {
|
} else {
|
||||||
greater = true
|
greater = true
|
||||||
s.cols.Ascend(&collectionKeyContainer{key: greaterPivot}, iterator)
|
s.cols.Ascend(greaterPivot, iter)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
s.cols.Ascend(nil, iterator)
|
s.cols.Scan(iter)
|
||||||
}
|
}
|
||||||
if msg.OutputType == JSON {
|
if msg.OutputType == JSON {
|
||||||
wr.WriteString(`],"elapsed":"` + time.Since(start).String() + "\"}")
|
wr.WriteString(`],"elapsed":"` + time.Since(start).String() + "\"}")
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/tidwall/tile38/core"
|
"github.com/tidwall/tile38/core"
|
||||||
|
"github.com/tidwall/tile38/internal/collection"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus/collectors"
|
"github.com/prometheus/client_golang/prometheus/collectors"
|
||||||
|
@ -121,31 +122,30 @@ func (s *Server) Collect(ch chan<- prometheus.Metric) {
|
||||||
/*
|
/*
|
||||||
add objects/points/strings stats for each collection
|
add objects/points/strings stats for each collection
|
||||||
*/
|
*/
|
||||||
s.cols.Ascend(nil, func(v interface{}) bool {
|
s.cols.Scan(func(key string, col *collection.Collection) bool {
|
||||||
c := v.(*collectionKeyContainer)
|
|
||||||
ch <- prometheus.MustNewConstMetric(
|
ch <- prometheus.MustNewConstMetric(
|
||||||
metricDescriptions["collection_objects"],
|
metricDescriptions["collection_objects"],
|
||||||
prometheus.GaugeValue,
|
prometheus.GaugeValue,
|
||||||
float64(c.col.Count()),
|
float64(col.Count()),
|
||||||
c.key,
|
key,
|
||||||
)
|
)
|
||||||
ch <- prometheus.MustNewConstMetric(
|
ch <- prometheus.MustNewConstMetric(
|
||||||
metricDescriptions["collection_points"],
|
metricDescriptions["collection_points"],
|
||||||
prometheus.GaugeValue,
|
prometheus.GaugeValue,
|
||||||
float64(c.col.PointCount()),
|
float64(col.PointCount()),
|
||||||
c.key,
|
key,
|
||||||
)
|
)
|
||||||
ch <- prometheus.MustNewConstMetric(
|
ch <- prometheus.MustNewConstMetric(
|
||||||
metricDescriptions["collection_strings"],
|
metricDescriptions["collection_strings"],
|
||||||
prometheus.GaugeValue,
|
prometheus.GaugeValue,
|
||||||
float64(c.col.StringCount()),
|
float64(col.StringCount()),
|
||||||
c.key,
|
key,
|
||||||
)
|
)
|
||||||
ch <- prometheus.MustNewConstMetric(
|
ch <- prometheus.MustNewConstMetric(
|
||||||
metricDescriptions["collection_weight"],
|
metricDescriptions["collection_weight"],
|
||||||
prometheus.GaugeValue,
|
prometheus.GaugeValue,
|
||||||
float64(c.col.TotalWeight()),
|
float64(col.TotalWeight()),
|
||||||
c.key,
|
key,
|
||||||
)
|
)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
|
@ -126,7 +126,7 @@ func (sw *scanWriter) loadWheres() {
|
||||||
sw.wheres = nil
|
sw.wheres = nil
|
||||||
sw.whereins = nil
|
sw.whereins = nil
|
||||||
sw.fvals = nil
|
sw.fvals = nil
|
||||||
sw.col = sw.s.getCol(sw.key)
|
sw.col, _ = sw.s.cols.Get(sw.key)
|
||||||
if sw.col != nil {
|
if sw.col != nil {
|
||||||
sw.fmap = sw.col.FieldMap()
|
sw.fmap = sw.col.FieldMap()
|
||||||
sw.farr = sw.col.FieldArr()
|
sw.farr = sw.col.FieldArr()
|
||||||
|
|
|
@ -370,7 +370,7 @@ func (s *Server) cmdSearchArgs(
|
||||||
err = errInvalidNumberOfArguments
|
err = errInvalidNumberOfArguments
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
col := s.getCol(key)
|
col, _ := s.cols.Get(key)
|
||||||
if col == nil {
|
if col == nil {
|
||||||
err = errKeyNotFound
|
err = errKeyNotFound
|
||||||
return
|
return
|
||||||
|
|
|
@ -106,7 +106,8 @@ type Server struct {
|
||||||
aofsz int // active size of the aof file
|
aofsz int // active size of the aof file
|
||||||
qdb *buntdb.DB // hook queue log
|
qdb *buntdb.DB // hook queue log
|
||||||
qidx uint64 // hook queue log last idx
|
qidx uint64 // hook queue log last idx
|
||||||
cols *btree.BTree // data collections
|
|
||||||
|
cols *btree.Map[string, *collection.Collection] // data collections
|
||||||
|
|
||||||
follows map[*bytes.Buffer]bool
|
follows map[*bytes.Buffer]bool
|
||||||
fcond *sync.Cond
|
fcond *sync.Cond
|
||||||
|
@ -177,7 +178,7 @@ func Serve(opts Options) error {
|
||||||
http: opts.UseHTTP,
|
http: opts.UseHTTP,
|
||||||
pubsub: newPubsub(),
|
pubsub: newPubsub(),
|
||||||
monconns: make(map[net.Conn]bool),
|
monconns: make(map[net.Conn]bool),
|
||||||
cols: btree.NewNonConcurrent(byCollectionKey),
|
cols: &btree.Map[string, *collection.Collection]{},
|
||||||
|
|
||||||
groupHooks: btree.NewNonConcurrent(byGroupHook),
|
groupHooks: btree.NewNonConcurrent(byGroupHook),
|
||||||
groupObjects: btree.NewNonConcurrent(byGroupObject),
|
groupObjects: btree.NewNonConcurrent(byGroupObject),
|
||||||
|
@ -673,47 +674,6 @@ func (s *Server) backgroundSyncAOF() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// collectionKeyContainer is a wrapper object around a collection that includes
|
|
||||||
// the collection and the key. It's needed for support with the btree package,
|
|
||||||
// which requires a comparator less function.
|
|
||||||
type collectionKeyContainer struct {
|
|
||||||
key string
|
|
||||||
col *collection.Collection
|
|
||||||
}
|
|
||||||
|
|
||||||
func byCollectionKey(a, b interface{}) bool {
|
|
||||||
return a.(*collectionKeyContainer).key < b.(*collectionKeyContainer).key
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) setCol(key string, col *collection.Collection) {
|
|
||||||
s.cols.Set(&collectionKeyContainer{key, col})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) getCol(key string) *collection.Collection {
|
|
||||||
if v := s.cols.Get(&collectionKeyContainer{key: key}); v != nil {
|
|
||||||
return v.(*collectionKeyContainer).col
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) scanGreaterOrEqual(
|
|
||||||
key string, iterator func(key string, col *collection.Collection) bool,
|
|
||||||
) {
|
|
||||||
s.cols.Ascend(&collectionKeyContainer{key: key},
|
|
||||||
func(v interface{}) bool {
|
|
||||||
vcol := v.(*collectionKeyContainer)
|
|
||||||
return iterator(vcol.key, vcol.col)
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) deleteCol(key string) *collection.Collection {
|
|
||||||
if v := s.cols.Delete(&collectionKeyContainer{key: key}); v != nil {
|
|
||||||
return v.(*collectionKeyContainer).col
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func isReservedFieldName(field string) bool {
|
func isReservedFieldName(field string) bool {
|
||||||
switch field {
|
switch field {
|
||||||
case "z", "lat", "lon":
|
case "z", "lat", "lon":
|
||||||
|
@ -1046,7 +1006,7 @@ func randomKey(n int) string {
|
||||||
|
|
||||||
func (s *Server) reset() {
|
func (s *Server) reset() {
|
||||||
s.aofsz = 0
|
s.aofsz = 0
|
||||||
s.cols = btree.NewNonConcurrent(byCollectionKey)
|
s.cols.Clear()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) command(msg *Message, client *Client) (
|
func (s *Server) command(msg *Message, client *Client) (
|
||||||
|
|
|
@ -16,6 +16,7 @@ import (
|
||||||
"github.com/tidwall/buntdb"
|
"github.com/tidwall/buntdb"
|
||||||
"github.com/tidwall/resp"
|
"github.com/tidwall/resp"
|
||||||
"github.com/tidwall/tile38/core"
|
"github.com/tidwall/tile38/core"
|
||||||
|
"github.com/tidwall/tile38/internal/collection"
|
||||||
)
|
)
|
||||||
|
|
||||||
var memStats runtime.MemStats
|
var memStats runtime.MemStats
|
||||||
|
@ -60,7 +61,7 @@ func (s *Server) cmdStats(msg *Message) (res resp.Value, err error) {
|
||||||
if !ok {
|
if !ok {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
col := s.getCol(key)
|
col, _ := s.cols.Get(key)
|
||||||
if col != nil {
|
if col != nil {
|
||||||
m := make(map[string]interface{})
|
m := make(map[string]interface{})
|
||||||
m["num_points"] = col.PointCount()
|
m["num_points"] = col.PointCount()
|
||||||
|
@ -160,8 +161,7 @@ func (s *Server) basicStats(m map[string]interface{}) {
|
||||||
m["num_collections"] = s.cols.Len()
|
m["num_collections"] = s.cols.Len()
|
||||||
m["num_hooks"] = s.hooks.Len()
|
m["num_hooks"] = s.hooks.Len()
|
||||||
sz := 0
|
sz := 0
|
||||||
s.cols.Ascend(nil, func(v interface{}) bool {
|
s.cols.Scan(func(key string, col *collection.Collection) bool {
|
||||||
col := v.(*collectionKeyContainer).col
|
|
||||||
sz += col.TotalWeight()
|
sz += col.TotalWeight()
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@ -169,8 +169,7 @@ func (s *Server) basicStats(m map[string]interface{}) {
|
||||||
points := 0
|
points := 0
|
||||||
objects := 0
|
objects := 0
|
||||||
nstrings := 0
|
nstrings := 0
|
||||||
s.cols.Ascend(nil, func(v interface{}) bool {
|
s.cols.Scan(func(key string, col *collection.Collection) bool {
|
||||||
col := v.(*collectionKeyContainer).col
|
|
||||||
points += col.PointCount()
|
points += col.PointCount()
|
||||||
objects += col.Count()
|
objects += col.Count()
|
||||||
nstrings += col.StringCount()
|
nstrings += col.StringCount()
|
||||||
|
@ -333,8 +332,7 @@ func (s *Server) extStats(m map[string]interface{}) {
|
||||||
points := 0
|
points := 0
|
||||||
objects := 0
|
objects := 0
|
||||||
strings := 0
|
strings := 0
|
||||||
s.cols.Ascend(nil, func(v interface{}) bool {
|
s.cols.Scan(func(key string, col *collection.Collection) bool {
|
||||||
col := v.(*collectionKeyContainer).col
|
|
||||||
points += col.PointCount()
|
points += col.PointCount()
|
||||||
objects += col.Count()
|
objects += col.Count()
|
||||||
strings += col.StringCount()
|
strings += col.StringCount()
|
||||||
|
@ -365,8 +363,7 @@ func (s *Server) extStats(m map[string]interface{}) {
|
||||||
m["tile38_avg_point_size"] = avgsz
|
m["tile38_avg_point_size"] = avgsz
|
||||||
|
|
||||||
sz := 0
|
sz := 0
|
||||||
s.cols.Ascend(nil, func(v interface{}) bool {
|
s.cols.Scan(func(key string, col *collection.Collection) bool {
|
||||||
col := v.(*collectionKeyContainer).col
|
|
||||||
sz += col.TotalWeight()
|
sz += col.TotalWeight()
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
|
@ -273,7 +273,7 @@ func (s *Server) parseArea(ovs []string, doClip bool) (vs []string, o geojson.Ob
|
||||||
err = errInvalidNumberOfArguments
|
err = errInvalidNumberOfArguments
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
col := s.getCol(key)
|
col, _ := s.cols.Get(key)
|
||||||
if col == nil {
|
if col == nil {
|
||||||
err = errKeyNotFound
|
err = errKeyNotFound
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue