tile38/controller/scanner.go

384 lines
8.3 KiB
Go
Raw Normal View History

2016-03-05 02:08:16 +03:00
package controller
import (
"bytes"
"errors"
"strconv"
2016-04-02 17:20:30 +03:00
"sync"
2016-03-05 02:08:16 +03:00
2016-03-29 00:16:21 +03:00
"github.com/tidwall/resp"
2016-03-06 17:55:00 +03:00
"github.com/tidwall/tile38/controller/collection"
2016-07-12 22:18:16 +03:00
"github.com/tidwall/tile38/controller/glob"
2016-03-29 00:16:21 +03:00
"github.com/tidwall/tile38/controller/server"
2016-03-05 02:08:16 +03:00
"github.com/tidwall/tile38/geojson"
)
const limitItems = 100
const capLimit = 100000
type outputT int
const (
outputUnknown outputT = iota
outputIDs
outputObjects
outputCount
outputPoints
outputHashes
outputBounds
)
type scanWriter struct {
2016-04-02 17:20:30 +03:00
mu sync.Mutex
2016-05-23 23:01:42 +03:00
c *Controller
2016-03-05 02:08:16 +03:00
wr *bytes.Buffer
2016-03-29 00:16:21 +03:00
msg *server.Message
2016-03-05 02:08:16 +03:00
col *collection.Collection
fmap map[string]int
farr []string
fvals []float64
output outputT
wheres []whereT
numberItems uint64
nofields bool
limit uint64
hitLimit bool
once bool
count uint64
precision uint64
glob string
globEverything bool
globSingle bool
fullFields bool
2016-03-29 00:16:21 +03:00
values []resp.Value
2016-03-05 02:08:16 +03:00
}
func (c *Controller) newScanWriter(
2016-07-12 22:18:16 +03:00
wr *bytes.Buffer, msg *server.Message, key string, output outputT, precision uint64, globPattern string, limit uint64, wheres []whereT, nofields bool,
2016-03-05 02:08:16 +03:00
) (
*scanWriter, error,
) {
if limit == 0 {
limit = limitItems
} else if limit > capLimit {
limit = capLimit
}
switch output {
default:
return nil, errors.New("invalid output type")
case outputIDs, outputObjects, outputCount, outputBounds, outputPoints, outputHashes:
}
sw := &scanWriter{
2016-05-23 23:01:42 +03:00
c: c,
2016-03-05 02:08:16 +03:00
wr: wr,
2016-03-29 00:16:21 +03:00
msg: msg,
2016-03-05 02:08:16 +03:00
output: output,
wheres: wheres,
precision: precision,
nofields: nofields,
2016-07-12 22:18:16 +03:00
glob: globPattern,
2016-03-05 02:08:16 +03:00
limit: limit,
}
2016-07-12 22:18:16 +03:00
if globPattern == "*" || globPattern == "" {
2016-03-05 02:08:16 +03:00
sw.globEverything = true
} else {
2016-07-12 22:18:16 +03:00
if !glob.IsGlob(globPattern) {
2016-03-05 02:08:16 +03:00
sw.globSingle = true
}
}
sw.col = c.getCol(key)
if sw.col != nil {
sw.fmap = sw.col.FieldMap()
sw.farr = sw.col.FieldArr()
}
sw.fvals = make([]float64, len(sw.farr))
return sw, nil
}
func (sw *scanWriter) hasFieldsOutput() bool {
switch sw.output {
default:
return false
case outputObjects, outputPoints, outputHashes, outputBounds:
return !sw.nofields
}
}
func (sw *scanWriter) writeHead() {
2016-04-02 17:20:30 +03:00
sw.mu.Lock()
defer sw.mu.Unlock()
2016-03-29 00:16:21 +03:00
switch sw.msg.OutputType {
case server.JSON:
if len(sw.farr) > 0 && sw.hasFieldsOutput() {
sw.wr.WriteString(`,"fields":[`)
for i, field := range sw.farr {
if i > 0 {
sw.wr.WriteByte(',')
}
sw.wr.WriteString(jsonString(field))
2016-03-05 02:08:16 +03:00
}
2016-03-29 00:16:21 +03:00
sw.wr.WriteByte(']')
2016-03-05 02:08:16 +03:00
}
2016-03-29 00:16:21 +03:00
switch sw.output {
case outputIDs:
sw.wr.WriteString(`,"ids":[`)
case outputObjects:
sw.wr.WriteString(`,"objects":[`)
case outputPoints:
sw.wr.WriteString(`,"points":[`)
case outputBounds:
sw.wr.WriteString(`,"bounds":[`)
case outputHashes:
sw.wr.WriteString(`,"hashes":[`)
case outputCount:
2016-03-05 02:08:16 +03:00
2016-03-29 00:16:21 +03:00
}
case server.RESP:
2016-03-05 02:08:16 +03:00
}
}
func (sw *scanWriter) writeFoot(cursor uint64) {
2016-04-02 17:20:30 +03:00
sw.mu.Lock()
defer sw.mu.Unlock()
2016-03-05 02:08:16 +03:00
if !sw.hitLimit {
cursor = 0
}
2016-03-29 00:16:21 +03:00
switch sw.msg.OutputType {
case server.JSON:
switch sw.output {
default:
sw.wr.WriteByte(']')
case outputCount:
2016-03-05 02:08:16 +03:00
2016-03-29 00:16:21 +03:00
}
sw.wr.WriteString(`,"count":` + strconv.FormatUint(sw.count, 10))
sw.wr.WriteString(`,"cursor":` + strconv.FormatUint(cursor, 10))
case server.RESP:
sw.wr.Reset()
2016-05-24 00:21:18 +03:00
var data []byte
var err error
if sw.output == outputCount {
data, err = resp.IntegerValue(int(sw.count)).MarshalRESP()
} else {
values := []resp.Value{
resp.IntegerValue(int(cursor)),
resp.ArrayValue(sw.values),
}
data, err = resp.ArrayValue(values).MarshalRESP()
2016-03-29 00:16:21 +03:00
}
if err != nil {
panic("Eek this is bad. Marshal resp should not fail.")
}
sw.wr.Write(data)
2016-03-05 02:08:16 +03:00
}
}
func (sw *scanWriter) fieldMatch(fields []float64, o geojson.Object) ([]float64, bool) {
var z float64
var gotz bool
if !sw.hasFieldsOutput() || sw.fullFields {
for _, where := range sw.wheres {
if where.field == "z" {
if !gotz {
z = o.CalculatedPoint().Z
}
if !where.match(z) {
return sw.fvals, false
}
continue
}
var value float64
idx, ok := sw.fmap[where.field]
if ok {
if len(fields) > idx {
value = fields[idx]
}
}
if !where.match(value) {
return sw.fvals, false
}
}
} else {
for idx := range sw.farr {
var value float64
if len(fields) > idx {
value = fields[idx]
}
sw.fvals[idx] = value
}
for _, where := range sw.wheres {
if where.field == "z" {
if !gotz {
z = o.CalculatedPoint().Z
}
if !where.match(z) {
return sw.fvals, false
}
continue
}
var value float64
idx, ok := sw.fmap[where.field]
if ok {
value = sw.fvals[idx]
}
if !where.match(value) {
return sw.fvals, false
}
}
}
return sw.fvals, true
}
2016-04-03 00:13:20 +03:00
func (sw *scanWriter) writeObject(id string, o geojson.Object, fields []float64, noLock bool) bool {
if !noLock {
sw.mu.Lock()
defer sw.mu.Unlock()
}
2016-03-05 02:08:16 +03:00
keepGoing := true
if !sw.globEverything {
if sw.globSingle {
if sw.glob != id {
return true
}
keepGoing = false // return current object and stop iterating
} else {
2016-07-12 22:18:16 +03:00
ok, _ := glob.Match(sw.glob, id)
2016-03-05 02:08:16 +03:00
if !ok {
return true
}
}
}
nfields, ok := sw.fieldMatch(fields, o)
if !ok {
return true
}
sw.count++
if sw.output == outputCount {
return true
}
2016-03-29 00:16:21 +03:00
switch sw.msg.OutputType {
case server.JSON:
var wr bytes.Buffer
var jsfields string
if sw.once {
wr.WriteByte(',')
} else {
sw.once = true
}
if sw.hasFieldsOutput() {
if sw.fullFields {
if len(sw.fmap) > 0 {
jsfields = `,"fields":{`
var i int
for field, idx := range sw.fmap {
if len(fields) > idx {
2016-03-30 19:32:38 +03:00
if fields[idx] != 0 {
2016-03-29 00:16:21 +03:00
if i > 0 {
jsfields += `,`
}
jsfields += jsonString(field) + ":" + strconv.FormatFloat(fields[idx], 'f', -1, 64)
i++
2016-03-05 02:08:16 +03:00
}
}
}
2016-03-29 00:16:21 +03:00
jsfields += `}`
2016-03-05 02:08:16 +03:00
}
2016-03-29 00:16:21 +03:00
} else if len(sw.farr) > 0 {
jsfields = `,"fields":[`
for i, field := range nfields {
if i > 0 {
jsfields += ","
}
jsfields += strconv.FormatFloat(field, 'f', -1, 64)
}
jsfields += `]`
2016-03-05 02:08:16 +03:00
}
2016-03-29 00:16:21 +03:00
}
if sw.output == outputIDs {
wr.WriteString(jsonString(id))
} else {
wr.WriteString(`{"id":` + jsonString(id))
switch sw.output {
case outputObjects:
wr.WriteString(`,"object":` + o.JSON())
case outputPoints:
wr.WriteString(`,"point":` + o.CalculatedPoint().ExternalJSON())
case outputHashes:
p, err := o.Geohash(int(sw.precision))
if err != nil {
p = ""
2016-03-05 02:08:16 +03:00
}
2016-03-29 00:16:21 +03:00
wr.WriteString(`,"hash":"` + p + `"`)
case outputBounds:
wr.WriteString(`,"bounds":` + o.CalculatedBBox().ExternalJSON())
2016-03-05 02:08:16 +03:00
}
2016-03-29 00:16:21 +03:00
wr.WriteString(jsfields)
wr.WriteString(`}`)
2016-03-05 02:08:16 +03:00
}
2016-03-29 00:16:21 +03:00
sw.wr.Write(wr.Bytes())
case server.RESP:
vals := make([]resp.Value, 1, 3)
vals[0] = resp.StringValue(id)
if sw.output == outputIDs {
sw.values = append(sw.values, vals[0])
} else {
switch sw.output {
case outputObjects:
2016-07-10 05:44:28 +03:00
vals = append(vals, resp.StringValue(o.String()))
2016-03-29 00:16:21 +03:00
case outputPoints:
point := o.CalculatedPoint()
if point.Z != 0 {
vals = append(vals, resp.ArrayValue([]resp.Value{
resp.FloatValue(point.Y),
resp.FloatValue(point.X),
resp.FloatValue(point.Z),
}))
} else {
vals = append(vals, resp.ArrayValue([]resp.Value{
resp.FloatValue(point.Y),
resp.FloatValue(point.X),
}))
}
case outputHashes:
p, err := o.Geohash(int(sw.precision))
if err != nil {
p = ""
}
vals = append(vals, resp.StringValue(p))
case outputBounds:
bbox := o.CalculatedBBox()
vals = append(vals, resp.ArrayValue([]resp.Value{
resp.ArrayValue([]resp.Value{
resp.FloatValue(bbox.Min.Y),
resp.FloatValue(bbox.Min.X),
}),
resp.ArrayValue([]resp.Value{
resp.FloatValue(bbox.Max.Y),
resp.FloatValue(bbox.Max.X),
}),
}))
2016-03-05 02:08:16 +03:00
}
2016-03-29 00:16:21 +03:00
fvs := orderFields(sw.fmap, fields)
if len(fvs) > 0 {
fvals := make([]resp.Value, 0, len(fvs)*2)
for i, fv := range fvs {
fvals = append(fvals, resp.StringValue(fv.field), resp.StringValue(strconv.FormatFloat(fv.value, 'f', -1, 64)))
i++
}
vals = append(vals, resp.ArrayValue(fvals))
}
sw.values = append(sw.values, resp.ArrayValue(vals))
2016-03-05 02:08:16 +03:00
}
}
sw.numberItems++
if sw.numberItems == sw.limit {
sw.hitLimit = true
return false
}
return keepGoing
}