tile38/internal/server/scanner.go

468 lines
11 KiB
Go
Raw Normal View History

package server
2016-03-05 02:08:16 +03:00
import (
"bytes"
"errors"
2017-08-10 23:31:36 +03:00
"math"
2016-03-05 02:08:16 +03:00
"strconv"
"github.com/mmcloughlin/geohash"
2022-09-20 03:47:38 +03:00
"github.com/tidwall/btree"
"github.com/tidwall/geojson"
2016-03-29 00:16:21 +03:00
"github.com/tidwall/resp"
"github.com/tidwall/tile38/internal/clip"
"github.com/tidwall/tile38/internal/collection"
2022-09-20 03:47:38 +03:00
"github.com/tidwall/tile38/internal/field"
"github.com/tidwall/tile38/internal/glob"
2016-03-05 02:08:16 +03:00
)
// limitItems is the limit newScanWriter applies when the caller passes a
// limit of zero for any output type other than outputCount.
const limitItems = 100
// outputT selects which representation of each matching object a scanWriter
// emits.
type outputT int
// Output kinds. newScanWriter accepts every value except outputUnknown (the
// zero value), for which it returns an "invalid output type" error.
const (
outputUnknown outputT = iota
outputIDs // ids only (JSON key "ids")
outputObjects // full objects (JSON key "objects")
outputCount // only the number of matches; no per-item output
outputPoints // simple points (JSON key "points")
outputHashes // geohashes of each object's center (JSON key "hashes")
outputBounds // bounding rectangles (JSON key "bounds")
)
type scanWriter struct {
s *Server
2016-03-05 02:08:16 +03:00
wr *bytes.Buffer
2022-09-20 03:47:38 +03:00
name string
msg *Message
2016-03-05 02:08:16 +03:00
col *collection.Collection
2022-09-20 03:47:38 +03:00
fkeys btree.Set[string]
2016-03-05 02:08:16 +03:00
output outputT
wheres []whereT
2017-08-23 23:13:12 +03:00
whereins []whereinT
whereevals []whereevalT
2018-11-01 08:00:09 +03:00
numberIters uint64
2016-03-05 02:08:16 +03:00
numberItems uint64
nofields bool
cursor uint64
2016-03-05 02:08:16 +03:00
limit uint64
hitLimit bool
once bool
count uint64
precision uint64
globs []string
2016-03-05 02:08:16 +03:00
globEverything bool
fullFields bool
2016-03-29 00:16:21 +03:00
values []resp.Value
2016-07-13 06:11:02 +03:00
matchValues bool
2017-10-05 18:20:40 +03:00
respOut resp.Value
2022-09-20 03:47:38 +03:00
filled []ScanWriterParams
2016-03-05 02:08:16 +03:00
}
2017-01-10 19:49:48 +03:00
type ScanWriterParams struct {
2021-07-11 20:09:51 +03:00
id string
o geojson.Object
2022-09-20 03:47:38 +03:00
fields field.List
2021-07-11 20:09:51 +03:00
distance float64
distOutput bool // query or fence requested distance output
noTest bool
2021-07-11 20:09:51 +03:00
ignoreGlobMatch bool
clip geojson.Object
skipTesting bool
2017-01-10 19:49:48 +03:00
}
func (s *Server) newScanWriter(
2022-09-20 03:47:38 +03:00
wr *bytes.Buffer, msg *Message, name string, output outputT,
precision uint64, globs []string, matchValues bool,
2021-12-09 19:24:26 +03:00
cursor, limit uint64, wheres []whereT, whereins []whereinT,
whereevals []whereevalT, nofields bool,
2016-03-05 02:08:16 +03:00
) (
*scanWriter, error,
) {
switch output {
default:
return nil, errors.New("invalid output type")
case outputIDs, outputObjects, outputCount, outputBounds, outputPoints, outputHashes:
}
2017-08-10 23:31:36 +03:00
if limit == 0 {
if output == outputCount {
limit = math.MaxUint64
} else {
limit = limitItems
}
}
2016-03-05 02:08:16 +03:00
sw := &scanWriter{
s: s,
2016-07-13 06:11:02 +03:00
wr: wr,
2022-09-20 03:47:38 +03:00
name: name,
2016-07-13 06:11:02 +03:00
msg: msg,
globs: globs,
2016-07-13 07:51:01 +03:00
limit: limit,
2021-12-09 19:24:26 +03:00
cursor: cursor,
2016-07-13 07:51:01 +03:00
output: output,
2016-07-13 06:11:02 +03:00
nofields: nofields,
2016-07-13 07:51:01 +03:00
precision: precision,
2021-12-09 19:24:26 +03:00
whereevals: whereevals,
2016-07-13 06:11:02 +03:00
matchValues: matchValues,
2016-03-05 02:08:16 +03:00
}
if len(globs) == 0 || (len(globs) == 1 && globs[0] == "*") {
2016-03-05 02:08:16 +03:00
sw.globEverything = true
}
2022-09-20 03:47:38 +03:00
sw.wheres = wheres
sw.whereins = whereins
sw.col, _ = sw.s.cols.Get(sw.name)
2022-08-31 02:50:19 +03:00
return sw, nil
}
2016-03-05 02:08:16 +03:00
// hasFieldsOutput reports whether items should carry their fields: the
// nofields flag must be off and the output must be objects, points, hashes
// or bounds.
func (sw *scanWriter) hasFieldsOutput() bool {
	if sw.nofields {
		return false
	}
	switch sw.output {
	case outputObjects, outputPoints, outputHashes, outputBounds:
		return true
	}
	return false
}
2022-09-20 03:47:38 +03:00
func (sw *scanWriter) writeFoot() {
2016-03-29 00:16:21 +03:00
switch sw.msg.OutputType {
case JSON:
2022-09-20 03:47:38 +03:00
if sw.fkeys.Len() > 0 && sw.hasFieldsOutput() {
2016-03-29 00:16:21 +03:00
sw.wr.WriteString(`,"fields":[`)
2022-09-20 03:47:38 +03:00
var i int
sw.fkeys.Scan(func(name string) bool {
2016-03-29 00:16:21 +03:00
if i > 0 {
sw.wr.WriteByte(',')
}
2022-09-20 03:47:38 +03:00
sw.wr.WriteString(jsonString(name))
i++
return true
})
2016-03-29 00:16:21 +03:00
sw.wr.WriteByte(']')
2016-03-05 02:08:16 +03:00
}
2016-03-29 00:16:21 +03:00
switch sw.output {
case outputIDs:
sw.wr.WriteString(`,"ids":[`)
case outputObjects:
sw.wr.WriteString(`,"objects":[`)
case outputPoints:
sw.wr.WriteString(`,"points":[`)
case outputBounds:
sw.wr.WriteString(`,"bounds":[`)
case outputHashes:
sw.wr.WriteString(`,"hashes":[`)
case outputCount:
2016-03-05 02:08:16 +03:00
2016-03-29 00:16:21 +03:00
}
case RESP:
2016-03-05 02:08:16 +03:00
}
2022-09-20 03:47:38 +03:00
for _, opts := range sw.filled {
sw.writeFilled(opts)
}
2018-11-01 08:00:09 +03:00
cursor := sw.numberIters
2016-03-05 02:08:16 +03:00
if !sw.hitLimit {
cursor = 0
}
2016-03-29 00:16:21 +03:00
switch sw.msg.OutputType {
case JSON:
2016-03-29 00:16:21 +03:00
switch sw.output {
default:
sw.wr.WriteByte(']')
case outputCount:
2016-03-05 02:08:16 +03:00
2016-03-29 00:16:21 +03:00
}
sw.wr.WriteString(`,"count":` + strconv.FormatUint(sw.count, 10))
sw.wr.WriteString(`,"cursor":` + strconv.FormatUint(cursor, 10))
case RESP:
2016-05-24 00:21:18 +03:00
if sw.output == outputCount {
2017-10-05 18:20:40 +03:00
sw.respOut = resp.IntegerValue(int(sw.count))
2016-05-24 00:21:18 +03:00
} else {
values := []resp.Value{
resp.IntegerValue(int(cursor)),
resp.ArrayValue(sw.values),
}
2017-10-05 18:20:40 +03:00
sw.respOut = resp.ArrayValue(values)
2016-03-29 00:16:21 +03:00
}
2016-03-05 02:08:16 +03:00
}
}
// extractZCoordinate returns the Z value of o when o is a point, unwrapping
// any enclosing features first. Every other object kind yields 0.
func extractZCoordinate(o geojson.Object) float64 {
	for {
		if feat, isFeature := o.(*geojson.Feature); isFeature {
			o = feat.Base()
			continue
		}
		if pt, isPoint := o.(*geojson.Point); isPoint {
			return pt.Z()
		}
		return 0
	}
}
2022-09-20 03:47:38 +03:00
// getFieldValue resolves the value a filter named name should be tested
// against. The name "z" is special: it yields the object's Z coordinate
// (formatted as a decimal string) rather than a stored field.
func getFieldValue(o geojson.Object, fields field.List, name string) field.Value {
	if name != "z" {
		stored := fields.Get(name)
		return stored.Value()
	}
	z := extractZCoordinate(o)
	return field.ValueOf(strconv.FormatFloat(z, 'f', -1, 64))
}
func (sw *scanWriter) fieldMatch(o geojson.Object, fields field.List) (bool, error) {
for _, where := range sw.wheres {
if !where.match(getFieldValue(o, fields, where.name)) {
return false, nil
2016-03-05 02:08:16 +03:00
}
2022-09-20 03:47:38 +03:00
}
for _, wherein := range sw.whereins {
if !wherein.match(getFieldValue(o, fields, wherein.name)) {
return false, nil
}
2022-09-20 03:47:38 +03:00
}
if len(sw.whereevals) > 0 {
fieldsWithNames := make(map[string]field.Value)
fieldsWithNames["z"] = field.ValueOf(strconv.FormatFloat(extractZCoordinate(o), 'f', -1, 64))
fields.Scan(func(f field.Field) bool {
fieldsWithNames[f.Name()] = f.Value()
return true
})
for _, whereval := range sw.whereevals {
2022-09-20 03:47:38 +03:00
match, err := whereval.match(fieldsWithNames)
if err != nil {
return false, err
}
2022-09-20 03:47:38 +03:00
if !match {
return false, nil
2017-08-23 23:13:12 +03:00
}
}
2016-03-05 02:08:16 +03:00
}
2022-09-20 03:47:38 +03:00
return true, nil
2016-03-05 02:08:16 +03:00
}
// globMatch tests the object against the writer's glob patterns. The subject
// is the object's string form when matchValues is set and the id otherwise.
// ok is true when globEverything is set or any pattern matches; keepGoing is
// always true.
func (sw *scanWriter) globMatch(id string, o geojson.Object) (ok, keepGoing bool) {
	if sw.globEverything {
		return true, true
	}
	subject := id
	if sw.matchValues {
		subject = o.String()
	}
	for i := range sw.globs {
		// The error result of glob.Match is ignored, as before.
		if matched, _ := glob.Match(sw.globs[i], subject); matched {
			return true, true
		}
	}
	return false, true
}
2018-11-01 08:00:09 +03:00
// Cursor helpers: Offset reports the starting cursor, Step advances the iteration counter.
2018-11-02 16:09:56 +03:00
// Offset returns sw.cursor, the starting cursor that was passed to
// newScanWriter. It does not modify the writer.
func (sw *scanWriter) Offset() uint64 {
return sw.cursor
}
func (sw *scanWriter) Step(n uint64) {
2018-11-01 08:00:09 +03:00
sw.numberIters += n
}
// ok is whether the object passes the test and should be written
// keepGoing is whether there could be more objects to test
2022-09-20 03:47:38 +03:00
func (sw *scanWriter) testObject(id string, o geojson.Object, fields field.List,
) (ok, keepGoing bool, err error) {
match, kg := sw.globMatch(id, o)
if !match {
2022-09-20 03:47:38 +03:00
return false, kg, nil
}
2022-09-20 03:47:38 +03:00
ok, err = sw.fieldMatch(o, fields)
if err != nil {
return false, false, err
2016-04-03 00:13:20 +03:00
}
2022-09-20 03:47:38 +03:00
return ok, true, nil
}
2022-09-20 03:47:38 +03:00
func (sw *scanWriter) pushObject(opts ScanWriterParams) (keepGoing bool, err error) {
keepGoing = true
if !opts.noTest {
var ok bool
2022-09-20 03:47:38 +03:00
var err error
ok, keepGoing, err = sw.testObject(opts.id, opts.o, opts.fields)
if err != nil {
return false, err
}
if !ok {
2022-09-20 03:47:38 +03:00
return keepGoing, nil
}
2016-03-05 02:08:16 +03:00
}
sw.count++
if sw.output == outputCount {
2022-09-20 03:47:38 +03:00
return sw.count < sw.limit, nil
2016-03-05 02:08:16 +03:00
}
if opts.clip != nil {
opts.o = clip.Clip(opts.o, opts.clip, &sw.s.geomIndexOpts)
2018-05-08 02:18:18 +03:00
}
2022-09-20 03:47:38 +03:00
if !sw.fullFields {
opts.fields.Scan(func(f field.Field) bool {
sw.fkeys.Insert(f.Name())
return true
})
}
sw.filled = append(sw.filled, opts)
sw.numberItems++
if sw.numberItems == sw.limit {
sw.hitLimit = true
return false, nil
}
return keepGoing, nil
}
// writeObject pushes one candidate and, if it was accepted into sw.filled,
// renders it immediately and removes it from the buffer again. The keepGoing
// and error results of pushObject are discarded.
func (sw *scanWriter) writeObject(opts ScanWriterParams) {
	before := len(sw.filled)
	_, _ = sw.pushObject(opts)
	if len(sw.filled) <= before {
		return
	}
	last := len(sw.filled) - 1
	sw.writeFilled(sw.filled[last])
	sw.filled = sw.filled[:before]
}
func (sw *scanWriter) writeFilled(opts ScanWriterParams) {
2016-03-29 00:16:21 +03:00
switch sw.msg.OutputType {
case JSON:
2016-03-29 00:16:21 +03:00
var wr bytes.Buffer
var jsfields string
if sw.once {
wr.WriteByte(',')
} else {
sw.once = true
}
2022-09-20 03:47:38 +03:00
fieldsOutput := sw.hasFieldsOutput()
if fieldsOutput && sw.fullFields {
if opts.fields.Len() > 0 {
jsfields = `,"fields":{`
var i int
opts.fields.Scan(func(f field.Field) bool {
if !f.Value().IsZero() {
if i > 0 {
jsfields += `,`
2016-03-05 02:08:16 +03:00
}
2022-09-20 03:47:38 +03:00
jsfields += jsonString(f.Name()) + ":" + f.Value().JSON()
i++
2016-03-05 02:08:16 +03:00
}
2022-09-20 03:47:38 +03:00
return true
})
jsfields += `}`
2016-03-05 02:08:16 +03:00
}
2022-09-20 03:47:38 +03:00
} else if fieldsOutput && sw.fkeys.Len() > 0 && !sw.fullFields {
jsfields = `,"fields":[`
var i int
sw.fkeys.Scan(func(name string) bool {
if i > 0 {
jsfields += `,`
}
f := opts.fields.Get(name)
jsfields += f.Value().JSON()
i++
return true
})
jsfields += `]`
2016-03-29 00:16:21 +03:00
}
if sw.output == outputIDs {
if opts.distOutput || opts.distance > 0 {
wr.WriteString(`{"id":` + jsonString(opts.id) +
`,"distance":` + strconv.FormatFloat(opts.distance, 'f', -1, 64) + "}")
} else {
wr.WriteString(jsonString(opts.id))
}
2016-03-29 00:16:21 +03:00
} else {
2017-01-10 19:49:48 +03:00
wr.WriteString(`{"id":` + jsonString(opts.id))
2016-03-29 00:16:21 +03:00
switch sw.output {
case outputObjects:
wr.WriteString(`,"object":` + string(opts.o.AppendJSON(nil)))
2016-03-29 00:16:21 +03:00
case outputPoints:
wr.WriteString(`,"point":` + string(appendJSONSimplePoint(nil, opts.o)))
2016-03-29 00:16:21 +03:00
case outputHashes:
center := opts.o.Center()
p := geohash.EncodeWithPrecision(center.Y, center.X, uint(sw.precision))
2016-03-29 00:16:21 +03:00
wr.WriteString(`,"hash":"` + p + `"`)
case outputBounds:
wr.WriteString(`,"bounds":` + string(appendJSONSimpleBounds(nil, opts.o)))
2016-03-05 02:08:16 +03:00
}
2016-03-29 00:16:21 +03:00
wr.WriteString(jsfields)
if opts.distOutput || opts.distance > 0 {
wr.WriteString(`,"distance":` + strconv.FormatFloat(opts.distance, 'f', -1, 64))
2017-01-10 19:49:48 +03:00
}
2016-03-29 00:16:21 +03:00
wr.WriteString(`}`)
2016-03-05 02:08:16 +03:00
}
2016-03-29 00:16:21 +03:00
sw.wr.Write(wr.Bytes())
case RESP:
2016-03-29 00:16:21 +03:00
vals := make([]resp.Value, 1, 3)
2017-01-10 19:49:48 +03:00
vals[0] = resp.StringValue(opts.id)
2016-03-29 00:16:21 +03:00
if sw.output == outputIDs {
if opts.distOutput || opts.distance > 0 {
vals = append(vals, resp.FloatValue(opts.distance))
sw.values = append(sw.values, resp.ArrayValue(vals))
} else {
sw.values = append(sw.values, vals[0])
}
2016-03-29 00:16:21 +03:00
} else {
switch sw.output {
case outputObjects:
2017-01-10 19:49:48 +03:00
vals = append(vals, resp.StringValue(opts.o.String()))
2016-03-29 00:16:21 +03:00
case outputPoints:
point := opts.o.Center()
z := extractZCoordinate(opts.o)
if z != 0 {
2016-03-29 00:16:21 +03:00
vals = append(vals, resp.ArrayValue([]resp.Value{
resp.FloatValue(point.Y),
resp.FloatValue(point.X),
resp.FloatValue(z),
2016-03-29 00:16:21 +03:00
}))
} else {
vals = append(vals, resp.ArrayValue([]resp.Value{
resp.FloatValue(point.Y),
resp.FloatValue(point.X),
}))
}
case outputHashes:
center := opts.o.Center()
p := geohash.EncodeWithPrecision(center.Y, center.X, uint(sw.precision))
2016-03-29 00:16:21 +03:00
vals = append(vals, resp.StringValue(p))
case outputBounds:
bbox := opts.o.Rect()
2016-03-29 00:16:21 +03:00
vals = append(vals, resp.ArrayValue([]resp.Value{
resp.ArrayValue([]resp.Value{
resp.FloatValue(bbox.Min.Y),
resp.FloatValue(bbox.Min.X),
}),
resp.ArrayValue([]resp.Value{
resp.FloatValue(bbox.Max.Y),
resp.FloatValue(bbox.Max.X),
}),
}))
2016-03-05 02:08:16 +03:00
}
if sw.hasFieldsOutput() {
2022-09-20 03:47:38 +03:00
if opts.fields.Len() > 0 {
var fvals []resp.Value
var i int
opts.fields.Scan(func(f field.Field) bool {
if !f.Value().IsZero() {
fvals = append(fvals, resp.StringValue(f.Name()), resp.StringValue(f.Value().Data()))
i++
}
return true
})
vals = append(vals, resp.ArrayValue(fvals))
2016-03-29 00:16:21 +03:00
}
}
if opts.distOutput || opts.distance > 0 {
2017-01-10 19:49:48 +03:00
vals = append(vals, resp.FloatValue(opts.distance))
}
2016-03-29 00:16:21 +03:00
sw.values = append(sw.values, resp.ArrayValue(vals))
2016-03-05 02:08:16 +03:00
}
}
}