mirror of https://github.com/tidwall/tile38.git
http hook & shrink
This commit is contained in:
parent
bbb29b9349
commit
464d4eb4ed
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"runtime"
|
||||
"strconv"
|
||||
|
@ -26,6 +27,27 @@ var (
|
|||
)
|
||||
|
||||
func main() {
|
||||
if len(os.Args) == 3 && os.Args[1] == "--webhook-consumer-port" {
|
||||
log.Default = log.New(os.Stderr, &log.Config{})
|
||||
port, err := strconv.ParseUint(os.Args[2], 10, 16)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
data, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
log.HTTPf("http: %s : %s", r.URL.Path, string(data))
|
||||
})
|
||||
log.Infof("webhook server http://localhost:%d/", port)
|
||||
if err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// parse non standard args.
|
||||
nargs := []string{os.Args[0]}
|
||||
for i := 1; i < len(os.Args); i++ {
|
||||
|
|
|
@ -158,37 +158,21 @@ func writeCommand(w io.Writer, line []byte) (n int, err error) {
|
|||
}
|
||||
|
||||
func (c *Controller) writeAOF(line string, d *commandDetailsT) error {
|
||||
n, err := writeCommand(c.f, []byte(line))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if d != nil {
|
||||
// Process hooks
|
||||
// process hooks
|
||||
if hm, ok := c.hookcols[d.key]; ok {
|
||||
for _, hook := range hm {
|
||||
if err := c.DoHook(hook, d); err != nil {
|
||||
// There was an error when processing the hook.
|
||||
// This is a bad thing because we have already written the data to disk.
|
||||
// But sinced the Controller mutex is currently locked, we have an opportunity
|
||||
// to revert the written data on disk and return an error code back to the client.
|
||||
// Not sure if this is the best route, but it's all we have at the moment.
|
||||
// Instead of truncating the file, which is an expensive and more fault possible
|
||||
// solution, we will simple overwrite the previous command with Zeros.
|
||||
blank := make([]byte, len([]byte(line)))
|
||||
// Seek to origin of the faulty command.
|
||||
if _, err := c.f.Seek(int64(c.aofsz), 0); err != nil {
|
||||
return err // really bad
|
||||
}
|
||||
if _, err := writeCommand(c.f, blank); err != nil {
|
||||
return err // really bad
|
||||
}
|
||||
return errAOFHook{err}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
n, err := writeCommand(c.f, []byte(line))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.aofsz += n
|
||||
|
||||
// notify aof live connections that we have new data
|
||||
|
@ -395,6 +379,10 @@ func (k *treeKeyBoolT) Less(item btree.Item) bool {
|
|||
// - Has this key been marked 'ignore'?
|
||||
// - Yes, then ignore
|
||||
// - No, Mark key as 'ignore'?
|
||||
// 'ADDHOOK'
|
||||
// - Direct copy from memory.
|
||||
// 'DELHOOK'
|
||||
// - Direct copy from memory.
|
||||
// 'FLUSHDB'
|
||||
// - Stop shrinking, nothing left to do
|
||||
|
||||
|
@ -409,6 +397,12 @@ func (c *Controller) aofshrink() {
|
|||
endpos := int64(c.aofsz)
|
||||
start := time.Now()
|
||||
log.Infof("aof shrink started at pos %d", endpos)
|
||||
|
||||
var hooks []string
|
||||
for _, hook := range c.hooks {
|
||||
hooks = append(hooks, "ADDHOOK "+hook.Name+" "+hook.Endpoint.Original+" "+hook.Command)
|
||||
}
|
||||
|
||||
c.mu.Unlock()
|
||||
var err error
|
||||
defer func() {
|
||||
|
@ -584,7 +578,7 @@ reading:
|
|||
break reading // all done
|
||||
case "drop":
|
||||
if line, key = token(line); key == "" {
|
||||
err = errors.New("drop is missing key")
|
||||
err = errors.New("DROP is missing key")
|
||||
return
|
||||
}
|
||||
if !keyIgnoreM[key] {
|
||||
|
@ -592,14 +586,14 @@ reading:
|
|||
}
|
||||
case "del":
|
||||
if line, key = token(line); key == "" {
|
||||
err = errors.New("del is missing key")
|
||||
err = errors.New("DEL is missing key")
|
||||
return
|
||||
}
|
||||
if keyIgnoreM[key] {
|
||||
continue // ignore
|
||||
}
|
||||
if line, id = token(line); id == "" {
|
||||
err = errors.New("del is missing id")
|
||||
err = errors.New("DEL is missing id")
|
||||
return
|
||||
}
|
||||
if keyBucketM.Get(&treeKeyBoolT{key}) == nil {
|
||||
|
@ -788,4 +782,13 @@ reading:
|
|||
}
|
||||
return true
|
||||
})
|
||||
if err == nil {
|
||||
// add all of the hooks
|
||||
for _, line := range hooks {
|
||||
_, err = writeCommand(nf, []byte(line))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,11 +2,12 @@ package controller
|
|||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/tidwall/tile38/geojson"
|
||||
)
|
||||
|
||||
func (c *Controller) FenceMatch(sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT, mustLock bool) [][]byte {
|
||||
func (c *Controller) FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT, mustLock bool) [][]byte {
|
||||
glob := fence.glob
|
||||
if details.command == "drop" {
|
||||
return [][]byte{[]byte(`{"cmd":"drop"}`)}
|
||||
|
@ -94,15 +95,18 @@ func (c *Controller) FenceMatch(sw *scanWriter, fence *liveFenceSwitches, detail
|
|||
if sw.output == outputIDs {
|
||||
res = `{"id":` + res + `}`
|
||||
}
|
||||
jskey := jsonString(details.key)
|
||||
jstime := time.Now().Format("2006-01-02T15:04:05.999999999Z07:00")
|
||||
jshookName := jsonString(hookName)
|
||||
if strings.HasPrefix(res, "{") {
|
||||
res = `{"command":"` + details.command + `","detect":"` + detect + `",` + res[1:]
|
||||
res = `{"command":"` + details.command + `","detect":"` + detect + `","hook":` + jshookName + `,"time":"` + jstime + `,"key":` + jskey + `,` + res[1:]
|
||||
}
|
||||
msgs := [][]byte{[]byte(res)}
|
||||
switch detect {
|
||||
case "enter":
|
||||
msgs = append(msgs, []byte(`{"command":"`+details.command+`","detect":"inside",`+res[1:]))
|
||||
msgs = append(msgs, []byte(`{"command":"`+details.command+`","detect":"inside","hook":`+jshookName+`,"time":"`+jstime+`,"key":`+jskey+`,`+res[1:]))
|
||||
case "exit", "cross":
|
||||
msgs = append(msgs, []byte(`{"command":"`+details.command+`","detect":"outside",`+res[1:]))
|
||||
msgs = append(msgs, []byte(`{"command":"`+details.command+`","detect":"outside","hook":`+jshookName+`,"time":"`+jstime+`,"key":`+jskey+`,`+res[1:]))
|
||||
}
|
||||
return msgs
|
||||
}
|
||||
|
|
|
@ -3,7 +3,9 @@ package controller
|
|||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -32,9 +34,22 @@ type Hook struct {
|
|||
}
|
||||
|
||||
func (c *Controller) DoHook(hook *Hook, details *commandDetailsT) error {
|
||||
msgs := c.FenceMatch(hook.ScanWriter, hook.Fence, details, false)
|
||||
msgs := c.FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, details, false)
|
||||
for _, msg := range msgs {
|
||||
println(">>", string(msg))
|
||||
switch hook.Endpoint.Protocol {
|
||||
case HTTP:
|
||||
resp, err := http.Post(hook.Endpoint.Original, "application/json", bytes.NewBuffer(msg))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != 200 {
|
||||
return fmt.Errorf("enpoint returned status code %d", resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
case Disque:
|
||||
println(">>", string(msg))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -134,7 +134,7 @@ func (c *Controller) goLive(inerr error, conn net.Conn, rd *bufio.Reader, websoc
|
|||
}
|
||||
fence := lb.fence
|
||||
lb.cond.L.Unlock()
|
||||
msgs := c.FenceMatch(sw, fence, details, true)
|
||||
msgs := c.FenceMatch("", sw, fence, details, true)
|
||||
for _, msg := range msgs {
|
||||
if err := writeMessage(conn, msg, websocket); err != nil {
|
||||
return nil // nil return is fine here
|
||||
|
|
Loading…
Reference in New Issue