tile38/internal/server/crud.go

1039 lines
21 KiB
Go

package server
import (
"bytes"
"math"
"strconv"
"strings"
"time"
"github.com/mmcloughlin/geohash"
"github.com/tidwall/geojson"
"github.com/tidwall/geojson/geometry"
"github.com/tidwall/resp"
"github.com/tidwall/tile38/internal/collection"
"github.com/tidwall/tile38/internal/field"
"github.com/tidwall/tile38/internal/glob"
"github.com/tidwall/tile38/internal/object"
)
// BOUNDS key
func (s *Server) cmdBOUNDS(msg *Message) (resp.Value, error) {
start := time.Now()
// >> Args
args := msg.Args
if len(args) != 2 {
return retrerr(errInvalidNumberOfArguments)
}
key := args[1]
// >> Operation
col, _ := s.cols.Get(key)
if col == nil {
if msg.OutputType == RESP {
return resp.NullValue(), nil
}
return retrerr(errKeyNotFound)
}
// >> Response
vals := make([]resp.Value, 0, 2)
var buf bytes.Buffer
if msg.OutputType == JSON {
buf.WriteString(`{"ok":true`)
}
minX, minY, maxX, maxY := col.Bounds()
bbox := geojson.NewRect(geometry.Rect{
Min: geometry.Point{X: minX, Y: minY},
Max: geometry.Point{X: maxX, Y: maxY},
})
if msg.OutputType == JSON {
buf.WriteString(`,"bounds":`)
buf.WriteString(string(bbox.AppendJSON(nil)))
buf.WriteString(`,"elapsed":"` + time.Since(start).String() + "\"}")
return resp.StringValue(buf.String()), nil
}
// RESP
vals = append(vals, resp.ArrayValue([]resp.Value{
resp.ArrayValue([]resp.Value{
resp.FloatValue(minX),
resp.FloatValue(minY),
}),
resp.ArrayValue([]resp.Value{
resp.FloatValue(maxX),
resp.FloatValue(maxY),
}),
}))
return vals[0], nil
}
// TYPE key
// undocumented return "none" or "hash"
func (s *Server) cmdTYPE(msg *Message) (resp.Value, error) {
start := time.Now()
// >> Args
args := msg.Args
if len(args) != 2 {
return retrerr(errInvalidNumberOfArguments)
}
key := args[1]
// >> Operation
col, _ := s.cols.Get(key)
if col == nil {
if msg.OutputType == RESP {
return resp.SimpleStringValue("none"), nil
}
return retrerr(errKeyNotFound)
}
// >> Response
typ := "hash"
if msg.OutputType == JSON {
return resp.StringValue(`{"ok":true,"type":` + jsonString(typ) +
`,"elapsed":"` + time.Since(start).String() + "\"}"), nil
}
return resp.SimpleStringValue(typ), nil
}
// GET key id [WITHFIELDS] [OBJECT|POINT|BOUNDS|(HASH geohash)]
func (s *Server) cmdGET(msg *Message) (resp.Value, error) {
start := time.Now()
// >> Args
args := msg.Args
if len(args) < 3 {
return retrerr(errInvalidNumberOfArguments)
}
key, id := args[1], args[2]
withfields := false
kind := "object"
var precision int64
for i := 3; i < len(args); i++ {
switch strings.ToLower(args[i]) {
case "withfields":
withfields = true
case "object":
kind = "object"
case "point":
kind = "point"
case "bounds":
kind = "bounds"
case "hash":
kind = "hash"
i++
if i == len(args) {
return retrerr(errInvalidNumberOfArguments)
}
var err error
precision, err = strconv.ParseInt(args[i], 10, 64)
if err != nil || precision < 1 || precision > 12 {
return retrerr(errInvalidArgument(args[i]))
}
default:
return retrerr(errInvalidNumberOfArguments)
}
}
// >> Operation
col, _ := s.cols.Get(key)
if col == nil {
if msg.OutputType == RESP {
return resp.NullValue(), nil
}
return retrerr(errKeyNotFound)
}
o := col.Get(id)
if o == nil {
if msg.OutputType == RESP {
return resp.NullValue(), nil
}
return retrerr(errIDNotFound)
}
// >> Response
vals := make([]resp.Value, 0, 2)
var buf bytes.Buffer
if msg.OutputType == JSON {
buf.WriteString(`{"ok":true`)
}
switch kind {
case "object":
if msg.OutputType == JSON {
buf.WriteString(`,"object":`)
buf.WriteString(string(o.Geo().AppendJSON(nil)))
} else {
vals = append(vals, resp.StringValue(o.Geo().String()))
}
case "point":
if msg.OutputType == JSON {
buf.WriteString(`,"point":`)
buf.Write(appendJSONSimplePoint(nil, o.Geo()))
} else {
point := o.Geo().Center()
z := extractZCoordinate(o.Geo())
if z != 0 {
vals = append(vals, resp.ArrayValue([]resp.Value{
resp.StringValue(strconv.FormatFloat(point.Y, 'f', -1, 64)),
resp.StringValue(strconv.FormatFloat(point.X, 'f', -1, 64)),
resp.StringValue(strconv.FormatFloat(z, 'f', -1, 64)),
}))
} else {
vals = append(vals, resp.ArrayValue([]resp.Value{
resp.StringValue(strconv.FormatFloat(point.Y, 'f', -1, 64)),
resp.StringValue(strconv.FormatFloat(point.X, 'f', -1, 64)),
}))
}
}
case "hash":
if msg.OutputType == JSON {
buf.WriteString(`,"hash":`)
}
center := o.Geo().Center()
p := geohash.EncodeWithPrecision(center.Y, center.X, uint(precision))
if msg.OutputType == JSON {
buf.WriteString(`"` + p + `"`)
} else {
vals = append(vals, resp.StringValue(p))
}
case "bounds":
if msg.OutputType == JSON {
buf.WriteString(`,"bounds":`)
buf.Write(appendJSONSimpleBounds(nil, o.Geo()))
} else {
bbox := o.Rect()
vals = append(vals, resp.ArrayValue([]resp.Value{
resp.ArrayValue([]resp.Value{
resp.FloatValue(bbox.Min.Y),
resp.FloatValue(bbox.Min.X),
}),
resp.ArrayValue([]resp.Value{
resp.FloatValue(bbox.Max.Y),
resp.FloatValue(bbox.Max.X),
}),
}))
}
}
if withfields {
nfields := o.Fields().Len()
if nfields > 0 {
fvals := make([]resp.Value, 0, nfields*2)
if msg.OutputType == JSON {
buf.WriteString(`,"fields":{`)
}
var i int
o.Fields().Scan(func(f field.Field) bool {
if msg.OutputType == JSON {
if i > 0 {
buf.WriteString(`,`)
}
buf.WriteString(jsonString(f.Name()) + ":" +
f.Value().JSON())
} else {
fvals = append(fvals, resp.StringValue(f.Name()),
resp.StringValue(f.Value().Data()))
}
i++
return true
})
if msg.OutputType == JSON {
buf.WriteString(`}`)
} else {
vals = append(vals, resp.ArrayValue(fvals))
}
}
}
if msg.OutputType == JSON {
buf.WriteString(`,"elapsed":"` + time.Since(start).String() + "\"}")
return resp.StringValue(buf.String()), nil
}
var oval resp.Value
if withfields {
oval = resp.ArrayValue(vals)
} else {
oval = vals[0]
}
return oval, nil
}
// DEL key id [ERRON404]
func (s *Server) cmdDEL(msg *Message) (resp.Value, commandDetails, error) {
start := time.Now()
// >> Args
args := msg.Args
if len(args) < 3 {
return retwerr(errInvalidNumberOfArguments)
}
key := args[1]
id := args[2]
erron404 := false
for i := 3; i < len(args); i++ {
switch strings.ToLower(args[i]) {
case "erron404":
erron404 = true
default:
return retwerr(errInvalidArgument(args[i]))
}
}
// >> Operation
updated := false
var old *object.Object
col, _ := s.cols.Get(key)
if col != nil {
old = col.Delete(id)
if old != nil {
if col.Count() == 0 {
s.cols.Delete(key)
}
updated = true
} else if erron404 {
return retwerr(errIDNotFound)
}
} else if erron404 {
return retwerr(errKeyNotFound)
}
s.groupDisconnectObject(key, id)
// >> Response
var d commandDetails
d.command = "del"
d.key = key
d.obj = old
d.updated = updated
d.timestamp = time.Now()
var res resp.Value
switch msg.OutputType {
case JSON:
res = resp.StringValue(`{"ok":true,"elapsed":"` +
time.Since(start).String() + "\"}")
case RESP:
if d.updated {
res = resp.IntegerValue(1)
} else {
res = resp.IntegerValue(0)
}
}
return res, d, nil
}
// PDEL key pattern
func (s *Server) cmdPDEL(msg *Message) (resp.Value, commandDetails, error) {
start := time.Now()
// >> Args
args := msg.Args
if len(args) != 3 {
return retwerr(errInvalidNumberOfArguments)
}
key := args[1]
pattern := args[2]
// >> Operation
now := time.Now()
var children []*commandDetails
col, _ := s.cols.Get(key)
if col != nil {
g := glob.Parse(pattern, false)
var ids []string
iter := func(o *object.Object) bool {
if match, _ := glob.Match(pattern, o.ID()); match {
ids = append(ids, o.ID())
}
return true
}
if g.Limits[0] == "" && g.Limits[1] == "" {
col.Scan(false, nil, msg.Deadline, iter)
} else {
col.ScanRange(g.Limits[0], g.Limits[1],
false, nil, msg.Deadline, iter)
}
for _, id := range ids {
obj := col.Delete(id)
children = append(children, &commandDetails{
command: "del",
updated: true,
timestamp: now,
key: key,
obj: obj,
})
s.groupDisconnectObject(key, id)
}
if col.Count() == 0 {
s.cols.Delete(key)
}
}
// >> Response
var d commandDetails
var res resp.Value
d.command = "pdel"
d.children = children
d.key = key
d.pattern = pattern
d.updated = len(d.children) > 0
d.timestamp = now
d.parent = true
switch msg.OutputType {
case JSON:
res = resp.StringValue(`{"ok":true,"elapsed":"` +
time.Since(start).String() + "\"}")
case RESP:
res = resp.IntegerValue(len(d.children))
}
return res, d, nil
}
// DROP key
func (s *Server) cmdDROP(msg *Message) (resp.Value, commandDetails, error) {
start := time.Now()
// >> Args
args := msg.Args
if len(args) != 2 {
return retwerr(errInvalidNumberOfArguments)
}
key := args[1]
// >> Operation
col, _ := s.cols.Get(key)
if col != nil {
s.cols.Delete(key)
}
s.groupDisconnectCollection(key)
// >> Response
var res resp.Value
var d commandDetails
d.key = key
d.updated = col != nil
d.command = "drop"
d.timestamp = time.Now()
switch msg.OutputType {
case JSON:
res = resp.StringValue(`{"ok":true,"elapsed":"` +
time.Since(start).String() + "\"}")
case RESP:
if d.updated {
res = resp.IntegerValue(1)
} else {
res = resp.IntegerValue(0)
}
}
return res, d, nil
}
// RENAME key newkey
// RENAMENX key newkey
func (s *Server) cmdRENAME(msg *Message) (resp.Value, commandDetails, error) {
start := time.Now()
// >> Args
args := msg.Args
if len(args) != 3 {
return retwerr(errInvalidNumberOfArguments)
}
nx := strings.ToLower(args[0]) == "renamenx"
key := args[1]
newKey := args[2]
// >> Operation
col, _ := s.cols.Get(key)
if col == nil {
return retwerr(errKeyNotFound)
}
var ierr error
s.hooks.Ascend(nil, func(v interface{}) bool {
h := v.(*Hook)
if h.Key == key || h.Key == newKey {
ierr = errKeyHasHooksSet
return false
}
return true
})
if ierr != nil {
return retwerr(ierr)
}
var updated bool
newCol, _ := s.cols.Get(newKey)
if newCol == nil {
updated = true
} else if !nx {
s.cols.Delete(newKey)
updated = true
}
if updated {
s.cols.Delete(key)
s.cols.Set(newKey, col)
}
// >> Response
var d commandDetails
var res resp.Value
d.command = "rename"
d.key = key
d.newKey = newKey
d.updated = updated
d.timestamp = time.Now()
switch msg.OutputType {
case JSON:
res = resp.StringValue(`{"ok":true,"elapsed":"` +
time.Since(start).String() + "\"}")
case RESP:
if !nx {
res = resp.SimpleStringValue("OK")
} else if d.updated {
res = resp.IntegerValue(1)
} else {
res = resp.IntegerValue(0)
}
}
return res, d, nil
}
// FLUSHDB
func (s *Server) cmdFLUSHDB(msg *Message) (resp.Value, commandDetails, error) {
start := time.Now()
// >> Args
args := msg.Args
if len(args) != 1 {
return retwerr(errInvalidNumberOfArguments)
}
// >> Operation
// clear the entire database
s.cols.Clear()
s.groupHooks.Clear()
s.groupObjects.Clear()
s.hookExpires.Clear()
s.hooks.Clear()
s.hooksOut.Clear()
s.hookTree.Clear()
s.hookCross.Clear()
// >> Response
var d commandDetails
d.command = "flushdb"
d.updated = true
d.timestamp = time.Now()
var res resp.Value
if msg.OutputType == JSON {
res = resp.StringValue(`{"ok":true,"elapsed":"` +
time.Since(start).String() + "\"}")
} else {
res = resp.SimpleStringValue("OK")
}
return res, d, nil
}
// SET key id [FIELD name value ...] [EX seconds] [NX|XX]
// (OBJECT geojson)|(POINT lat lon z)|(BOUNDS minlat minlon maxlat maxlon)|
// (HASH geohash)|(STRING value)
func (s *Server) cmdSET(msg *Message) (resp.Value, commandDetails, error) {
start := time.Now()
if s.config.maxMemory() > 0 && s.outOfMemory.on() {
return retwerr(errOOM)
}
// >> Args
var key string
var id string
var fields []field.Field
var ex int64
var xx bool
var nx bool
var oobj geojson.Object
args := msg.Args
if len(args) < 3 {
return retwerr(errInvalidNumberOfArguments)
}
key, id = args[1], args[2]
for i := 3; i < len(args); i++ {
switch strings.ToLower(args[i]) {
case "field":
if i+2 >= len(args) {
return retwerr(errInvalidNumberOfArguments)
}
fkey := strings.ToLower(args[i+1])
fval := args[i+2]
i += 2
if isReservedFieldName(fkey) {
return retwerr(errInvalidArgument(fkey))
}
fields = append(fields, field.Make(fkey, fval))
case "ex":
if i+1 >= len(args) {
return retwerr(errInvalidNumberOfArguments)
}
exval := args[i+1]
i += 1
x, err := strconv.ParseFloat(exval, 64)
if err != nil {
return retwerr(errInvalidArgument(exval))
}
ex = time.Now().UnixNano() + int64(float64(time.Second)*x)
case "nx":
if xx {
return retwerr(errInvalidArgument(args[i]))
}
nx = true
case "xx":
if nx {
return retwerr(errInvalidArgument(args[i]))
}
xx = true
case "string":
if i+1 >= len(args) {
return retwerr(errInvalidNumberOfArguments)
}
str := args[i+1]
i += 1
oobj = collection.String(str)
case "point":
if i+2 >= len(args) {
return retwerr(errInvalidNumberOfArguments)
}
slat := args[i+1]
slon := args[i+2]
i += 2
var z float64
var hasZ bool
if i+1 < len(args) {
// probe for possible z coordinate
var err error
z, err = strconv.ParseFloat(args[i+1], 64)
if err == nil {
hasZ = true
i++
}
}
y, err := strconv.ParseFloat(slat, 64)
if err != nil {
return retwerr(errInvalidArgument(slat))
}
x, err := strconv.ParseFloat(slon, 64)
if err != nil {
return retwerr(errInvalidArgument(slon))
}
if !hasZ {
oobj = geojson.NewPoint(geometry.Point{X: x, Y: y})
} else {
oobj = geojson.NewPointZ(geometry.Point{X: x, Y: y}, z)
}
case "bounds":
if i+4 >= len(args) {
return retwerr(errInvalidNumberOfArguments)
}
var vals [4]float64
for j := 0; j < 4; j++ {
var err error
vals[j], err = strconv.ParseFloat(args[i+1+j], 64)
if err != nil {
return retwerr(errInvalidArgument(args[i+1+j]))
}
}
i += 4
oobj = geojson.NewRect(geometry.Rect{
Min: geometry.Point{X: vals[1], Y: vals[0]},
Max: geometry.Point{X: vals[3], Y: vals[2]},
})
case "hash":
if i+1 >= len(args) {
return retwerr(errInvalidNumberOfArguments)
}
shash := args[i+1]
i += 1
lat, lon := geohash.Decode(shash)
oobj = geojson.NewPoint(geometry.Point{X: lon, Y: lat})
case "object":
if i+1 >= len(args) {
return retwerr(errInvalidNumberOfArguments)
}
json := args[i+1]
i += 1
var err error
oobj, err = geojson.Parse(json, &s.geomParseOpts)
if err != nil {
return retwerr(err)
}
default:
return retwerr(errInvalidArgument(args[i]))
}
}
if oobj == nil {
return retwerr(errInvalidNumberOfArguments)
}
// >> Operation
nada := func() (resp.Value, commandDetails, error) {
// exclude operation due to 'xx' or 'nx' match
if msg.OutputType == JSON {
if nx {
return retwerr(errIDAlreadyExists)
} else {
return retwerr(errIDNotFound)
}
}
return resp.NullValue(), commandDetails{}, nil
}
col, ok := s.cols.Get(key)
if !ok {
if xx {
return nada()
}
col = collection.New()
s.cols.Set(key, col)
}
if xx || nx {
if col.Get(id) == nil {
if xx {
return nada()
}
} else {
if nx {
return nada()
}
}
}
obj := object.New(id, oobj, ex, field.MakeList(fields))
old, obj := col.SetMerged(obj)
// >> Response
var d commandDetails
d.command = "set"
d.key = key
d.obj = obj
d.old = old
d.updated = true // perhaps we should do a diff on the previous object?
d.timestamp = time.Now()
var res resp.Value
switch msg.OutputType {
default:
case JSON:
res = resp.StringValue(`{"ok":true,"elapsed":"` +
time.Since(start).String() + "\"}")
case RESP:
res = resp.SimpleStringValue("OK")
}
return res, d, nil
}
func retwerr(err error) (resp.Value, commandDetails, error) {
return resp.Value{}, commandDetails{}, err
}
func retrerr(err error) (resp.Value, error) {
return resp.Value{}, err
}
// FSET key id [XX] field value [field value...]
func (s *Server) cmdFSET(msg *Message) (resp.Value, commandDetails, error) {
start := time.Now()
if s.config.maxMemory() > 0 && s.outOfMemory.on() {
return retwerr(errOOM)
}
// >> Args
var id string
var key string
var xx bool
var fields []field.Field // raw fields
args := msg.Args
if len(args) < 5 {
return retwerr(errInvalidNumberOfArguments)
}
key, id = args[1], args[2]
for i := 3; i < len(args); i++ {
arg := strings.ToLower(args[i])
switch arg {
case "xx":
xx = true
default:
fkey := arg
i++
if i == len(args) {
return retwerr(errInvalidNumberOfArguments)
}
if isReservedFieldName(fkey) {
return retwerr(errInvalidArgument(fkey))
}
fval := args[i]
fields = append(fields, field.Make(fkey, fval))
}
}
// >> Operation
var d commandDetails
var updateCount int
col, ok := s.cols.Get(key)
if !ok {
return retwerr(errKeyNotFound)
}
o := col.Get(id)
ok = o != nil
if !(ok || xx) {
return retwerr(errIDNotFound)
}
if ok {
ofields := o.Fields()
for _, f := range fields {
prev := ofields.Get(f.Name())
if !prev.Value().Equals(f.Value()) {
ofields = ofields.Set(f)
updateCount++
}
}
obj := object.New(id, o.Geo(), o.Expires(), ofields)
col.Set(obj)
d.command = "fset"
d.key = key
d.obj = obj
d.timestamp = time.Now()
d.updated = updateCount > 0
}
// >> Response
var res resp.Value
switch msg.OutputType {
case JSON:
res = resp.StringValue(`{"ok":true,"elapsed":"` +
time.Since(start).String() + "\"}")
case RESP:
res = resp.IntegerValue(updateCount)
}
return res, d, nil
}
// EXPIRE key id seconds
func (s *Server) cmdEXPIRE(msg *Message) (resp.Value, commandDetails, error) {
start := time.Now()
// >> Args
args := msg.Args
if len(args) != 4 {
return retwerr(errInvalidNumberOfArguments)
}
key, id, svalue := args[1], args[2], args[3]
value, err := strconv.ParseFloat(svalue, 64)
if err != nil {
return retwerr(errInvalidArgument(svalue))
}
// >> Operation
var ok bool
var obj *object.Object
col, _ := s.cols.Get(key)
if col != nil {
// replace the expiration by getting the old object
ex := time.Now().Add(
time.Duration(float64(time.Second) * value)).UnixNano()
o := col.Get(id)
ok = o != nil
if ok {
obj = object.New(id, o.Geo(), ex, o.Fields())
col.Set(obj)
}
}
// >> Response
var d commandDetails
if ok {
d.key = key
d.obj = obj
d.command = "expire"
d.updated = true
d.timestamp = time.Now()
}
var res resp.Value
switch msg.OutputType {
case JSON:
if ok {
res = resp.StringValue(`{"ok":true,"elapsed":"` +
time.Since(start).String() + "\"}")
} else if col == nil {
return retwerr(errKeyNotFound)
} else {
return retwerr(errIDNotFound)
}
case RESP:
if ok {
res = resp.IntegerValue(1)
} else {
res = resp.IntegerValue(0)
}
}
return res, d, nil
}
// PERSIST key id
func (s *Server) cmdPERSIST(msg *Message) (resp.Value, commandDetails, error) {
start := time.Now()
// >> Args
args := msg.Args
if len(args) != 3 {
return retwerr(errInvalidNumberOfArguments)
}
key, id := args[1], args[2]
// >> Operation
col, _ := s.cols.Get(key)
if col == nil {
if msg.OutputType == RESP {
return resp.IntegerValue(0), commandDetails{}, nil
}
return retwerr(errKeyNotFound)
}
o := col.Get(id)
if o == nil {
if msg.OutputType == RESP {
return resp.IntegerValue(0), commandDetails{}, nil
}
return retwerr(errIDNotFound)
}
var obj *object.Object
var cleared bool
if o.Expires() != 0 {
obj = object.New(id, o.Geo(), 0, o.Fields())
col.Set(obj)
cleared = true
}
// >> Response
var res resp.Value
var d commandDetails
d.command = "persist"
d.key = key
d.obj = obj
d.updated = cleared
d.timestamp = time.Now()
switch msg.OutputType {
case JSON:
res = resp.SimpleStringValue(`{"ok":true,"elapsed":"` +
time.Since(start).String() + "\"}")
case RESP:
if cleared {
res = resp.IntegerValue(1)
} else {
res = resp.IntegerValue(0)
}
}
return res, d, nil
}
// TTL key id
func (s *Server) cmdTTL(msg *Message) (resp.Value, error) {
start := time.Now()
// >> Args
args := msg.Args
if len(args) != 3 {
return retrerr(errInvalidNumberOfArguments)
}
key, id := args[1], args[2]
// >> Operation
col, _ := s.cols.Get(key)
if col == nil {
if msg.OutputType == JSON {
return retrerr(errKeyNotFound)
}
return resp.IntegerValue(-2), nil
}
o := col.Get(id)
if o == nil {
if msg.OutputType == JSON {
return retrerr(errIDNotFound)
}
return resp.IntegerValue(-2), nil
}
var ttl float64
if o.Expires() == 0 {
ttl = -1
} else {
now := start.UnixNano()
ttl = math.Max(float64(o.Expires()-now)/float64(time.Second), 0)
}
// >> Response
if msg.OutputType == JSON {
return resp.SimpleStringValue(
`{"ok":true,"ttl":` + strconv.Itoa(int(ttl)) + `,"elapsed":"` +
time.Since(start).String() + "\"}"), nil
}
return resp.IntegerValue(int(ttl)), nil
}