From 464d4eb4ed68e6ec105dc0031988ca2ecb99a5bd Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Sat, 19 Mar 2016 18:31:59 -0700 Subject: [PATCH] http hook & shrink --- cmd/tile38-server/main.go | 22 +++++++++++++++++ controller/aof.go | 51 +++++++++++++++++++++------------------ controller/fence.go | 12 ++++++--- controller/hooks.go | 19 +++++++++++++-- controller/live.go | 2 +- 5 files changed, 75 insertions(+), 31 deletions(-) diff --git a/cmd/tile38-server/main.go b/cmd/tile38-server/main.go index 5be0e9bf..85bd7abc 100644 --- a/cmd/tile38-server/main.go +++ b/cmd/tile38-server/main.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "io/ioutil" + "net/http" "os" "runtime" "strconv" @@ -26,6 +27,27 @@ var ( ) func main() { + if len(os.Args) == 3 && os.Args[1] == "--webhook-consumer-port" { + log.Default = log.New(os.Stderr, &log.Config{}) + port, err := strconv.ParseUint(os.Args[2], 10, 16) + if err != nil { + log.Fatal(err) + } + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + data, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Error(err) + return + } + log.HTTPf("http: %s : %s", r.URL.Path, string(data)) + }) + log.Infof("webhook server http://localhost:%d/", port) + if err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil); err != nil { + log.Fatal(err) + } + return + } + // parse non standard args. nargs := []string{os.Args[0]} for i := 1; i < len(os.Args); i++ { diff --git a/controller/aof.go b/controller/aof.go index 31eddf76..9da14a3b 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -158,37 +158,21 @@ func writeCommand(w io.Writer, line []byte) (n int, err error) { } func (c *Controller) writeAOF(line string, d *commandDetailsT) error { - n, err := writeCommand(c.f, []byte(line)) - if err != nil { - return err - } - if d != nil { - // Process hooks + // process hooks if hm, ok := c.hookcols[d.key]; ok { for _, hook := range hm { if err := c.DoHook(hook, d); err != nil { - // There was an error when processing the hook. - // This is a bad thing because we have already written the data to disk. - // But sinced the Controller mutex is currently locked, we have an opportunity - // to revert the written data on disk and return an error code back to the client. - // Not sure if this is the best route, but it's all we have at the moment. - // Instead of truncating the file, which is an expensive and more fault possible - // solution, we will simple overwrite the previous command with Zeros. - blank := make([]byte, len([]byte(line))) - // Seek to origin of the faulty command. - if _, err := c.f.Seek(int64(c.aofsz), 0); err != nil { - return err // really bad - } - if _, err := writeCommand(c.f, blank); err != nil { - return err // really bad - } return errAOFHook{err} } } } } + n, err := writeCommand(c.f, []byte(line)) + if err != nil { + return err + } c.aofsz += n // notify aof live connections that we have new data @@ -395,6 +379,10 @@ func (k *treeKeyBoolT) Less(item btree.Item) bool { // - Has this key been marked 'ignore'? // - Yes, then ignore // - No, Mark key as 'ignore'? +// 'ADDHOOK' +// - Direct copy from memory. +// 'DELHOOK' +// - Direct copy from memory. // 'FLUSHDB' // - Stop shrinking, nothing left to do @@ -409,6 +397,12 @@ func (c *Controller) aofshrink() { endpos := int64(c.aofsz) start := time.Now() log.Infof("aof shrink started at pos %d", endpos) + + var hooks []string + for _, hook := range c.hooks { + hooks = append(hooks, "ADDHOOK "+hook.Name+" "+hook.Endpoint.Original+" "+hook.Command) + } + c.mu.Unlock() var err error defer func() { @@ -584,7 +578,7 @@ reading: break reading // all done case "drop": if line, key = token(line); key == "" { - err = errors.New("drop is missing key") + err = errors.New("DROP is missing key") return } if !keyIgnoreM[key] { @@ -592,14 +586,14 @@ reading: } case "del": if line, key = token(line); key == "" { - err = errors.New("del is missing key") + err = errors.New("DEL is missing key") return } if keyIgnoreM[key] { continue // ignore } if line, id = token(line); id == "" { - err = errors.New("del is missing id") + err = errors.New("DEL is missing id") return } if keyBucketM.Get(&treeKeyBoolT{key}) == nil { @@ -788,4 +782,13 @@ reading: } return true }) + if err == nil { + // add all of the hooks + for _, line := range hooks { + _, err = writeCommand(nf, []byte(line)) + if err != nil { + return + } + } + } } diff --git a/controller/fence.go b/controller/fence.go index 77d73510..974c7a6c 100644 --- a/controller/fence.go +++ b/controller/fence.go @@ -2,11 +2,12 @@ package controller import ( "strings" + "time" "github.com/tidwall/tile38/geojson" ) -func (c *Controller) FenceMatch(sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT, mustLock bool) [][]byte { +func (c *Controller) FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT, mustLock bool) [][]byte { glob := fence.glob if details.command == "drop" { return [][]byte{[]byte(`{"cmd":"drop"}`)} @@ -94,15 +95,18 @@ func (c *Controller) FenceMatch(sw *scanWriter, fence *liveFenceSwitches, detail if sw.output == outputIDs { res = `{"id":` + res + `}` } + jskey := jsonString(details.key) + jstime := time.Now().Format("2006-01-02T15:04:05.999999999Z07:00") + jshookName := jsonString(hookName) if strings.HasPrefix(res, "{") { - res = `{"command":"` + details.command + `","detect":"` + detect + `",` + res[1:] + res = `{"command":"` + details.command + `","detect":"` + detect + `","hook":` + jshookName + `,"time":"` + jstime + `,"key":` + jskey + `,` + res[1:] } msgs := [][]byte{[]byte(res)} switch detect { case "enter": - msgs = append(msgs, []byte(`{"command":"`+details.command+`","detect":"inside",`+res[1:])) + msgs = append(msgs, []byte(`{"command":"`+details.command+`","detect":"inside","hook":`+jshookName+`,"time":"`+jstime+`,"key":`+jskey+`,`+res[1:])) case "exit", "cross": - msgs = append(msgs, []byte(`{"command":"`+details.command+`","detect":"outside",`+res[1:])) + msgs = append(msgs, []byte(`{"command":"`+details.command+`","detect":"outside","hook":`+jshookName+`,"time":"`+jstime+`,"key":`+jskey+`,`+res[1:])) } return msgs } diff --git a/controller/hooks.go b/controller/hooks.go index e3fb64c8..ee0d0187 100644 --- a/controller/hooks.go +++ b/controller/hooks.go @@ -3,7 +3,9 @@ package controller import ( "bytes" "errors" + "fmt" "io" + "net/http" "sort" "strings" "time" @@ -32,9 +34,22 @@ type Hook struct { } func (c *Controller) DoHook(hook *Hook, details *commandDetailsT) error { - msgs := c.FenceMatch(hook.ScanWriter, hook.Fence, details, false) + msgs := c.FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, details, false) for _, msg := range msgs { - println(">>", string(msg)) + switch hook.Endpoint.Protocol { + case HTTP: + resp, err := http.Post(hook.Endpoint.Original, "application/json", bytes.NewBuffer(msg)) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("enpoint returned status code %d", resp.StatusCode) + } + return nil + case Disque: + println(">>", string(msg)) + } } return nil } diff --git a/controller/live.go b/controller/live.go index 0c0665e7..4d564397 100644 --- a/controller/live.go +++ b/controller/live.go @@ -134,7 +134,7 @@ func (c *Controller) goLive(inerr error, conn net.Conn, rd *bufio.Reader, websoc } fence := lb.fence lb.cond.L.Unlock() - msgs := c.FenceMatch(sw, fence, details, true) + msgs := c.FenceMatch("", sw, fence, details, true) for _, msg := range msgs { if err := writeMessage(conn, msg, websocket); err != nil { return nil // nil return is fine here