tile38/internal/server/scanner.go

476 lines
11 KiB
Go

package server
import (
"bytes"
"errors"
"math"
"strconv"
"github.com/mmcloughlin/geohash"
"github.com/tidwall/btree"
"github.com/tidwall/geojson"
"github.com/tidwall/resp"
"github.com/tidwall/tile38/internal/clip"
"github.com/tidwall/tile38/internal/collection"
"github.com/tidwall/tile38/internal/field"
"github.com/tidwall/tile38/internal/glob"
"github.com/tidwall/tile38/internal/object"
)
// limitItems is the default item limit newScanWriter applies when the caller
// passes a limit of 0 (COUNT output defaults to an unlimited count instead).
const limitItems = 100
// outputT selects how each scanned item is rendered by the scanWriter.
type outputT int

// Output kinds accepted by newScanWriter; outputUnknown is rejected there.
const (
	outputUnknown outputT = iota // not a valid output kind
	outputIDs                    // IDs only (optionally with distance)
	outputObjects                // ID plus the full object
	outputCount                  // only the number of matches
	outputPoints                 // ID plus a point for the object's geometry
	outputHashes                 // ID plus a geohash of the geometry's center
	outputBounds                 // ID plus the geometry's bounding box
)
// scanWriter collects the objects produced by a collection scan and renders
// them either as JSON (appended to wr) or as RESP values (values/respOut),
// depending on msg.OutputType.
type scanWriter struct {
	s    *Server
	wr   *bytes.Buffer // JSON output destination
	name string        // name of the scanned collection
	msg  *Message      // request; its OutputType selects JSON or RESP
	// col is looked up by name in newScanWriter; presumably nil when no such
	// collection exists — confirm against cols.Get.
	col *collection.Collection
	// fkeys is the union of field names of buffered objects (filled only
	// when fullFields is false); it drives the JSON "fields" header.
	fkeys  btree.Set[string]
	output outputT

	// Filters evaluated by fieldMatch.
	wheres     []whereT
	whereins   []whereinT
	whereevals []whereevalT

	// numberIters is advanced by Step and reported as the next cursor when
	// the limit is hit; numberItems counts objects buffered in filled.
	numberIters, numberItems uint64
	nofields                 bool
	// cursor is the starting offset returned by Offset; limit caps
	// numberItems (or count, for COUNT output).
	cursor, limit uint64
	// hitLimit records that numberItems reached limit; once records that a
	// JSON item has already been written (for comma separation).
	hitLimit, once bool
	// count is the number of objects that passed testing; precision is the
	// geohash precision for hash output.
	count, precision uint64
	globs            []string
	// globEverything short-circuits globMatch; fullFields selects the
	// name/value JSON field form instead of the positional one.
	globEverything, fullFields bool
	values                     []resp.Value // RESP items, one per written object
	matchValues                bool         // glob against the object's string form instead of its ID
	respOut                    resp.Value   // final RESP reply, set by writeFoot
	filled                     []ScanWriterParams // buffered items, written by writeFoot
}
// ScanWriterParams describes one candidate object handed to the scanWriter
// together with its per-object output options.
type ScanWriterParams struct {
	obj        *object.Object // the candidate object
	dist       float64        // distance, written when distOutput is set or dist > 0
	distOutput bool           // query or fence requested distance output
	// noTest makes pushObject skip the glob and field tests.
	// NOTE(review): ignoreGlobMatch is not read in this file section — confirm its users.
	noTest, ignoreGlobMatch bool
	clip                    geojson.Object // when non-nil, obj's geometry is clipped to it before buffering
	// NOTE(review): skipTesting is not read in this file section — confirm its users.
	skipTesting bool
}
// newScanWriter creates a scanWriter for the collection called name.
//
// output must be one of the concrete output kinds; anything else yields the
// error "invalid output type". A limit of 0 selects the default: limitItems
// for item outputs, or no limit (math.MaxUint64) for COUNT output. An empty
// globs list, or the single pattern "*", matches every object.
func (s *Server) newScanWriter(
	wr *bytes.Buffer, msg *Message, name string, output outputT,
	precision uint64, globs []string, matchValues bool,
	cursor, limit uint64, wheres []whereT, whereins []whereinT,
	whereevals []whereevalT, nofields bool,
) (
	*scanWriter, error,
) {
	switch output {
	case outputIDs, outputObjects, outputCount, outputBounds, outputPoints, outputHashes:
		// supported
	default:
		return nil, errors.New("invalid output type")
	}
	if limit == 0 {
		limit = limitItems
		if output == outputCount {
			limit = math.MaxUint64
		}
	}
	// The "found" flag is ignored on purpose: a missing collection simply
	// leaves col at its zero value.
	col, _ := s.cols.Get(name)
	return &scanWriter{
		s:              s,
		wr:             wr,
		name:           name,
		msg:            msg,
		col:            col,
		output:         output,
		wheres:         wheres,
		whereins:       whereins,
		whereevals:     whereevals,
		nofields:       nofields,
		cursor:         cursor,
		limit:          limit,
		precision:      precision,
		globs:          globs,
		globEverything: len(globs) == 0 || (len(globs) == 1 && globs[0] == "*"),
		matchValues:    matchValues,
	}, nil
}
// hasFieldsOutput reports whether object fields are included in the output:
// only for object, point, hash and bounds output, and only when nofields is
// not set.
func (sw *scanWriter) hasFieldsOutput() bool {
	if sw.nofields {
		return false
	}
	switch sw.output {
	case outputObjects, outputPoints, outputHashes, outputBounds:
		return true
	}
	return false
}
// writeFoot completes the scan response.
//
// For JSON it appends to wr, in order:
//   - the optional "fields" header (collected field names, when fields are output);
//   - the opening of the result array for the output kind;
//   - every buffered item;
//   - the closing bracket;
//   - the "count" and "cursor" members.
//
// For RESP it sets respOut to the integer count for COUNT output, or to
// [cursor, items] otherwise. The reported cursor is numberIters when the limit
// was hit and 0 otherwise.
func (sw *scanWriter) writeFoot() {
	if sw.msg.OutputType == JSON {
		if sw.hasFieldsOutput() && sw.fkeys.Len() > 0 {
			sw.wr.WriteString(`,"fields":[`)
			first := true
			sw.fkeys.Scan(func(key string) bool {
				if !first {
					sw.wr.WriteByte(',')
				}
				first = false
				sw.wr.WriteString(jsonString(key))
				return true
			})
			sw.wr.WriteByte(']')
		}
		var arrayOpen string
		switch sw.output {
		case outputIDs:
			arrayOpen = `,"ids":[`
		case outputObjects:
			arrayOpen = `,"objects":[`
		case outputPoints:
			arrayOpen = `,"points":[`
		case outputBounds:
			arrayOpen = `,"bounds":[`
		case outputHashes:
			arrayOpen = `,"hashes":[`
		}
		// COUNT output has no result array, so arrayOpen stays empty.
		sw.wr.WriteString(arrayOpen)
	}
	for _, item := range sw.filled {
		sw.writeFilled(item)
	}
	var cursor uint64
	if sw.hitLimit {
		cursor = sw.numberIters
	}
	switch sw.msg.OutputType {
	case JSON:
		if sw.output != outputCount {
			sw.wr.WriteByte(']')
		}
		sw.wr.WriteString(`,"count":` + strconv.FormatUint(sw.count, 10))
		sw.wr.WriteString(`,"cursor":` + strconv.FormatUint(cursor, 10))
	case RESP:
		if sw.output == outputCount {
			sw.respOut = resp.IntegerValue(int(sw.count))
		} else {
			sw.respOut = resp.ArrayValue([]resp.Value{
				resp.IntegerValue(int(cursor)),
				resp.ArrayValue(sw.values),
			})
		}
	}
}
// extractZCoordinate returns the Z value of a point geometry, looking through
// any Feature wrappers to their base geometry. Every other geometry yields 0.
func extractZCoordinate(o geojson.Object) float64 {
	switch g := o.(type) {
	case *geojson.Point:
		return g.Z()
	case *geojson.Feature:
		return extractZCoordinate(g.Base())
	}
	return 0
}
// getFieldValue returns the value of the named field of o. The name "z" is
// special: it yields the geometry's Z coordinate, formatted as a decimal
// string, instead of a stored field.
func getFieldValue(o *object.Object, name string) field.Value {
	if name != "z" {
		return o.Fields().Get(name).Value()
	}
	zText := strconv.FormatFloat(extractZCoordinate(o.Geo()), 'f', -1, 64)
	return field.ValueOf(zText)
}
// fieldMatch reports whether o satisfies every WHERE, WHEREIN and WHEREEVAL
// filter of the scan. An evaluation error from a WHEREEVAL expression is
// returned together with false.
func (sw *scanWriter) fieldMatch(o *object.Object) (bool, error) {
	for _, w := range sw.wheres {
		if !w.match(getFieldValue(o, w.name)) {
			return false, nil
		}
	}
	for _, w := range sw.whereins {
		if !w.match(getFieldValue(o, w.name)) {
			return false, nil
		}
	}
	if len(sw.whereevals) == 0 {
		return true, nil
	}
	// Expression filters see every stored field, plus "z" for spatial
	// objects. Stored fields are inserted after "z", so a stored field with
	// that name takes precedence here.
	env := make(map[string]field.Value)
	if objIsSpatial(o.Geo()) {
		zText := strconv.FormatFloat(extractZCoordinate(o.Geo()), 'f', -1, 64)
		env["z"] = field.ValueOf(zText)
	}
	o.Fields().Scan(func(f field.Field) bool {
		env[f.Name()] = f.Value()
		return true
	})
	for _, ev := range sw.whereevals {
		matched, err := ev.match(env)
		if err != nil || !matched {
			return false, err
		}
	}
	return true, nil
}
// globMatch reports whether o matches any of the scan's glob patterns. When
// matchValues is set, it matches the object's string form; otherwise it
// matches the object's ID. keepGoing is always true.
func (sw *scanWriter) globMatch(o *object.Object) (ok, keepGoing bool) {
	if sw.globEverything {
		return true, true
	}
	var subject string
	if sw.matchValues {
		subject = o.String()
	} else {
		subject = o.ID()
	}
	for _, pattern := range sw.globs {
		// A malformed pattern is deliberately treated as a non-match, so
		// the error from glob.Match is discarded.
		if matched, _ := glob.Match(pattern, subject); matched {
			return true, true
		}
	}
	return false, true
}
// Offset returns the cursor value the scan writer was created with.
// NOTE(review): presumably consumed by the collection iterator to decide how
// many leading items to skip — confirm against the caller.
func (sw *scanWriter) Offset() uint64 {
	return sw.cursor
}

// Step increments the scan's iteration counter by n. writeFoot reports that
// counter as the next cursor when the item limit was reached.
func (sw *scanWriter) Step(n uint64) {
	sw.numberIters += n
}
// testObject runs the glob test and then the field filters against o.
// ok reports whether the object passes and should be written; keepGoing
// reports whether more objects could still be tested. A filter evaluation
// error stops the scan (keepGoing false).
func (sw *scanWriter) testObject(o *object.Object,
) (ok, keepGoing bool, err error) {
	if matched, kg := sw.globMatch(o); !matched {
		return false, kg, nil
	}
	if ok, err = sw.fieldMatch(o); err != nil {
		return false, false, err
	}
	return ok, true, nil
}
// pushObject tests opts.obj (unless opts.noTest is set) and, if it passes,
// counts it and buffers it in sw.filled.
//
// For COUNT output the object is only counted, and keepGoing turns false once
// count reaches the limit. Otherwise, when opts.clip is set, the buffered
// object is a copy whose geometry is clipped to opts.clip. keepGoing turns
// false once the number of buffered items reaches the limit (hitLimit is then
// set). err is a filter evaluation error from testObject.
func (sw *scanWriter) pushObject(opts ScanWriterParams) (keepGoing bool, err error) {
	keepGoing = true
	if !opts.noTest {
		// Assign to the named results directly; a local `var err error`
		// here would shadow the named result.
		var ok bool
		ok, keepGoing, err = sw.testObject(opts.obj)
		if err != nil {
			return false, err
		}
		if !ok {
			return keepGoing, nil
		}
	}
	sw.count++
	if sw.output == outputCount {
		// Counting only: nothing is buffered.
		return sw.count < sw.limit, nil
	}
	if opts.clip != nil {
		// create a newly clipped object
		opts.obj = object.New(
			opts.obj.ID(),
			clip.Clip(opts.obj.Geo(), opts.clip, &sw.s.geomIndexOpts),
			0, opts.obj.Expires(),
			opts.obj.Fields(),
		)
	}
	if !sw.fullFields {
		// Record every field name so the JSON output can share one "fields"
		// header, with positional values per object.
		opts.obj.Fields().Scan(func(f field.Field) bool {
			sw.fkeys.Insert(f.Name())
			return true
		})
	}
	sw.filled = append(sw.filled, opts)
	sw.numberItems++
	if sw.numberItems >= sw.limit {
		sw.hitLimit = true
		return false, nil
	}
	return keepGoing, nil
}
// writeObject pushes a single object and, if pushObject buffered it, writes
// it immediately and removes it from the buffer instead of leaving it for
// writeFoot.
func (sw *scanWriter) writeObject(opts ScanWriterParams) {
	before := len(sw.filled)
	// NOTE(review): pushObject's keepGoing and error results are discarded,
	// so filter evaluation errors are not visible to callers of writeObject.
	_, _ = sw.pushObject(opts)
	if len(sw.filled) <= before {
		return
	}
	sw.writeFilled(sw.filled[len(sw.filled)-1])
	sw.filled = sw.filled[:before]
}
// writeFilled renders one item that passed pushObject.
//
// JSON: appends the item to sw.wr, preceded by a comma unless it is the first
// item written (tracked by sw.once). ID output is the bare ID string, or
// {"id","distance"} when a distance is output. Every other output is an object
// with "id", the representation for the output kind, optional "fields", and an
// optional "distance". With fullFields, fields are written as a name/value
// object that skips zero values. Otherwise they are written as an array of
// values ordered like the names in sw.fkeys.
//
// RESP: appends the item to sw.values. ID output is the ID alone, or
// [id, distance]. Every other output is [id, representation, fields?,
// distance?], where fields is a flat name/value list that skips zero values.
//
// Fields are written only when hasFieldsOutput reports true. A distance is
// written when opts.distOutput is set or opts.dist is positive.
func (sw *scanWriter) writeFilled(opts ScanWriterParams) {
	withDist := opts.distOutput || opts.dist > 0
	switch sw.msg.OutputType {
	case JSON:
		// Write straight into the output buffer (no per-item temporary
		// buffer, no string concatenation in loops).
		if sw.once {
			sw.wr.WriteByte(',')
		} else {
			sw.once = true
		}
		id := jsonString(opts.obj.ID())
		if sw.output == outputIDs {
			if withDist {
				sw.wr.WriteString(`{"id":` + id +
					`,"distance":` + strconv.FormatFloat(opts.dist, 'f', -1, 64) + "}")
			} else {
				sw.wr.WriteString(id)
			}
			return
		}
		sw.wr.WriteString(`{"id":` + id)
		switch sw.output {
		case outputObjects:
			sw.wr.WriteString(`,"object":` + string(opts.obj.Geo().AppendJSON(nil)))
		case outputPoints:
			sw.wr.WriteString(`,"point":` + string(appendJSONSimplePoint(nil, opts.obj.Geo())))
		case outputHashes:
			center := opts.obj.Geo().Center()
			p := geohash.EncodeWithPrecision(center.Y, center.X, uint(sw.precision))
			sw.wr.WriteString(`,"hash":"` + p + `"`)
		case outputBounds:
			sw.wr.WriteString(`,"bounds":` + string(appendJSONSimpleBounds(nil, opts.obj.Geo())))
		}
		if sw.hasFieldsOutput() {
			fields := opts.obj.Fields()
			if sw.fullFields {
				if fields.Len() > 0 {
					sw.wr.WriteString(`,"fields":{`)
					n := 0
					fields.Scan(func(f field.Field) bool {
						if !f.Value().IsZero() {
							if n > 0 {
								sw.wr.WriteByte(',')
							}
							sw.wr.WriteString(jsonString(f.Name()))
							sw.wr.WriteByte(':')
							sw.wr.WriteString(f.Value().JSON())
							n++
						}
						return true
					})
					sw.wr.WriteByte('}')
				}
			} else if sw.fkeys.Len() > 0 {
				// Positional values, aligned with the "fields" header that
				// writeFoot emits from sw.fkeys.
				sw.wr.WriteString(`,"fields":[`)
				n := 0
				sw.fkeys.Scan(func(name string) bool {
					if n > 0 {
						sw.wr.WriteByte(',')
					}
					sw.wr.WriteString(fields.Get(name).Value().JSON())
					n++
					return true
				})
				sw.wr.WriteByte(']')
			}
		}
		if withDist {
			sw.wr.WriteString(`,"distance":` + strconv.FormatFloat(opts.dist, 'f', -1, 64))
		}
		sw.wr.WriteByte('}')
	case RESP:
		vals := make([]resp.Value, 1, 3)
		vals[0] = resp.StringValue(opts.obj.ID())
		if sw.output == outputIDs {
			if withDist {
				vals = append(vals, resp.FloatValue(opts.dist))
				sw.values = append(sw.values, resp.ArrayValue(vals))
			} else {
				sw.values = append(sw.values, vals[0])
			}
			return
		}
		switch sw.output {
		case outputObjects:
			vals = append(vals, resp.StringValue(opts.obj.String()))
		case outputPoints:
			point := opts.obj.Geo().Center()
			z := extractZCoordinate(opts.obj.Geo())
			if z != 0 {
				vals = append(vals, resp.ArrayValue([]resp.Value{
					resp.FloatValue(point.Y),
					resp.FloatValue(point.X),
					resp.FloatValue(z),
				}))
			} else {
				vals = append(vals, resp.ArrayValue([]resp.Value{
					resp.FloatValue(point.Y),
					resp.FloatValue(point.X),
				}))
			}
		case outputHashes:
			center := opts.obj.Geo().Center()
			p := geohash.EncodeWithPrecision(center.Y, center.X, uint(sw.precision))
			vals = append(vals, resp.StringValue(p))
		case outputBounds:
			bbox := opts.obj.Rect()
			vals = append(vals, resp.ArrayValue([]resp.Value{
				resp.ArrayValue([]resp.Value{
					resp.FloatValue(bbox.Min.Y),
					resp.FloatValue(bbox.Min.X),
				}),
				resp.ArrayValue([]resp.Value{
					resp.FloatValue(bbox.Max.Y),
					resp.FloatValue(bbox.Max.X),
				}),
			}))
		}
		if sw.hasFieldsOutput() {
			var fvals []resp.Value
			opts.obj.Fields().Scan(func(f field.Field) bool {
				if !f.Value().IsZero() {
					fvals = append(fvals, resp.StringValue(f.Name()), resp.StringValue(f.Value().Data()))
				}
				return true
			})
			if len(fvals) > 0 {
				vals = append(vals, resp.ArrayValue(fvals))
			}
		}
		if withDist {
			vals = append(vals, resp.FloatValue(opts.dist))
		}
		sw.values = append(sw.values, resp.ArrayValue(vals))
	}
}