isolated fence method

This commit is contained in:
Josh Baker 2016-03-19 07:16:19 -07:00
parent 865bbf7d19
commit bbb29b9349
8 changed files with 427 additions and 142 deletions

View File

@ -32,6 +32,14 @@ type AOFReader struct {
p int // pointer p int // pointer
} }
type errAOFHook struct {
err error
}
func (err errAOFHook) Error() string {
return fmt.Sprintf("hook: %v", err.err)
}
func (rd *AOFReader) ReadCommand() ([]byte, error) { func (rd *AOFReader) ReadCommand() ([]byte, error) {
if rd.l >= 4 { if rd.l >= 4 {
sz1 := int(binary.LittleEndian.Uint32(rd.buf[rd.p:])) sz1 := int(binary.LittleEndian.Uint32(rd.buf[rd.p:]))
@ -123,7 +131,16 @@ func (c *Controller) loadAOF() error {
} }
return err return err
} }
empty := true
for i := 0; i < len(buf); i++ {
if buf[i] != 0 {
empty = false
break
}
}
if empty {
return nil
}
if _, _, err := c.command(string(buf), nil); err != nil { if _, _, err := c.command(string(buf), nil); err != nil {
return err return err
} }
@ -145,6 +162,33 @@ func (c *Controller) writeAOF(line string, d *commandDetailsT) error {
if err != nil { if err != nil {
return err return err
} }
if d != nil {
// Process hooks
if hm, ok := c.hookcols[d.key]; ok {
for _, hook := range hm {
if err := c.DoHook(hook, d); err != nil {
// There was an error when processing the hook.
// This is a bad thing because we have already written the data to disk.
// But sinced the Controller mutex is currently locked, we have an opportunity
// to revert the written data on disk and return an error code back to the client.
// Not sure if this is the best route, but it's all we have at the moment.
// Instead of truncating the file, which is an expensive and more fault possible
// solution, we will simple overwrite the previous command with Zeros.
blank := make([]byte, len([]byte(line)))
// Seek to origin of the faulty command.
if _, err := c.f.Seek(int64(c.aofsz), 0); err != nil {
return err // really bad
}
if _, err := writeCommand(c.f, blank); err != nil {
return err // really bad
}
return errAOFHook{err}
}
}
}
}
c.aofsz += n c.aofsz += n
// notify aof live connections that we have new data // notify aof live connections that we have new data
@ -152,13 +196,14 @@ func (c *Controller) writeAOF(line string, d *commandDetailsT) error {
c.fcond.Broadcast() c.fcond.Broadcast()
c.fcond.L.Unlock() c.fcond.L.Unlock()
// write to live connection streams
if d != nil { if d != nil {
// write to live connection streams
c.lcond.L.Lock() c.lcond.L.Lock()
c.lstack = append(c.lstack, d) c.lstack = append(c.lstack, d)
c.lcond.Broadcast() c.lcond.Broadcast()
c.lcond.L.Unlock() c.lcond.L.Unlock()
} }
return nil return nil
} }

View File

@ -16,6 +16,9 @@ import (
// checksum performs a simple md5 checksum on the aof file // checksum performs a simple md5 checksum on the aof file
func (c *Controller) checksum(pos, size int64) (sum string, err error) { func (c *Controller) checksum(pos, size int64) (sum string, err error) {
if pos+size > int64(c.aofsz) {
return "", io.EOF
}
var f *os.File var f *os.File
f, err = os.Open(c.f.Name()) f, err = os.Open(c.f.Name())
if err != nil { if err != nil {
@ -25,7 +28,7 @@ func (c *Controller) checksum(pos, size int64) (sum string, err error) {
data := make([]byte, size) data := make([]byte, size)
err = func() error { err = func() error {
if size == 0 { if size == 0 {
n, err := f.Seek(0, 2) n, err := f.Seek(int64(c.aofsz), 0)
if err != nil { if err != nil {
return err return err
} }

View File

@ -60,6 +60,8 @@ type Controller struct {
lcond *sync.Cond lcond *sync.Cond
fcup bool // follow caught up fcup bool // follow caught up
shrinking bool // aof shrinking flag shrinking bool // aof shrinking flag
hooks map[string]*Hook // hook name
hookcols map[string]map[string]*Hook // col key
} }
// ListenAndServe starts a new tile38 server // ListenAndServe starts a new tile38 server
@ -75,6 +77,8 @@ func ListenAndServe(host string, port int, dir string) error {
fcond: sync.NewCond(&sync.Mutex{}), fcond: sync.NewCond(&sync.Mutex{}),
lives: make(map[*liveBuffer]bool), lives: make(map[*liveBuffer]bool),
lcond: sync.NewCond(&sync.Mutex{}), lcond: sync.NewCond(&sync.Mutex{}),
hooks: make(map[string]*Hook),
hookcols: make(map[string]map[string]*Hook),
} }
if err := os.MkdirAll(dir, 0700); err != nil { if err := os.MkdirAll(dir, 0700); err != nil {
return err return err
@ -210,7 +214,7 @@ func (c *Controller) handleInputCommand(conn *server.Conn, line string, w io.Wri
default: default:
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
case "set", "del", "drop", "fset", "flushdb": case "set", "del", "drop", "fset", "flushdb", "addhook", "delhook":
// write operations // write operations
write = true write = true
c.mu.Lock() c.mu.Lock()
@ -221,7 +225,7 @@ func (c *Controller) handleInputCommand(conn *server.Conn, line string, w io.Wri
if c.config.ReadOnly { if c.config.ReadOnly {
return writeErr(errors.New("read only")) return writeErr(errors.New("read only"))
} }
case "get", "keys", "scan", "nearby", "within", "intersects": case "get", "keys", "scan", "nearby", "within", "intersects", "hooks":
// read operations // read operations
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
@ -248,6 +252,9 @@ func (c *Controller) handleInputCommand(conn *server.Conn, line string, w io.Wri
} }
if write { if write {
if err := c.writeAOF(line, &d); err != nil { if err := c.writeAOF(line, &d); err != nil {
if _, ok := err.(errAOFHook); ok {
return writeErr(err)
}
log.Fatal(err) log.Fatal(err)
return err return err
} }
@ -306,6 +313,14 @@ func (c *Controller) command(line string, w io.Writer) (resp string, d commandDe
case "flushdb": case "flushdb":
d, err = c.cmdFlushDB(nline) d, err = c.cmdFlushDB(nline)
resp = okResp() resp = okResp()
case "addhook":
err = c.cmdAddHook(nline)
resp = okResp()
case "delhook":
err = c.cmdDelHook(nline)
resp = okResp()
case "hooks":
err = c.cmdHooks(nline, w)
case "massinsert": case "massinsert":
if !core.DevMode { if !core.DevMode {
err = fmt.Errorf("unknown command '%s'", cmd) err = fmt.Errorf("unknown command '%s'", cmd)

View File

@ -136,6 +136,8 @@ func (c *Controller) cmdFlushDB(line string) (d commandDetailsT, err error) {
} }
c.cols = btree.New(16) c.cols = btree.New(16)
c.colsm = make(map[string]*collection.Collection) c.colsm = make(map[string]*collection.Collection)
c.hooks = make(map[string]*Hook)
c.hookcols = make(map[string]map[string]*Hook)
d.command = "flushdb" d.command = "flushdb"
return return
} }

136
controller/fence.go Normal file
View File

@ -0,0 +1,136 @@
package controller
import (
"strings"
"github.com/tidwall/tile38/geojson"
)
func (c *Controller) FenceMatch(sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT, mustLock bool) [][]byte {
glob := fence.glob
if details.command == "drop" {
return [][]byte{[]byte(`{"cmd":"drop"}`)}
}
match := true
if glob != "" && glob != "*" {
match, _ = globMatch(glob, details.id)
}
if !match {
return nil
}
if details.obj == nil || (details.command == "fset" && sw.nofields) {
return nil
}
match = false
detect := "outside"
if fence != nil {
match1 := fenceMatchObject(fence, details.oldObj)
match2 := fenceMatchObject(fence, details.obj)
if match1 && match2 {
match = true
detect = "inside"
} else if match1 && !match2 {
match = true
detect = "exit"
} else if !match1 && match2 {
match = true
detect = "enter"
} else {
// Maybe the old object and new object create a line that crosses the fence.
// Must detect for that possibility.
if details.oldObj != nil {
ls := geojson.LineString{
Coordinates: []geojson.Position{
details.oldObj.CalculatedPoint(),
details.obj.CalculatedPoint(),
},
}
temp := false
if fence.cmd == "within" {
// because we are testing if the line croses the area we need to use
// "intersects" instead of "within".
fence.cmd = "intersects"
temp = true
}
if fenceMatchObject(fence, ls) {
match = true
detect = "cross"
}
if temp {
fence.cmd = "within"
}
}
}
}
if details.command == "del" {
return [][]byte{[]byte(`{"command":"del","id":` + jsonString(details.id) + `}`)}
}
var fmap map[string]int
if mustLock {
c.mu.RLock()
}
col := c.getCol(details.key)
if col != nil {
fmap = col.FieldMap()
}
if mustLock {
c.mu.RUnlock()
}
if fmap == nil {
return nil
}
sw.fmap = fmap
sw.fullFields = true
sw.writeObject(details.id, details.obj, details.fields)
if sw.wr.Len() == 0 {
return nil
}
res := sw.wr.String()
sw.wr.Reset()
if strings.HasPrefix(res, ",") {
res = res[1:]
}
if sw.output == outputIDs {
res = `{"id":` + res + `}`
}
if strings.HasPrefix(res, "{") {
res = `{"command":"` + details.command + `","detect":"` + detect + `",` + res[1:]
}
msgs := [][]byte{[]byte(res)}
switch detect {
case "enter":
msgs = append(msgs, []byte(`{"command":"`+details.command+`","detect":"inside",`+res[1:]))
case "exit", "cross":
msgs = append(msgs, []byte(`{"command":"`+details.command+`","detect":"outside",`+res[1:]))
}
return msgs
}
func fenceMatchObject(fence *liveFenceSwitches, obj geojson.Object) bool {
if obj == nil {
return false
}
if fence.cmd == "nearby" {
return obj.Nearby(geojson.Position{X: fence.lon, Y: fence.lat, Z: 0}, fence.meters)
} else if fence.cmd == "within" {
if fence.o != nil {
return obj.Within(fence.o)
} else {
return obj.WithinBBox(geojson.BBox{
Min: geojson.Position{X: fence.minLon, Y: fence.minLat, Z: 0},
Max: geojson.Position{X: fence.maxLon, Y: fence.maxLat, Z: 0},
})
}
} else if fence.cmd == "intersects" {
if fence.o != nil {
return obj.Intersects(fence.o)
} else {
return obj.IntersectsBBox(geojson.BBox{
Min: geojson.Position{X: fence.minLon, Y: fence.minLat, Z: 0},
Max: geojson.Position{X: fence.maxLon, Y: fence.maxLat, Z: 0},
})
}
}
return false
}

201
controller/hooks.go Normal file
View File

@ -0,0 +1,201 @@
package controller
import (
"bytes"
"errors"
"io"
"sort"
"strings"
"time"
"github.com/tidwall/tile38/controller/log"
)
type EndpointProtocol string
const (
HTTP = EndpointProtocol("http")
Disque = EndpointProtocol("disque")
)
type Endpoint struct {
Protocol EndpointProtocol
Original string
}
type Hook struct {
Key string
Name string
Endpoint Endpoint
Command string
Fence *liveFenceSwitches
ScanWriter *scanWriter
}
func (c *Controller) DoHook(hook *Hook, details *commandDetailsT) error {
msgs := c.FenceMatch(hook.ScanWriter, hook.Fence, details, false)
for _, msg := range msgs {
println(">>", string(msg))
}
return nil
}
type hooksByName []*Hook
func (a hooksByName) Len() int {
return len(a)
}
func (a hooksByName) Less(i, j int) bool {
return a[i].Name < a[j].Name
}
func (a hooksByName) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func parseEndpoint(s string) (Endpoint, error) {
var endpoint Endpoint
endpoint.Original = s
switch {
default:
return endpoint, errors.New("unknown scheme")
case strings.HasPrefix(s, "http:"):
endpoint.Protocol = HTTP
case strings.HasPrefix(s, "https:"):
endpoint.Protocol = HTTP
case strings.HasPrefix(s, "disque:"):
endpoint.Protocol = Disque
}
s = s[strings.Index(s, ":")+1:]
if !strings.HasPrefix(s, "//") {
return endpoint, errors.New("missing the two slashes")
}
s = strings.Split(s[2:], "/")[0]
if s == "" {
return endpoint, errors.New("missing host")
}
return endpoint, nil
}
func (c *Controller) cmdAddHook(line string) (err error) {
//start := time.Now()
var name, value, cmd string
if line, name = token(line); name == "" {
return errInvalidNumberOfArguments
}
if line, value = token(line); value == "" {
return errInvalidNumberOfArguments
}
endpoint, err := parseEndpoint(value)
if err != nil {
log.Errorf("addhook: %v", err)
return errInvalidArgument(value)
}
command := line
if line, cmd = token(line); cmd == "" {
return errInvalidNumberOfArguments
}
cmdlc := strings.ToLower(cmd)
var types []string
switch cmdlc {
default:
return errInvalidArgument(cmd)
case "nearby":
types = nearbyTypes
case "within", "intersects":
types = withinOrIntersectsTypes
}
s, err := c.cmdSearchArgs(cmdlc, line, types)
if err != nil {
return err
}
if !s.fence {
return errors.New("missing FENCE argument")
}
s.cmd = cmdlc
hook := &Hook{
Key: s.key,
Name: name,
Endpoint: endpoint,
Fence: &s,
Command: command,
}
var wr bytes.Buffer
hook.ScanWriter, err = c.newScanWriter(&wr, s.key, s.output, s.precision, s.glob, s.limit, s.wheres, s.nofields)
if err != nil {
return err
}
// delete the previous hook
if h, ok := c.hooks[name]; ok {
if hm, ok := c.hookcols[h.Key]; ok {
delete(hm, h.Name)
}
delete(c.hooks, h.Name)
}
c.hooks[name] = hook
hm, ok := c.hookcols[hook.Key]
if !ok {
hm = make(map[string]*Hook)
c.hookcols[hook.Key] = hm
}
hm[name] = hook
return nil
}
func (c *Controller) cmdDelHook(line string) (err error) {
var name string
if line, name = token(line); name == "" {
return errInvalidNumberOfArguments
}
if line != "" {
return errInvalidNumberOfArguments
}
if h, ok := c.hooks[name]; ok {
if hm, ok := c.hookcols[h.Key]; ok {
delete(hm, h.Name)
}
delete(c.hooks, h.Name)
}
return
}
func (c *Controller) cmdHooks(line string, w io.Writer) (err error) {
start := time.Now()
var pattern string
if line, pattern = token(line); pattern == "" {
return errInvalidNumberOfArguments
}
if line != "" {
return errInvalidNumberOfArguments
}
var hooks []*Hook
for name, hook := range c.hooks {
if ok, err := globMatch(pattern, name); err == nil && ok {
hooks = append(hooks, hook)
} else if err != nil {
return errInvalidArgument(pattern)
}
}
sort.Sort(hooksByName(hooks))
buf := &bytes.Buffer{}
io.WriteString(buf, `{"ok":true,"hooks":[`)
for i, hook := range hooks {
if i > 0 {
io.WriteString(buf, `,`)
}
io.WriteString(buf, `"hook":{`)
io.WriteString(buf, `"name":`+jsonString(hook.Name))
io.WriteString(buf, `,"key":`+jsonString(hook.Key))
io.WriteString(buf, `,"endpoint":`+jsonString(hook.Endpoint.Original))
io.WriteString(buf, `,"command":`+jsonString(hook.Command))
io.WriteString(buf, `}`)
}
io.WriteString(buf, `],"elapsed":"`+time.Now().Sub(start).String()+"\"}")
w.Write(buf.Bytes())
return
}

View File

@ -10,9 +10,7 @@ import (
"sync" "sync"
"github.com/tidwall/tile38/client" "github.com/tidwall/tile38/client"
"github.com/tidwall/tile38/controller/collection"
"github.com/tidwall/tile38/controller/log" "github.com/tidwall/tile38/controller/log"
"github.com/tidwall/tile38/geojson"
) )
type liveBuffer struct { type liveBuffer struct {
@ -122,7 +120,6 @@ func (c *Controller) goLive(inerr error, conn net.Conn, rd *bufio.Reader, websoc
if err := writeMessage(conn, []byte(client.LiveJSON), websocket); err != nil { if err := writeMessage(conn, []byte(client.LiveJSON), websocket); err != nil {
return nil // nil return is fine here return nil // nil return is fine here
} }
var col *collection.Collection
for { for {
lb.cond.L.Lock() lb.cond.L.Lock()
if mustQuit { if mustQuit {
@ -136,100 +133,11 @@ func (c *Controller) goLive(inerr error, conn net.Conn, rd *bufio.Reader, websoc
lb.details = nil lb.details = nil
} }
fence := lb.fence fence := lb.fence
glob := lb.glob
lb.cond.L.Unlock() lb.cond.L.Unlock()
if details.command == "drop" { msgs := c.FenceMatch(sw, fence, details, true)
col = nil for _, msg := range msgs {
if err := writeMessage(conn, []byte(`{"cmd":"drop"}`), websocket); err != nil { if err := writeMessage(conn, msg, websocket); err != nil {
return nil return nil // nil return is fine here
}
} else {
match := true
if glob != "" && glob != "*" {
match, _ = globMatch(glob, details.id)
}
if match {
if details.obj != nil && !(details.command == "fset" && sw.nofields) {
match = false
detect := "outside"
if fence != nil {
match1 := fenceMatchObject(fence, details.oldObj)
match2 := fenceMatchObject(fence, details.obj)
if match1 && match2 {
match = true
detect = "inside"
} else if match1 && !match2 {
match = true
detect = "exit"
} else if !match1 && match2 {
match = true
detect = "enter"
} else {
// Maybe the old object and new object create a line that crosses the fence.
// Must detect for that possibility.
if details.oldObj != nil {
ls := geojson.LineString{
Coordinates: []geojson.Position{
details.oldObj.CalculatedPoint(),
details.obj.CalculatedPoint(),
},
}
temp := false
if fence.cmd == "within" {
// because we are testing if the line croses the area we need to use
// "intersects" instead of "within".
fence.cmd = "intersects"
temp = true
}
if fenceMatchObject(fence, ls) {
match = true
detect = "cross"
}
if temp {
fence.cmd = "within"
}
}
}
}
if match {
if details.command == "del" {
if err := writeMessage(conn, []byte(`{"command":"del","id":`+jsonString(details.id)+`}`), websocket); err != nil {
return nil
}
} else {
var fmap map[string]int
c.mu.RLock()
if col == nil {
col = c.getCol(details.key)
}
if col != nil {
fmap = col.FieldMap()
}
c.mu.RUnlock()
if fmap != nil {
sw.fmap = fmap
sw.fullFields = true
sw.writeObject(details.id, details.obj, details.fields)
if wr.Len() > 0 {
res := wr.String()
wr.Reset()
if strings.HasPrefix(res, ",") {
res = res[1:]
}
if sw.output == outputIDs {
res = `{"id":` + res + `}`
}
if strings.HasPrefix(res, "{") {
res = `{"command":"` + details.command + `","detect":"` + detect + `",` + res[1:]
}
if err := writeMessage(conn, []byte(res), websocket); err != nil {
return nil
}
}
}
}
}
}
} }
} }
lb.cond.L.Lock() lb.cond.L.Lock()
@ -238,31 +146,3 @@ func (c *Controller) goLive(inerr error, conn net.Conn, rd *bufio.Reader, websoc
lb.cond.L.Unlock() lb.cond.L.Unlock()
} }
} }
func fenceMatchObject(fence *liveFenceSwitches, obj geojson.Object) bool {
if obj == nil {
return false
}
if fence.cmd == "nearby" {
return obj.Nearby(geojson.Position{X: fence.lon, Y: fence.lat, Z: 0}, fence.meters)
} else if fence.cmd == "within" {
if fence.o != nil {
return obj.Within(fence.o)
} else {
return obj.WithinBBox(geojson.BBox{
Min: geojson.Position{X: fence.minLon, Y: fence.minLat, Z: 0},
Max: geojson.Position{X: fence.maxLon, Y: fence.maxLat, Z: 0},
})
}
} else if fence.cmd == "intersects" {
if fence.o != nil {
return obj.Intersects(fence.o)
} else {
return obj.IntersectsBBox(geojson.BBox{
Min: geojson.Position{X: fence.minLon, Y: fence.minLat, Z: 0},
Max: geojson.Position{X: fence.maxLon, Y: fence.maxLat, Z: 0},
})
}
}
return false
}

View File

@ -212,10 +212,13 @@ func (c *Controller) cmdSearchArgs(cmd, line string, types []string) (s liveFenc
return return
} }
var nearbyTypes = []string{"point"}
var withinOrIntersectsTypes = []string{"geo", "bounds", "hash", "tile", "quadkey", "get"}
func (c *Controller) cmdNearby(line string, w io.Writer) error { func (c *Controller) cmdNearby(line string, w io.Writer) error {
start := time.Now() start := time.Now()
wr := &bytes.Buffer{} wr := &bytes.Buffer{}
s, err := c.cmdSearchArgs("nearby", line, []string{"point"}) s, err := c.cmdSearchArgs("nearby", line, nearbyTypes)
if err != nil { if err != nil {
return err return err
} }
@ -251,7 +254,7 @@ func (c *Controller) cmdIntersects(line string, w io.Writer) error {
func (c *Controller) cmdWithinOrIntersects(cmd string, line string, w io.Writer) error { func (c *Controller) cmdWithinOrIntersects(cmd string, line string, w io.Writer) error {
start := time.Now() start := time.Now()
wr := &bytes.Buffer{} wr := &bytes.Buffer{}
s, err := c.cmdSearchArgs(cmd, line, []string{"geo", "bounds", "hash", "tile", "quadkey", "get"}) s, err := c.cmdSearchArgs(cmd, line, withinOrIntersectsTypes)
if err != nil { if err != nil {
return err return err
} }