From a3725c7a2ac9d502c2cee23a8cfe417bfd2cbd30 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Sat, 2 Apr 2016 07:20:30 -0700 Subject: [PATCH] async hooks --- controller/aof.go | 30 ++++++++++----- controller/controller.go | 2 + controller/crud.go | 22 +++++++++-- controller/fence.go | 81 +++++++++++++++++++--------------------- controller/hooks.go | 14 ++++--- controller/live.go | 2 +- controller/scanner.go | 8 ++++ 7 files changed, 95 insertions(+), 64 deletions(-) diff --git a/controller/aof.go b/controller/aof.go index 8dc19423..a878b7b8 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -16,6 +16,8 @@ import ( "github.com/tidwall/tile38/controller/server" ) +const AsyncHooks = true + type errAOFHook struct { err error } @@ -92,16 +94,7 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error { } if c.config.FollowHost == "" { // process hooks, for leader only - if hm, ok := c.hookcols[d.key]; ok { - for _, hook := range hm { - if err := c.DoHook(hook, d); err != nil { - if d.revert != nil { - d.revert() - } - return errAOFHook{err} - } - } - } + return c.processHooks(d) } } data, err := value.MarshalRESP() @@ -126,7 +119,24 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error { c.lcond.Broadcast() c.lcond.L.Unlock() } + return nil +} +func (c *Controller) processHooks(d *commandDetailsT) error { + if hm, ok := c.hookcols[d.key]; ok { + for _, hook := range hm { + if AsyncHooks { + go hook.Do(d) + } else { + if err := hook.Do(d); err != nil { + if d.revert != nil { + d.revert() + } + return errAOFHook{err} + } + } + } + } return nil } diff --git a/controller/controller.go b/controller/controller.go index 137f20a5..b68bd6c2 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -35,10 +35,12 @@ type commandDetailsT struct { value float64 obj geojson.Object fields []float64 + fmap map[string]int oldObj geojson.Object oldFields []float64 updated bool revert func() + timestamp time.Time } func (col *collectionT) Less(item btree.Item) bool { diff --git a/controller/crud.go b/controller/crud.go index bfd0d97a..32e7a40d 100644 --- a/controller/crud.go +++ b/controller/crud.go @@ -241,6 +241,7 @@ func (c *Controller) cmdDel(msg *server.Message) (res string, d commandDetailsT, } d.command = "del" d.updated = found + d.timestamp = time.Now() switch msg.OutputType { case server.JSON: res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" @@ -278,6 +279,7 @@ func (c *Controller) cmdDrop(msg *server.Message) (res string, d commandDetailsT d.updated = false } d.command = "drop" + d.timestamp = time.Now() switch msg.OutputType { case server.JSON: res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" @@ -303,6 +305,7 @@ func (c *Controller) cmdFlushDB(msg *server.Message) (res string, d commandDetai c.hookcols = make(map[string]map[string]*Hook) d.command = "flushdb" d.updated = true + d.timestamp = time.Now() switch msg.OutputType { case server.JSON: res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" @@ -530,6 +533,12 @@ func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT, } d.command = "set" d.updated = true // perhaps we should do a diff on the previous object? + fmap := col.FieldMap() + d.fmap = make(map[string]int) + for key, idx := range fmap { + d.fmap[key] = idx + } + d.timestamp = time.Now() switch msg.OutputType { case server.JSON: res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" @@ -584,19 +593,24 @@ func (c *Controller) cmdFset(msg *server.Message) (res string, d commandDetailsT return } var ok bool - var updated bool - d.obj, d.fields, updated, ok = col.SetField(d.id, d.field, d.value) + d.obj, d.fields, d.updated, ok = col.SetField(d.id, d.field, d.value) if !ok { err = errIDNotFound return } d.command = "fset" - d.updated = updated + d.timestamp = time.Now() + fmap := col.FieldMap() + d.fmap = make(map[string]int) + for key, idx := range fmap { + d.fmap[key] = idx + } + switch msg.OutputType { case server.JSON: res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" case server.RESP: - if updated { + if d.updated { res = ":1\r\n" } else { res = ":0\r\n" diff --git a/controller/fence.go b/controller/fence.go index 85410e57..6b5caac9 100644 --- a/controller/fence.go +++ b/controller/fence.go @@ -2,16 +2,17 @@ package controller import ( "strings" - "time" "github.com/tidwall/tile38/controller/server" "github.com/tidwall/tile38/geojson" ) -func (c *Controller) FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT, mustLock bool) []string { +var tmfmt = "2006-01-02T15:04:05.999999999Z07:00" + +func FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT) []string { glob := fence.glob if details.command == "drop" { - return []string{`{"cmd":"drop"}`} + return []string{`{"cmd":"drop","time":` + details.timestamp.Format(tmfmt) + `}`} } match := true if glob != "" && glob != "*" { @@ -38,51 +39,45 @@ func (c *Controller) FenceMatch(hookName string, sw *scanWriter, fence *liveFenc } else if !match1 && match2 { match = true detect = "enter" + if details.command == "fset" { + detect = "inside" + } } else { - // Maybe the old object and new object create a line that crosses the fence. - // Must detect for that possibility. - if details.oldObj != nil { - ls := geojson.LineString{ - Coordinates: []geojson.Position{ - details.oldObj.CalculatedPoint(), - details.obj.CalculatedPoint(), - }, - } - temp := false - if fence.cmd == "within" { - // because we are testing if the line croses the area we need to use - // "intersects" instead of "within". - fence.cmd = "intersects" - temp = true - } - if fenceMatchObject(fence, ls) { - match = true - detect = "cross" - } - if temp { - fence.cmd = "within" + if details.command != "fset" { + // Maybe the old object and new object create a line that crosses the fence. + // Must detect for that possibility. + if details.oldObj != nil { + ls := geojson.LineString{ + Coordinates: []geojson.Position{ + details.oldObj.CalculatedPoint(), + details.obj.CalculatedPoint(), + }, + } + temp := false + if fence.cmd == "within" { + // because we are testing if the line croses the area we need to use + // "intersects" instead of "within". + fence.cmd = "intersects" + temp = true + } + if fenceMatchObject(fence, ls) { + match = true + detect = "cross" + } + if temp { + fence.cmd = "within" + } } } } } if details.command == "del" { - return []string{`{"command":"del","id":` + jsonString(details.id) + `}`} + return []string{`{"command":"del","id":` + jsonString(details.id) + `,"time":` + details.timestamp.Format(tmfmt) + `}`} } - var fmap map[string]int - if mustLock { - c.mu.RLock() - } - col := c.getCol(details.key) - if col != nil { - fmap = col.FieldMap() - } - if mustLock { - c.mu.RUnlock() - } - if fmap == nil { + if details.fmap == nil { return nil } - sw.fmap = fmap + sw.fmap = details.fmap sw.fullFields = true sw.msg.OutputType = server.JSON sw.writeObject(details.id, details.obj, details.fields) @@ -98,24 +93,24 @@ func (c *Controller) FenceMatch(hookName string, sw *scanWriter, fence *liveFenc res = `{"id":` + res + `}` } jskey := jsonString(details.key) - jstime := time.Now().Format("2006-01-02T15:04:05.999999999Z07:00") + jshookName := jsonString(hookName) ores := res msgs := make([]string, 0, 2) if fence.detect == nil || fence.detect[detect] { if strings.HasPrefix(ores, "{") { - res = `{"command":"` + details.command + `","detect":"` + detect + `","hook":` + jshookName + `,"time":"` + jstime + `","key":` + jskey + `,` + ores[1:] + res = `{"command":"` + details.command + `","detect":"` + detect + `","hook":` + jshookName + `,"time":"` + details.timestamp.Format(tmfmt) + `","key":` + jskey + `,` + ores[1:] } msgs = append(msgs, res) } switch detect { case "enter": if fence.detect == nil || fence.detect["inside"] { - msgs = append(msgs, `{"command":"`+details.command+`","detect":"inside","hook":`+jshookName+`,"time":"`+jstime+`","key":`+jskey+`,`+ores[1:]) + msgs = append(msgs, `{"command":"`+details.command+`","detect":"inside","hook":`+jshookName+`,"time":"`+details.timestamp.Format(tmfmt)+`","key":`+jskey+`,`+ores[1:]) } case "exit", "cross": if fence.detect == nil || fence.detect["outside"] { - msgs = append(msgs, `{"command":"`+details.command+`","detect":"outside","hook":`+jshookName+`,"time":"`+jstime+`","key":`+jskey+`,`+ores[1:]) + msgs = append(msgs, `{"command":"`+details.command+`","detect":"outside","hook":`+jshookName+`,"time":"`+details.timestamp.Format(tmfmt)+`","key":`+jskey+`,`+ores[1:]) } } return msgs diff --git a/controller/hooks.go b/controller/hooks.go index 86061fd1..41c29484 100644 --- a/controller/hooks.go +++ b/controller/hooks.go @@ -45,22 +45,22 @@ type Hook struct { ScanWriter *scanWriter } -func (c *Controller) DoHook(hook *Hook, details *commandDetailsT) error { +func (hook *Hook) Do(details *commandDetailsT) error { var lerrs []error - msgs := c.FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, details, false) + msgs := FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, details) nextMessage: for _, msg := range msgs { nextEndpoint: for _, endpoint := range hook.Endpoints { switch endpoint.Protocol { case HTTP: - if err := c.sendHTTPMessage(endpoint, []byte(msg)); err != nil { + if err := sendHTTPMessage(endpoint, []byte(msg)); err != nil { lerrs = append(lerrs, err) continue nextEndpoint } continue nextMessage // sent case Disque: - if err := c.sendDisqueMessage(endpoint, []byte(msg)); err != nil { + if err := sendDisqueMessage(endpoint, []byte(msg)); err != nil { lerrs = append(lerrs, err) continue nextEndpoint } @@ -262,6 +262,7 @@ func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetai delete(c.hooks, h.Name) } d.updated = true + d.timestamp = time.Now() c.hooks[name] = hook hm, ok := c.hookcols[hook.Key] if !ok { @@ -297,6 +298,7 @@ func (c *Controller) cmdDelHook(msg *server.Message) (res string, d commandDetai delete(c.hooks, h.Name) d.updated = true } + d.timestamp = time.Now() switch msg.OutputType { case server.JSON: @@ -386,7 +388,7 @@ func (c *Controller) cmdHooks(msg *server.Message) (res string, err error) { return "", nil } -func (c *Controller) sendHTTPMessage(endpoint Endpoint, msg []byte) error { +func sendHTTPMessage(endpoint Endpoint, msg []byte) error { resp, err := http.Post(endpoint.Original, "application/json", bytes.NewBuffer(msg)) if err != nil { return err @@ -398,7 +400,7 @@ func (c *Controller) sendHTTPMessage(endpoint Endpoint, msg []byte) error { return nil } -func (c *Controller) sendDisqueMessage(endpoint Endpoint, msg []byte) error { +func sendDisqueMessage(endpoint Endpoint, msg []byte) error { addr := fmt.Sprintf("%s:%d", endpoint.Disque.Host, endpoint.Disque.Port) conn, err := DialTimeout(addr, time.Second/4) if err != nil { diff --git a/controller/live.go b/controller/live.go index 03220638..b47d90eb 100644 --- a/controller/live.go +++ b/controller/live.go @@ -160,7 +160,7 @@ func (c *Controller) goLive(inerr error, conn net.Conn, rd *server.AnyReaderWrit } fence := lb.fence lb.cond.L.Unlock() - msgs := c.FenceMatch("", sw, fence, details, true) + msgs := FenceMatch("", sw, fence, details) for _, msg := range msgs { if err := writeMessage(conn, []byte(msg), true, connType, websocket); err != nil { return nil // nil return is fine here diff --git a/controller/scanner.go b/controller/scanner.go index abd8e6c7..f6095fe7 100644 --- a/controller/scanner.go +++ b/controller/scanner.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "strconv" + "sync" "github.com/tidwall/resp" "github.com/tidwall/tile38/controller/collection" @@ -27,6 +28,7 @@ const ( ) type scanWriter struct { + mu sync.Mutex wr *bytes.Buffer msg *server.Message col *collection.Collection @@ -100,6 +102,8 @@ func (sw *scanWriter) hasFieldsOutput() bool { } func (sw *scanWriter) writeHead() { + sw.mu.Lock() + defer sw.mu.Unlock() switch sw.msg.OutputType { case server.JSON: if len(sw.farr) > 0 && sw.hasFieldsOutput() { @@ -131,6 +135,8 @@ func (sw *scanWriter) writeHead() { } func (sw *scanWriter) writeFoot(cursor uint64) { + sw.mu.Lock() + defer sw.mu.Unlock() if !sw.hitLimit { cursor = 0 } @@ -215,6 +221,8 @@ func (sw *scanWriter) fieldMatch(fields []float64, o geojson.Object) ([]float64, } func (sw *scanWriter) writeObject(id string, o geojson.Object, fields []float64) bool { + sw.mu.Lock() + defer sw.mu.Unlock() keepGoing := true if !sw.globEverything { if sw.globSingle {