diff --git a/controller/aof.go b/controller/aof.go index 007b86e9..31eddf76 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -32,6 +32,14 @@ type AOFReader struct { p int // pointer } +type errAOFHook struct { + err error +} + +func (err errAOFHook) Error() string { + return fmt.Sprintf("hook: %v", err.err) +} + func (rd *AOFReader) ReadCommand() ([]byte, error) { if rd.l >= 4 { sz1 := int(binary.LittleEndian.Uint32(rd.buf[rd.p:])) @@ -123,7 +131,16 @@ func (c *Controller) loadAOF() error { } return err } - + empty := true + for i := 0; i < len(buf); i++ { + if buf[i] != 0 { + empty = false + break + } + } + if empty { + return nil + } if _, _, err := c.command(string(buf), nil); err != nil { return err } @@ -145,6 +162,33 @@ func (c *Controller) writeAOF(line string, d *commandDetailsT) error { if err != nil { return err } + + if d != nil { + // Process hooks + if hm, ok := c.hookcols[d.key]; ok { + for _, hook := range hm { + if err := c.DoHook(hook, d); err != nil { + // There was an error when processing the hook. + // This is a bad thing because we have already written the data to disk. + // But sinced the Controller mutex is currently locked, we have an opportunity + // to revert the written data on disk and return an error code back to the client. + // Not sure if this is the best route, but it's all we have at the moment. + // Instead of truncating the file, which is an expensive and more fault possible + // solution, we will simple overwrite the previous command with Zeros. + blank := make([]byte, len([]byte(line))) + // Seek to origin of the faulty command. + if _, err := c.f.Seek(int64(c.aofsz), 0); err != nil { + return err // really bad + } + if _, err := writeCommand(c.f, blank); err != nil { + return err // really bad + } + return errAOFHook{err} + } + } + } + } + c.aofsz += n // notify aof live connections that we have new data @@ -152,13 +196,14 @@ func (c *Controller) writeAOF(line string, d *commandDetailsT) error { c.fcond.Broadcast() c.fcond.L.Unlock() - // write to live connection streams if d != nil { + // write to live connection streams c.lcond.L.Lock() c.lstack = append(c.lstack, d) c.lcond.Broadcast() c.lcond.L.Unlock() } + return nil } diff --git a/controller/checksum.go b/controller/checksum.go index 28828be8..b2df1c9d 100644 --- a/controller/checksum.go +++ b/controller/checksum.go @@ -16,6 +16,9 @@ import ( // checksum performs a simple md5 checksum on the aof file func (c *Controller) checksum(pos, size int64) (sum string, err error) { + if pos+size > int64(c.aofsz) { + return "", io.EOF + } var f *os.File f, err = os.Open(c.f.Name()) if err != nil { @@ -25,7 +28,7 @@ func (c *Controller) checksum(pos, size int64) (sum string, err error) { data := make([]byte, size) err = func() error { if size == 0 { - n, err := f.Seek(0, 2) + n, err := f.Seek(int64(c.aofsz), 0) if err != nil { return err } diff --git a/controller/controller.go b/controller/controller.go index bc9ff863..082a185b 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -58,23 +58,27 @@ type Controller struct { lstack []*commandDetailsT lives map[*liveBuffer]bool lcond *sync.Cond - fcup bool // follow caught up - shrinking bool // aof shrinking flag + fcup bool // follow caught up + shrinking bool // aof shrinking flag + hooks map[string]*Hook // hook name + hookcols map[string]map[string]*Hook // col key } // ListenAndServe starts a new tile38 server func ListenAndServe(host string, port int, dir string) error { log.Infof("Server started, Tile38 version %s, git %s", core.Version, core.GitSHA) c := &Controller{ - host: host, - port: port, - dir: dir, - cols: btree.New(16), - colsm: make(map[string]*collection.Collection), - follows: make(map[*bytes.Buffer]bool), - fcond: sync.NewCond(&sync.Mutex{}), - lives: make(map[*liveBuffer]bool), - lcond: sync.NewCond(&sync.Mutex{}), + host: host, + port: port, + dir: dir, + cols: btree.New(16), + colsm: make(map[string]*collection.Collection), + follows: make(map[*bytes.Buffer]bool), + fcond: sync.NewCond(&sync.Mutex{}), + lives: make(map[*liveBuffer]bool), + lcond: sync.NewCond(&sync.Mutex{}), + hooks: make(map[string]*Hook), + hookcols: make(map[string]map[string]*Hook), } if err := os.MkdirAll(dir, 0700); err != nil { return err @@ -210,7 +214,7 @@ func (c *Controller) handleInputCommand(conn *server.Conn, line string, w io.Wri default: c.mu.RLock() defer c.mu.RUnlock() - case "set", "del", "drop", "fset", "flushdb": + case "set", "del", "drop", "fset", "flushdb", "addhook", "delhook": // write operations write = true c.mu.Lock() @@ -221,7 +225,7 @@ func (c *Controller) handleInputCommand(conn *server.Conn, line string, w io.Wri if c.config.ReadOnly { return writeErr(errors.New("read only")) } - case "get", "keys", "scan", "nearby", "within", "intersects": + case "get", "keys", "scan", "nearby", "within", "intersects", "hooks": // read operations c.mu.RLock() defer c.mu.RUnlock() @@ -248,6 +252,9 @@ func (c *Controller) handleInputCommand(conn *server.Conn, line string, w io.Wri } if write { if err := c.writeAOF(line, &d); err != nil { + if _, ok := err.(errAOFHook); ok { + return writeErr(err) + } log.Fatal(err) return err } @@ -306,6 +313,14 @@ func (c *Controller) command(line string, w io.Writer) (resp string, d commandDe case "flushdb": d, err = c.cmdFlushDB(nline) resp = okResp() + case "addhook": + err = c.cmdAddHook(nline) + resp = okResp() + case "delhook": + err = c.cmdDelHook(nline) + resp = okResp() + case "hooks": + err = c.cmdHooks(nline, w) case "massinsert": if !core.DevMode { err = fmt.Errorf("unknown command '%s'", cmd) diff --git a/controller/crud.go b/controller/crud.go index a1b19c2a..d1ab9e84 100644 --- a/controller/crud.go +++ b/controller/crud.go @@ -136,6 +136,8 @@ func (c *Controller) cmdFlushDB(line string) (d commandDetailsT, err error) { } c.cols = btree.New(16) c.colsm = make(map[string]*collection.Collection) + c.hooks = make(map[string]*Hook) + c.hookcols = make(map[string]map[string]*Hook) d.command = "flushdb" return } diff --git a/controller/fence.go b/controller/fence.go new file mode 100644 index 00000000..77d73510 --- /dev/null +++ b/controller/fence.go @@ -0,0 +1,136 @@ +package controller + +import ( + "strings" + + "github.com/tidwall/tile38/geojson" +) + +func (c *Controller) FenceMatch(sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT, mustLock bool) [][]byte { + glob := fence.glob + if details.command == "drop" { + return [][]byte{[]byte(`{"cmd":"drop"}`)} + } + match := true + if glob != "" && glob != "*" { + match, _ = globMatch(glob, details.id) + } + if !match { + return nil + } + + if details.obj == nil || (details.command == "fset" && sw.nofields) { + return nil + } + match = false + detect := "outside" + if fence != nil { + match1 := fenceMatchObject(fence, details.oldObj) + match2 := fenceMatchObject(fence, details.obj) + if match1 && match2 { + match = true + detect = "inside" + } else if match1 && !match2 { + match = true + detect = "exit" + } else if !match1 && match2 { + match = true + detect = "enter" + } else { + // Maybe the old object and new object create a line that crosses the fence. + // Must detect for that possibility. + if details.oldObj != nil { + ls := geojson.LineString{ + Coordinates: []geojson.Position{ + details.oldObj.CalculatedPoint(), + details.obj.CalculatedPoint(), + }, + } + temp := false + if fence.cmd == "within" { + // because we are testing if the line croses the area we need to use + // "intersects" instead of "within". + fence.cmd = "intersects" + temp = true + } + if fenceMatchObject(fence, ls) { + match = true + detect = "cross" + } + if temp { + fence.cmd = "within" + } + } + } + } + if details.command == "del" { + return [][]byte{[]byte(`{"command":"del","id":` + jsonString(details.id) + `}`)} + } + var fmap map[string]int + if mustLock { + c.mu.RLock() + } + col := c.getCol(details.key) + if col != nil { + fmap = col.FieldMap() + } + if mustLock { + c.mu.RUnlock() + } + if fmap == nil { + return nil + } + sw.fmap = fmap + sw.fullFields = true + sw.writeObject(details.id, details.obj, details.fields) + if sw.wr.Len() == 0 { + return nil + } + res := sw.wr.String() + sw.wr.Reset() + if strings.HasPrefix(res, ",") { + res = res[1:] + } + if sw.output == outputIDs { + res = `{"id":` + res + `}` + } + if strings.HasPrefix(res, "{") { + res = `{"command":"` + details.command + `","detect":"` + detect + `",` + res[1:] + } + msgs := [][]byte{[]byte(res)} + switch detect { + case "enter": + msgs = append(msgs, []byte(`{"command":"`+details.command+`","detect":"inside",`+res[1:])) + case "exit", "cross": + msgs = append(msgs, []byte(`{"command":"`+details.command+`","detect":"outside",`+res[1:])) + } + return msgs +} + +func fenceMatchObject(fence *liveFenceSwitches, obj geojson.Object) bool { + if obj == nil { + return false + } + if fence.cmd == "nearby" { + return obj.Nearby(geojson.Position{X: fence.lon, Y: fence.lat, Z: 0}, fence.meters) + } else if fence.cmd == "within" { + if fence.o != nil { + return obj.Within(fence.o) + } else { + return obj.WithinBBox(geojson.BBox{ + Min: geojson.Position{X: fence.minLon, Y: fence.minLat, Z: 0}, + Max: geojson.Position{X: fence.maxLon, Y: fence.maxLat, Z: 0}, + }) + } + } else if fence.cmd == "intersects" { + if fence.o != nil { + return obj.Intersects(fence.o) + } else { + return obj.IntersectsBBox(geojson.BBox{ + Min: geojson.Position{X: fence.minLon, Y: fence.minLat, Z: 0}, + Max: geojson.Position{X: fence.maxLon, Y: fence.maxLat, Z: 0}, + }) + } + } + return false +} diff --git a/controller/hooks.go b/controller/hooks.go new file mode 100644 index 00000000..e3fb64c8 --- /dev/null +++ b/controller/hooks.go @@ -0,0 +1,201 @@ +package controller + +import ( + "bytes" + "errors" + "io" + "sort" + "strings" + "time" + + "github.com/tidwall/tile38/controller/log" +) + +type EndpointProtocol string + +const ( + HTTP = EndpointProtocol("http") + Disque = EndpointProtocol("disque") +) + +type Endpoint struct { + Protocol EndpointProtocol + Original string +} +type Hook struct { + Key string + Name string + Endpoint Endpoint + Command string + Fence *liveFenceSwitches + ScanWriter *scanWriter +} + +func (c *Controller) DoHook(hook *Hook, details *commandDetailsT) error { + msgs := c.FenceMatch(hook.ScanWriter, hook.Fence, details, false) + for _, msg := range msgs { + println(">>", string(msg)) + } + return nil +} + +type hooksByName []*Hook + +func (a hooksByName) Len() int { + return len(a) +} + +func (a hooksByName) Less(i, j int) bool { + return a[i].Name < a[j].Name +} + +func (a hooksByName) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +func parseEndpoint(s string) (Endpoint, error) { + var endpoint Endpoint + endpoint.Original = s + switch { + default: + return endpoint, errors.New("unknown scheme") + case strings.HasPrefix(s, "http:"): + endpoint.Protocol = HTTP + case strings.HasPrefix(s, "https:"): + endpoint.Protocol = HTTP + case strings.HasPrefix(s, "disque:"): + endpoint.Protocol = Disque + } + s = s[strings.Index(s, ":")+1:] + if !strings.HasPrefix(s, "//") { + return endpoint, errors.New("missing the two slashes") + } + s = strings.Split(s[2:], "/")[0] + if s == "" { + return endpoint, errors.New("missing host") + } + return endpoint, nil +} + +func (c *Controller) cmdAddHook(line string) (err error) { + //start := time.Now() + var name, value, cmd string + if line, name = token(line); name == "" { + return errInvalidNumberOfArguments + } + if line, value = token(line); value == "" { + return errInvalidNumberOfArguments + } + endpoint, err := parseEndpoint(value) + if err != nil { + log.Errorf("addhook: %v", err) + return errInvalidArgument(value) + } + command := line + if line, cmd = token(line); cmd == "" { + return errInvalidNumberOfArguments + } + cmdlc := strings.ToLower(cmd) + var types []string + switch cmdlc { + default: + return errInvalidArgument(cmd) + case "nearby": + types = nearbyTypes + case "within", "intersects": + types = withinOrIntersectsTypes + } + s, err := c.cmdSearchArgs(cmdlc, line, types) + if err != nil { + return err + } + if !s.fence { + return errors.New("missing FENCE argument") + } + s.cmd = cmdlc + hook := &Hook{ + Key: s.key, + Name: name, + Endpoint: endpoint, + Fence: &s, + Command: command, + } + var wr bytes.Buffer + hook.ScanWriter, err = c.newScanWriter(&wr, s.key, s.output, s.precision, s.glob, s.limit, s.wheres, s.nofields) + if err != nil { + return err + } + + // delete the previous hook + if h, ok := c.hooks[name]; ok { + if hm, ok := c.hookcols[h.Key]; ok { + delete(hm, h.Name) + } + delete(c.hooks, h.Name) + } + c.hooks[name] = hook + hm, ok := c.hookcols[hook.Key] + if !ok { + hm = make(map[string]*Hook) + c.hookcols[hook.Key] = hm + } + hm[name] = hook + return nil +} + +func (c *Controller) cmdDelHook(line string) (err error) { + var name string + if line, name = token(line); name == "" { + return errInvalidNumberOfArguments + } + if line != "" { + return errInvalidNumberOfArguments + } + if h, ok := c.hooks[name]; ok { + if hm, ok := c.hookcols[h.Key]; ok { + delete(hm, h.Name) + } + delete(c.hooks, h.Name) + } + return +} + +func (c *Controller) cmdHooks(line string, w io.Writer) (err error) { + start := time.Now() + + var pattern string + if line, pattern = token(line); pattern == "" { + return errInvalidNumberOfArguments + } + if line != "" { + return errInvalidNumberOfArguments + } + + var hooks []*Hook + for name, hook := range c.hooks { + if ok, err := globMatch(pattern, name); err == nil && ok { + hooks = append(hooks, hook) + } else if err != nil { + return errInvalidArgument(pattern) + } + } + sort.Sort(hooksByName(hooks)) + + buf := &bytes.Buffer{} + io.WriteString(buf, `{"ok":true,"hooks":[`) + for i, hook := range hooks { + if i > 0 { + io.WriteString(buf, `,`) + } + io.WriteString(buf, `"hook":{`) + io.WriteString(buf, `"name":`+jsonString(hook.Name)) + io.WriteString(buf, `,"key":`+jsonString(hook.Key)) + io.WriteString(buf, `,"endpoint":`+jsonString(hook.Endpoint.Original)) + io.WriteString(buf, `,"command":`+jsonString(hook.Command)) + io.WriteString(buf, `}`) + } + io.WriteString(buf, `],"elapsed":"`+time.Now().Sub(start).String()+"\"}") + + w.Write(buf.Bytes()) + return +} diff --git a/controller/live.go b/controller/live.go index b0830150..0c0665e7 100644 --- a/controller/live.go +++ b/controller/live.go @@ -10,9 +10,7 @@ import ( "sync" "github.com/tidwall/tile38/client" - "github.com/tidwall/tile38/controller/collection" "github.com/tidwall/tile38/controller/log" - "github.com/tidwall/tile38/geojson" ) type liveBuffer struct { @@ -122,7 +120,6 @@ func (c *Controller) goLive(inerr error, conn net.Conn, rd *bufio.Reader, websoc if err := writeMessage(conn, []byte(client.LiveJSON), websocket); err != nil { return nil // nil return is fine here } - var col *collection.Collection for { lb.cond.L.Lock() if mustQuit { @@ -136,100 +133,11 @@ func (c *Controller) goLive(inerr error, conn net.Conn, rd *bufio.Reader, websoc lb.details = nil } fence := lb.fence - glob := lb.glob lb.cond.L.Unlock() - if details.command == "drop" { - col = nil - if err := writeMessage(conn, []byte(`{"cmd":"drop"}`), websocket); err != nil { - return nil - } - } else { - match := true - if glob != "" && glob != "*" { - match, _ = globMatch(glob, details.id) - } - if match { - if details.obj != nil && !(details.command == "fset" && sw.nofields) { - match = false - detect := "outside" - if fence != nil { - match1 := fenceMatchObject(fence, details.oldObj) - match2 := fenceMatchObject(fence, details.obj) - if match1 && match2 { - match = true - detect = "inside" - } else if match1 && !match2 { - match = true - detect = "exit" - } else if !match1 && match2 { - match = true - detect = "enter" - } else { - // Maybe the old object and new object create a line that crosses the fence. - // Must detect for that possibility. - if details.oldObj != nil { - ls := geojson.LineString{ - Coordinates: []geojson.Position{ - details.oldObj.CalculatedPoint(), - details.obj.CalculatedPoint(), - }, - } - temp := false - if fence.cmd == "within" { - // because we are testing if the line croses the area we need to use - // "intersects" instead of "within". - fence.cmd = "intersects" - temp = true - } - if fenceMatchObject(fence, ls) { - match = true - detect = "cross" - } - if temp { - fence.cmd = "within" - } - } - } - } - if match { - if details.command == "del" { - if err := writeMessage(conn, []byte(`{"command":"del","id":`+jsonString(details.id)+`}`), websocket); err != nil { - return nil - } - } else { - var fmap map[string]int - c.mu.RLock() - if col == nil { - col = c.getCol(details.key) - } - if col != nil { - fmap = col.FieldMap() - } - c.mu.RUnlock() - if fmap != nil { - sw.fmap = fmap - sw.fullFields = true - sw.writeObject(details.id, details.obj, details.fields) - if wr.Len() > 0 { - res := wr.String() - wr.Reset() - if strings.HasPrefix(res, ",") { - res = res[1:] - } - if sw.output == outputIDs { - res = `{"id":` + res + `}` - } - if strings.HasPrefix(res, "{") { - res = `{"command":"` + details.command + `","detect":"` + detect + `",` + res[1:] - } - if err := writeMessage(conn, []byte(res), websocket); err != nil { - return nil - } - } - } - } - } - } + msgs := c.FenceMatch(sw, fence, details, true) + for _, msg := range msgs { + if err := writeMessage(conn, msg, websocket); err != nil { + return nil // nil return is fine here } } lb.cond.L.Lock() @@ -238,31 +146,3 @@ func (c *Controller) goLive(inerr error, conn net.Conn, rd *bufio.Reader, websoc lb.cond.L.Unlock() } } - -func fenceMatchObject(fence *liveFenceSwitches, obj geojson.Object) bool { - if obj == nil { - return false - } - if fence.cmd == "nearby" { - return obj.Nearby(geojson.Position{X: fence.lon, Y: fence.lat, Z: 0}, fence.meters) - } else if fence.cmd == "within" { - if fence.o != nil { - return obj.Within(fence.o) - } else { - return obj.WithinBBox(geojson.BBox{ - Min: geojson.Position{X: fence.minLon, Y: fence.minLat, Z: 0}, - Max: geojson.Position{X: fence.maxLon, Y: fence.maxLat, Z: 0}, - }) - } - } else if fence.cmd == "intersects" { - if fence.o != nil { - return obj.Intersects(fence.o) - } else { - return obj.IntersectsBBox(geojson.BBox{ - Min: geojson.Position{X: fence.minLon, Y: fence.minLat, Z: 0}, - Max: geojson.Position{X: fence.maxLon, Y: fence.maxLat, Z: 0}, - }) - } - } - return false -} diff --git a/controller/search.go b/controller/search.go index d1443a5e..75ebf558 100644 --- a/controller/search.go +++ b/controller/search.go @@ -212,10 +212,13 @@ func (c *Controller) cmdSearchArgs(cmd, line string, types []string) (s liveFenc return } +var nearbyTypes = []string{"point"} +var withinOrIntersectsTypes = []string{"geo", "bounds", "hash", "tile", "quadkey", "get"} + func (c *Controller) cmdNearby(line string, w io.Writer) error { start := time.Now() wr := &bytes.Buffer{} - s, err := c.cmdSearchArgs("nearby", line, []string{"point"}) + s, err := c.cmdSearchArgs("nearby", line, nearbyTypes) if err != nil { return err } @@ -251,7 +254,7 @@ func (c *Controller) cmdIntersects(line string, w io.Writer) error { func (c *Controller) cmdWithinOrIntersects(cmd string, line string, w io.Writer) error { start := time.Now() wr := &bytes.Buffer{} - s, err := c.cmdSearchArgs(cmd, line, []string{"geo", "bounds", "hash", "tile", "quadkey", "get"}) + s, err := c.cmdSearchArgs(cmd, line, withinOrIntersectsTypes) if err != nil { return err }