package controller import ( "bytes" "errors" "fmt" "net/http" "net/url" "sort" "strconv" "strings" "time" "github.com/garyburd/redigo/redis" "github.com/tidwall/resp" "github.com/tidwall/tile38/controller/log" "github.com/tidwall/tile38/controller/server" ) type EndpointProtocol string const ( HTTP = EndpointProtocol("http") Disque = EndpointProtocol("disque") ) type Endpoint struct { Protocol EndpointProtocol Original string Disque struct { Host string Port int QueueName string Options struct { Replicate int } } } type Hook struct { Key string Name string Endpoints []Endpoint Message *server.Message Fence *liveFenceSwitches ScanWriter *scanWriter } func (c *Controller) DoHook(hook *Hook, details *commandDetailsT) error { var lerrs []error msgs := c.FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, details, false) for _, msg := range msgs { for _, endpoint := range hook.Endpoints { switch endpoint.Protocol { case HTTP: if err := c.sendHTTPMessage(endpoint, msg); err != nil { lerrs = append(lerrs, err) continue } return nil //sent case Disque: if err := c.sendDisqueMessage(endpoint, msg); err != nil { lerrs = append(lerrs, err) continue } return nil // sent } } } var errmsgs []string for _, err := range lerrs { errmsgs = append(errmsgs, err.Error()) } if len(errmsgs) > 0 { return errors.New("not sent: " + strings.Join(errmsgs, ",")) } return errors.New("not sent") } type hooksByName []*Hook func (a hooksByName) Len() int { return len(a) } func (a hooksByName) Less(i, j int) bool { return a[i].Name < a[j].Name } func (a hooksByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func parseEndpoint(s string) (Endpoint, error) { var endpoint Endpoint endpoint.Original = s switch { default: return endpoint, errors.New("unknown scheme") case strings.HasPrefix(s, "http:"): endpoint.Protocol = HTTP case strings.HasPrefix(s, "https:"): endpoint.Protocol = HTTP case strings.HasPrefix(s, "disque:"): endpoint.Protocol = Disque } s = s[strings.Index(s, ":")+1:] if !strings.HasPrefix(s, "//") { return endpoint, errors.New("missing the two slashes") } sqp := strings.Split(s[2:], "?") sp := strings.Split(sqp[0], "/") s = sp[0] if s == "" { return endpoint, errors.New("missing host") } if endpoint.Protocol == Disque { dp := strings.Split(s, ":") switch len(dp) { default: return endpoint, errors.New("invalid disque url") case 1: endpoint.Disque.Host = dp[0] endpoint.Disque.Port = 7711 case 2: endpoint.Disque.Host = dp[0] n, err := strconv.ParseUint(dp[1], 10, 16) if err != nil { return endpoint, errors.New("invalid disque url") } endpoint.Disque.Port = int(n) } if len(sp) > 1 { var err error endpoint.Disque.QueueName, err = url.QueryUnescape(sp[1]) if err != nil { return endpoint, errors.New("invalid disque queue name") } } if len(sqp) > 1 { m, err := url.ParseQuery(sqp[1]) if err != nil { return endpoint, errors.New("invalid disque url") } for key, val := range m { if len(val) == 0 { continue } switch key { case "replicate": n, err := strconv.ParseUint(val[0], 10, 8) if err != nil { return endpoint, errors.New("invalid disque replicate value") } endpoint.Disque.Options.Replicate = int(n) } } } if endpoint.Disque.QueueName == "" { return endpoint, errors.New("missing disque queue name") } } return endpoint, nil } func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetailsT, err error) { start := time.Now() vs := msg.Values[1:] var name, values, cmd string var ok bool if vs, name, ok = tokenval(vs); !ok || name == "" { return "", d, errInvalidNumberOfArguments } if vs, values, ok = tokenval(vs); !ok || values == "" { return "", d, errInvalidNumberOfArguments } var endpoints []Endpoint for _, value := range strings.Split(values, ",") { endpoint, err := parseEndpoint(value) if err != nil { log.Errorf("sethook: %v", err) return "", d, errInvalidArgument(value) } endpoints = append(endpoints, endpoint) } commandvs := vs if vs, cmd, ok = tokenval(vs); !ok || cmd == "" { return "", d, errInvalidNumberOfArguments } cmdlc := strings.ToLower(cmd) var types []string switch cmdlc { default: return "", d, errInvalidArgument(cmd) case "nearby": types = nearbyTypes case "within", "intersects": types = withinOrIntersectsTypes } s, err := c.cmdSearchArgs(cmdlc, vs, types) if err != nil { return "", d, err } if !s.fence { return "", d, errors.New("missing FENCE argument") } s.cmd = cmdlc cmsg := &server.Message{} *cmsg = *msg cmsg.Values = commandvs cmsg.Command = strings.ToLower(cmsg.Values[0].String()) hook := &Hook{ Key: s.key, Name: name, Endpoints: endpoints, Fence: &s, Message: cmsg, } var wr bytes.Buffer hook.ScanWriter, err = c.newScanWriter(&wr, cmsg, s.key, s.output, s.precision, s.glob, s.limit, s.wheres, s.nofields) if err != nil { return "", d, err } // delete the previous hook if h, ok := c.hooks[name]; ok { // lets see if the previous hook matches the new hook if h.Key == hook.Key && h.Name == hook.Name { if len(h.Endpoints) == len(hook.Endpoints) { match := true for i, endpoint := range h.Endpoints { if endpoint.Original != hook.Endpoints[i].Original { match = false break } } if match && resp.ArrayValue(h.Message.Values).Equals(resp.ArrayValue(hook.Message.Values)) { switch msg.OutputType { case server.JSON: return server.OKMessage(msg, start), d, nil case server.RESP: return ":0\r\n", d, nil } } } } if hm, ok := c.hookcols[h.Key]; ok { delete(hm, h.Name) } delete(c.hooks, h.Name) } d.updated = true c.hooks[name] = hook hm, ok := c.hookcols[hook.Key] if !ok { hm = make(map[string]*Hook) c.hookcols[hook.Key] = hm } hm[name] = hook switch msg.OutputType { case server.JSON: return server.OKMessage(msg, start), d, nil case server.RESP: return ":1\r\n", d, nil } return "", d, nil } func (c *Controller) cmdDelHook(msg *server.Message) (res string, d commandDetailsT, err error) { start := time.Now() vs := msg.Values[1:] var name string var ok bool if vs, name, ok = tokenval(vs); !ok || name == "" { return "", d, errInvalidNumberOfArguments } if len(vs) != 0 { return "", d, errInvalidNumberOfArguments } if h, ok := c.hooks[name]; ok { if hm, ok := c.hookcols[h.Key]; ok { delete(hm, h.Name) } delete(c.hooks, h.Name) d.updated = true } switch msg.OutputType { case server.JSON: return server.OKMessage(msg, start), d, nil case server.RESP: if d.updated { return ":1\r\n", d, nil } else { return ":0\r\n", d, nil } } return } func (c *Controller) cmdHooks(msg *server.Message) (res string, err error) { start := time.Now() vs := msg.Values[1:] var pattern string var ok bool if vs, pattern, ok = tokenval(vs); !ok || pattern == "" { return "", errInvalidNumberOfArguments } if len(vs) != 0 { return "", errInvalidNumberOfArguments } var hooks []*Hook for name, hook := range c.hooks { match, _ := globMatch(pattern, name) if match { hooks = append(hooks, hook) } } sort.Sort(hooksByName(hooks)) switch msg.OutputType { case server.JSON: buf := &bytes.Buffer{} buf.WriteString(`{"ok":true,"hooks":[`) for i, hook := range hooks { if i > 0 { buf.WriteByte(',') } buf.WriteString(`{`) buf.WriteString(`"name":` + jsonString(hook.Name)) buf.WriteString(`,"key":` + jsonString(hook.Key)) buf.WriteString(`,"endpoints":[`) for i, endpoint := range hook.Endpoints { if i > 0 { buf.WriteByte(',') } buf.WriteString(jsonString(endpoint.Original)) } buf.WriteString(`],"command":[`) for i, v := range hook.Message.Values { if i > 0 { buf.WriteString(`,`) } buf.WriteString(jsonString(v.String())) } buf.WriteString(`]}`) } buf.WriteString(`],"elapsed":"` + time.Now().Sub(start).String() + "\"}") return buf.String(), nil case server.RESP: var vals []resp.Value for _, hook := range hooks { var hvals []resp.Value hvals = append(hvals, resp.StringValue(hook.Name)) hvals = append(hvals, resp.StringValue(hook.Key)) var evals []resp.Value for _, endpoint := range hook.Endpoints { evals = append(evals, resp.StringValue(endpoint.Original)) } hvals = append(hvals, resp.ArrayValue(evals)) hvals = append(hvals, resp.ArrayValue(hook.Message.Values)) vals = append(vals, resp.ArrayValue(hvals)) } data, err := resp.ArrayValue(vals).MarshalRESP() if err != nil { return "", err } return string(data), nil } return "", nil } func (c *Controller) sendHTTPMessage(endpoint Endpoint, msg []byte) error { resp, err := http.Post(endpoint.Original, "application/json", bytes.NewBuffer(msg)) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != 200 { return fmt.Errorf("enpoint returned status code %d", resp.StatusCode) } return nil } func (c *Controller) sendDisqueMessage(endpoint Endpoint, msg []byte) error { addr := fmt.Sprintf("%s:%d", endpoint.Disque.Host, endpoint.Disque.Port) conn, err := redis.DialTimeout("tcp", addr, time.Second/4, time.Second/4, time.Second/4) if err != nil { return err } defer conn.Close() options := []interface{}{endpoint.Disque.QueueName, msg, 0} replicate := endpoint.Disque.Options.Replicate if replicate > 0 { options = append(options, "REPLICATE") options = append(options, endpoint.Disque.Options.Replicate) } id, err := redis.String(conn.Do("ADDJOB", options...)) if err != nil { return err } p := strings.Split(id, "-") if len(p) != 4 { return errors.New("invalid disque reply") } return nil }