tile38/controller/hooks.go

428 lines
9.8 KiB
Go
Raw Normal View History

2016-03-19 17:16:19 +03:00
package controller
import (
"bytes"
"errors"
2016-03-20 04:31:59 +03:00
"fmt"
"net/http"
2016-03-20 18:24:20 +03:00
"net/url"
2016-03-19 17:16:19 +03:00
"sort"
2016-03-20 18:24:20 +03:00
"strconv"
2016-03-19 17:16:19 +03:00
"strings"
"time"
2016-03-29 00:16:21 +03:00
"github.com/tidwall/resp"
2016-03-19 17:16:19 +03:00
"github.com/tidwall/tile38/controller/log"
2016-03-29 00:16:21 +03:00
"github.com/tidwall/tile38/controller/server"
2016-03-19 17:16:19 +03:00
)
type EndpointProtocol string
const (
HTTP = EndpointProtocol("http")
Disque = EndpointProtocol("disque")
)
type Endpoint struct {
Protocol EndpointProtocol
Original string
2016-03-20 18:24:20 +03:00
Disque struct {
Host string
Port int
QueueName string
Options struct {
Replicate int
}
}
2016-03-19 17:16:19 +03:00
}
2016-03-20 18:24:20 +03:00
2016-03-19 17:16:19 +03:00
type Hook struct {
Key string
Name string
2016-03-20 18:24:20 +03:00
Endpoints []Endpoint
2016-03-29 22:29:15 +03:00
Message *server.Message
2016-03-19 17:16:19 +03:00
Fence *liveFenceSwitches
ScanWriter *scanWriter
}
func (c *Controller) DoHook(hook *Hook, details *commandDetailsT) error {
2016-03-30 19:32:38 +03:00
var lerrs []error
2016-03-20 04:31:59 +03:00
msgs := c.FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, details, false)
2016-04-01 22:46:39 +03:00
nextMessage:
2016-03-19 17:16:19 +03:00
for _, msg := range msgs {
2016-04-01 22:46:39 +03:00
nextEndpoint:
2016-03-20 18:24:20 +03:00
for _, endpoint := range hook.Endpoints {
switch endpoint.Protocol {
case HTTP:
2016-04-01 22:46:39 +03:00
if err := c.sendHTTPMessage(endpoint, []byte(msg)); err != nil {
2016-03-30 19:32:38 +03:00
lerrs = append(lerrs, err)
2016-04-01 22:46:39 +03:00
continue nextEndpoint
2016-03-20 18:24:20 +03:00
}
2016-04-01 22:46:39 +03:00
continue nextMessage // sent
2016-03-20 18:24:20 +03:00
case Disque:
2016-04-01 22:46:39 +03:00
if err := c.sendDisqueMessage(endpoint, []byte(msg)); err != nil {
2016-03-30 19:32:38 +03:00
lerrs = append(lerrs, err)
2016-04-01 22:46:39 +03:00
continue nextEndpoint
2016-03-20 18:24:20 +03:00
}
2016-04-01 22:46:39 +03:00
continue nextMessage // sent
2016-03-20 04:31:59 +03:00
}
}
2016-03-19 17:16:19 +03:00
}
2016-04-01 22:46:39 +03:00
if len(lerrs) == 0 {
return nil
}
2016-03-30 19:32:38 +03:00
var errmsgs []string
for _, err := range lerrs {
errmsgs = append(errmsgs, err.Error())
}
if len(errmsgs) > 0 {
return errors.New("not sent: " + strings.Join(errmsgs, ","))
}
2016-03-20 18:24:20 +03:00
return errors.New("not sent")
2016-03-19 17:16:19 +03:00
}
type hooksByName []*Hook
func (a hooksByName) Len() int {
return len(a)
}
func (a hooksByName) Less(i, j int) bool {
return a[i].Name < a[j].Name
}
func (a hooksByName) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func parseEndpoint(s string) (Endpoint, error) {
var endpoint Endpoint
endpoint.Original = s
switch {
default:
return endpoint, errors.New("unknown scheme")
case strings.HasPrefix(s, "http:"):
endpoint.Protocol = HTTP
case strings.HasPrefix(s, "https:"):
endpoint.Protocol = HTTP
case strings.HasPrefix(s, "disque:"):
endpoint.Protocol = Disque
}
s = s[strings.Index(s, ":")+1:]
if !strings.HasPrefix(s, "//") {
return endpoint, errors.New("missing the two slashes")
}
2016-03-20 18:24:20 +03:00
sqp := strings.Split(s[2:], "?")
sp := strings.Split(sqp[0], "/")
s = sp[0]
2016-03-19 17:16:19 +03:00
if s == "" {
return endpoint, errors.New("missing host")
}
2016-03-20 18:24:20 +03:00
if endpoint.Protocol == Disque {
dp := strings.Split(s, ":")
switch len(dp) {
default:
return endpoint, errors.New("invalid disque url")
case 1:
endpoint.Disque.Host = dp[0]
endpoint.Disque.Port = 7711
case 2:
endpoint.Disque.Host = dp[0]
n, err := strconv.ParseUint(dp[1], 10, 16)
if err != nil {
return endpoint, errors.New("invalid disque url")
}
endpoint.Disque.Port = int(n)
}
if len(sp) > 1 {
var err error
endpoint.Disque.QueueName, err = url.QueryUnescape(sp[1])
if err != nil {
return endpoint, errors.New("invalid disque queue name")
}
}
if len(sqp) > 1 {
m, err := url.ParseQuery(sqp[1])
if err != nil {
return endpoint, errors.New("invalid disque url")
}
for key, val := range m {
if len(val) == 0 {
continue
}
switch key {
case "replicate":
n, err := strconv.ParseUint(val[0], 10, 8)
if err != nil {
return endpoint, errors.New("invalid disque replicate value")
}
endpoint.Disque.Options.Replicate = int(n)
}
}
}
if endpoint.Disque.QueueName == "" {
return endpoint, errors.New("missing disque queue name")
}
}
2016-03-19 17:16:19 +03:00
return endpoint, nil
}
2016-03-29 22:29:15 +03:00
func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetailsT, err error) {
start := time.Now()
vs := msg.Values[1:]
2016-03-20 18:24:20 +03:00
var name, values, cmd string
2016-03-29 22:29:15 +03:00
var ok bool
if vs, name, ok = tokenval(vs); !ok || name == "" {
return "", d, errInvalidNumberOfArguments
2016-03-19 17:16:19 +03:00
}
2016-03-29 22:29:15 +03:00
if vs, values, ok = tokenval(vs); !ok || values == "" {
return "", d, errInvalidNumberOfArguments
2016-03-19 17:16:19 +03:00
}
2016-03-20 18:24:20 +03:00
var endpoints []Endpoint
for _, value := range strings.Split(values, ",") {
endpoint, err := parseEndpoint(value)
if err != nil {
log.Errorf("sethook: %v", err)
2016-03-29 22:29:15 +03:00
return "", d, errInvalidArgument(value)
2016-03-20 18:24:20 +03:00
}
endpoints = append(endpoints, endpoint)
2016-03-19 17:16:19 +03:00
}
2016-04-01 22:46:39 +03:00
2016-03-29 22:29:15 +03:00
commandvs := vs
if vs, cmd, ok = tokenval(vs); !ok || cmd == "" {
return "", d, errInvalidNumberOfArguments
2016-03-19 17:16:19 +03:00
}
2016-04-01 22:46:39 +03:00
2016-03-19 17:16:19 +03:00
cmdlc := strings.ToLower(cmd)
var types []string
switch cmdlc {
default:
2016-03-29 22:29:15 +03:00
return "", d, errInvalidArgument(cmd)
2016-03-19 17:16:19 +03:00
case "nearby":
types = nearbyTypes
case "within", "intersects":
types = withinOrIntersectsTypes
}
2016-03-29 00:16:21 +03:00
s, err := c.cmdSearchArgs(cmdlc, vs, types)
2016-03-19 17:16:19 +03:00
if err != nil {
2016-03-29 22:29:15 +03:00
return "", d, err
2016-03-19 17:16:19 +03:00
}
if !s.fence {
2016-03-29 22:29:15 +03:00
return "", d, errors.New("missing FENCE argument")
2016-03-19 17:16:19 +03:00
}
s.cmd = cmdlc
2016-03-29 22:29:15 +03:00
cmsg := &server.Message{}
*cmsg = *msg
cmsg.Values = commandvs
cmsg.Command = strings.ToLower(cmsg.Values[0].String())
2016-03-19 17:16:19 +03:00
hook := &Hook{
2016-03-20 18:24:20 +03:00
Key: s.key,
Name: name,
Endpoints: endpoints,
Fence: &s,
2016-03-29 22:29:15 +03:00
Message: cmsg,
2016-03-19 17:16:19 +03:00
}
var wr bytes.Buffer
2016-03-29 22:29:15 +03:00
hook.ScanWriter, err = c.newScanWriter(&wr, cmsg, s.key, s.output, s.precision, s.glob, s.limit, s.wheres, s.nofields)
2016-03-19 17:16:19 +03:00
if err != nil {
2016-03-29 22:29:15 +03:00
return "", d, err
2016-03-19 17:16:19 +03:00
}
if h, ok := c.hooks[name]; ok {
2016-03-29 22:29:15 +03:00
// lets see if the previous hook matches the new hook
if h.Key == hook.Key && h.Name == hook.Name {
if len(h.Endpoints) == len(hook.Endpoints) {
match := true
for i, endpoint := range h.Endpoints {
if endpoint.Original != hook.Endpoints[i].Original {
match = false
break
}
}
if match && resp.ArrayValue(h.Message.Values).Equals(resp.ArrayValue(hook.Message.Values)) {
switch msg.OutputType {
case server.JSON:
return server.OKMessage(msg, start), d, nil
case server.RESP:
return ":0\r\n", d, nil
}
}
}
}
2016-04-01 22:46:39 +03:00
// delete the previous hook
2016-03-19 17:16:19 +03:00
if hm, ok := c.hookcols[h.Key]; ok {
delete(hm, h.Name)
}
delete(c.hooks, h.Name)
}
2016-03-29 22:29:15 +03:00
d.updated = true
2016-03-19 17:16:19 +03:00
c.hooks[name] = hook
hm, ok := c.hookcols[hook.Key]
if !ok {
hm = make(map[string]*Hook)
c.hookcols[hook.Key] = hm
}
hm[name] = hook
2016-03-29 22:29:15 +03:00
switch msg.OutputType {
case server.JSON:
return server.OKMessage(msg, start), d, nil
case server.RESP:
return ":1\r\n", d, nil
}
return "", d, nil
2016-03-19 17:16:19 +03:00
}
2016-03-29 22:29:15 +03:00
func (c *Controller) cmdDelHook(msg *server.Message) (res string, d commandDetailsT, err error) {
start := time.Now()
vs := msg.Values[1:]
2016-03-19 17:16:19 +03:00
var name string
2016-03-29 22:29:15 +03:00
var ok bool
if vs, name, ok = tokenval(vs); !ok || name == "" {
return "", d, errInvalidNumberOfArguments
2016-03-19 17:16:19 +03:00
}
2016-03-29 22:29:15 +03:00
if len(vs) != 0 {
return "", d, errInvalidNumberOfArguments
2016-03-19 17:16:19 +03:00
}
if h, ok := c.hooks[name]; ok {
if hm, ok := c.hookcols[h.Key]; ok {
delete(hm, h.Name)
}
delete(c.hooks, h.Name)
2016-03-29 22:29:15 +03:00
d.updated = true
}
switch msg.OutputType {
case server.JSON:
return server.OKMessage(msg, start), d, nil
case server.RESP:
if d.updated {
return ":1\r\n", d, nil
} else {
return ":0\r\n", d, nil
}
2016-03-19 17:16:19 +03:00
}
return
}
2016-03-29 22:29:15 +03:00
func (c *Controller) cmdHooks(msg *server.Message) (res string, err error) {
2016-03-19 17:16:19 +03:00
start := time.Now()
2016-03-29 22:29:15 +03:00
vs := msg.Values[1:]
2016-03-19 17:16:19 +03:00
var pattern string
2016-03-29 22:29:15 +03:00
var ok bool
if vs, pattern, ok = tokenval(vs); !ok || pattern == "" {
return "", errInvalidNumberOfArguments
2016-03-19 17:16:19 +03:00
}
2016-03-29 22:29:15 +03:00
if len(vs) != 0 {
return "", errInvalidNumberOfArguments
2016-03-19 17:16:19 +03:00
}
var hooks []*Hook
for name, hook := range c.hooks {
2016-03-29 22:29:15 +03:00
match, _ := globMatch(pattern, name)
if match {
2016-03-19 17:16:19 +03:00
hooks = append(hooks, hook)
}
}
sort.Sort(hooksByName(hooks))
2016-03-29 22:29:15 +03:00
switch msg.OutputType {
case server.JSON:
buf := &bytes.Buffer{}
buf.WriteString(`{"ok":true,"hooks":[`)
for i, hook := range hooks {
2016-03-20 18:24:20 +03:00
if i > 0 {
buf.WriteByte(',')
}
2016-03-29 22:29:15 +03:00
buf.WriteString(`{`)
buf.WriteString(`"name":` + jsonString(hook.Name))
buf.WriteString(`,"key":` + jsonString(hook.Key))
buf.WriteString(`,"endpoints":[`)
for i, endpoint := range hook.Endpoints {
if i > 0 {
buf.WriteByte(',')
}
buf.WriteString(jsonString(endpoint.Original))
}
buf.WriteString(`],"command":[`)
for i, v := range hook.Message.Values {
if i > 0 {
buf.WriteString(`,`)
}
buf.WriteString(jsonString(v.String()))
}
buf.WriteString(`]}`)
}
buf.WriteString(`],"elapsed":"` + time.Now().Sub(start).String() + "\"}")
return buf.String(), nil
case server.RESP:
var vals []resp.Value
for _, hook := range hooks {
var hvals []resp.Value
hvals = append(hvals, resp.StringValue(hook.Name))
hvals = append(hvals, resp.StringValue(hook.Key))
var evals []resp.Value
for _, endpoint := range hook.Endpoints {
evals = append(evals, resp.StringValue(endpoint.Original))
}
hvals = append(hvals, resp.ArrayValue(evals))
hvals = append(hvals, resp.ArrayValue(hook.Message.Values))
vals = append(vals, resp.ArrayValue(hvals))
2016-03-19 17:16:19 +03:00
}
2016-03-29 22:29:15 +03:00
data, err := resp.ArrayValue(vals).MarshalRESP()
if err != nil {
return "", err
}
return string(data), nil
2016-03-19 17:16:19 +03:00
}
2016-03-29 22:29:15 +03:00
return "", nil
2016-03-19 17:16:19 +03:00
}
2016-03-20 18:24:20 +03:00
func (c *Controller) sendHTTPMessage(endpoint Endpoint, msg []byte) error {
resp, err := http.Post(endpoint.Original, "application/json", bytes.NewBuffer(msg))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("enpoint returned status code %d", resp.StatusCode)
}
return nil
}
func (c *Controller) sendDisqueMessage(endpoint Endpoint, msg []byte) error {
addr := fmt.Sprintf("%s:%d", endpoint.Disque.Host, endpoint.Disque.Port)
2016-04-01 04:20:42 +03:00
conn, err := DialTimeout(addr, time.Second/4)
2016-03-20 18:24:20 +03:00
if err != nil {
return err
}
defer conn.Close()
options := []interface{}{endpoint.Disque.QueueName, msg, 0}
replicate := endpoint.Disque.Options.Replicate
if replicate > 0 {
options = append(options, "REPLICATE")
options = append(options, endpoint.Disque.Options.Replicate)
}
2016-04-01 04:20:42 +03:00
v, err := conn.Do("ADDJOB", options...)
2016-03-20 18:24:20 +03:00
if err != nil {
return err
}
2016-04-01 04:20:42 +03:00
if v.Error() != nil {
return v.Error()
}
id := v.String()
2016-03-20 18:24:20 +03:00
p := strings.Split(id, "-")
if len(p) != 4 {
return errors.New("invalid disque reply")
}
return nil
}