tile38/internal/controller/hooks.go

644 lines
14 KiB
Go

package controller
import (
"bytes"
"errors"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/tidwall/buntdb"
"github.com/tidwall/resp"
"github.com/tidwall/tile38/internal/endpoint"
"github.com/tidwall/tile38/internal/glob"
"github.com/tidwall/tile38/internal/log"
"github.com/tidwall/tile38/internal/server"
)
// hookLogSetDefaults holds the default buntdb set options for hook log
// entries: each entry carries a TTL and is removed automatically once
// thirty seconds have passed.
var hookLogSetDefaults = &buntdb.SetOptions{
	TTL:     30 * time.Second,
	Expires: true,
}
// hooksByName implements sort.Interface, ordering hooks ascending by Name.
type hooksByName []*Hook

// Len reports the number of hooks in the list.
func (hs hooksByName) Len() int { return len(hs) }

// Less reports whether the hook at index i has a name that sorts before
// the name of the hook at index j.
func (hs hooksByName) Less(i, j int) bool {
	left, right := hs[i], hs[j]
	return left.Name < right.Name
}

// Swap exchanges the hooks at indexes i and j.
func (hs hooksByName) Swap(i, j int) {
	tmp := hs[i]
	hs[i] = hs[j]
	hs[j] = tmp
}
// cmdSetHook creates or replaces a hook (or a channel when chanCmd is true).
//
// Expected arguments after the command name: the hook name, a comma
// separated endpoint list (hooks only; channels get a single
// "local://<name>" endpoint), any number of "META key value" and
// "EX seconds" options, and finally a NEARBY/WITHIN/INTERSECTS search that
// must include FENCE.
//
// If a hook with the same name already exists and is equal to the new one,
// the existing hook is only signalled and, for RESP output, 0 is returned.
// Otherwise the previous hook is closed and replaced, d.updated is set and,
// for RESP output, 1 is returned.
func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) (
	res resp.Value, d commandDetailsT, err error,
) {
	began := time.Now()
	args := msg.Values[1:]

	var (
		name, urlList, token string
		ok                   bool
	)
	args, name, ok = tokenval(args)
	if !ok || name == "" {
		return server.NOMessage, d, errInvalidNumberOfArguments
	}

	// Resolve the endpoints. Channels never take an explicit endpoint list.
	var endpoints []string
	if !chanCmd {
		args, urlList, ok = tokenval(args)
		if !ok || urlList == "" {
			return server.NOMessage, d, errInvalidNumberOfArguments
		}
		for _, raw := range strings.Split(urlList, ",") {
			ep := strings.TrimSpace(raw)
			if verr := c.epc.Validate(ep); verr != nil {
				log.Errorf("sethook: %v", verr)
				return resp.SimpleStringValue(""), d, errInvalidArgument(ep)
			}
			endpoints = append(endpoints, ep)
		}
	} else {
		endpoints = []string{"local://" + name}
	}

	var (
		fenceArgs []resp.Value // values starting at the search command
		verb      string       // lower-cased search command
		types     []string
		ttlSecs   float64
		hasTTL    bool
	)
	metaMap := make(map[string]string)

	// Consume leading options until the search command is reached.
parse:
	for {
		fenceArgs = args
		args, token, ok = tokenval(args)
		if !ok || token == "" {
			return server.NOMessage, d, errInvalidNumberOfArguments
		}
		verb = strings.ToLower(token)
		switch verb {
		case "meta":
			var metaKey, metaVal string
			args, metaKey, ok = tokenval(args)
			if !ok || metaKey == "" {
				return server.NOMessage, d, errInvalidNumberOfArguments
			}
			args, metaVal, ok = tokenval(args)
			if !ok || metaVal == "" {
				return server.NOMessage, d, errInvalidNumberOfArguments
			}
			metaMap[metaKey] = metaVal
		case "ex":
			var raw string
			args, raw, ok = tokenval(args)
			if !ok || raw == "" {
				return server.NOMessage, d, errInvalidNumberOfArguments
			}
			secs, perr := strconv.ParseFloat(raw, 64)
			if perr != nil {
				return server.NOMessage, d, errInvalidArgument(raw)
			}
			ttlSecs, hasTTL = secs, true
		case "nearby":
			types = nearbyTypes
			break parse
		case "within", "intersects":
			types = withinOrIntersectsTypes
			break parse
		default:
			return server.NOMessage, d, errInvalidArgument(token)
		}
	}

	fence, err := c.cmdSearchArgs(true, verb, args, types)
	defer fence.Close()
	if err != nil {
		return server.NOMessage, d, err
	}
	if !fence.fence {
		return server.NOMessage, d, errors.New("missing FENCE argument")
	}
	fence.cmd = verb

	// The hook keeps its own message holding only the search portion.
	cmsg := &server.Message{}
	*cmsg = *msg
	cmsg.Values = make([]resp.Value, len(fenceArgs))
	copy(cmsg.Values, fenceArgs)
	cmsg.Command = strings.ToLower(cmsg.Values[0].String())

	// Map iteration order is random, so sort the metas by name.
	metas := make([]FenceMeta, 0, len(metaMap))
	for metaKey, metaVal := range metaMap {
		metas = append(metas, FenceMeta{metaKey, metaVal})
	}
	sort.Sort(hookMetaByName(metas))

	hook := &Hook{
		Key:       fence.key,
		Name:      name,
		Endpoints: endpoints,
		Fence:     &fence,
		Message:   cmsg,
		epm:       c.epc,
		Metas:     metas,
		channel:   chanCmd,
		cond:      sync.NewCond(&sync.Mutex{}),
	}
	if hasTTL {
		hook.expires =
			time.Now().Add(time.Duration(ttlSecs * float64(time.Second)))
	}
	if !chanCmd {
		hook.db = c.qdb
	}

	var wr bytes.Buffer
	hook.ScanWriter, err = c.newScanWriter(
		&wr, cmsg, fence.key, fence.output, fence.precision, fence.glob,
		false, fence.cursor, fence.limit, fence.wheres, fence.whereins,
		fence.whereevals, fence.nofields)
	if err != nil {
		return server.NOMessage, d, err
	}

	if prev, found := c.hooks[name]; found {
		if prev.channel != chanCmd {
			return server.NOMessage, d,
				errors.New("hooks and channels cannot share the same name")
		}
		if prev.Equals(hook) {
			// Nothing changed; just wake the existing hook up.
			prev.Signal()
			if !hook.expires.IsZero() {
				c.hookex.Push(hook)
			}
			switch msg.OutputType {
			case server.JSON:
				return server.OKMessage(msg, began), d, nil
			case server.RESP:
				return resp.IntegerValue(0), d, nil
			}
		}
		// Drop the previous hook before installing the new one.
		prev.Close()
		if byName, found := c.hookcols[prev.Key]; found {
			delete(byName, prev.Name)
		}
		delete(c.hooks, prev.Name)
	}

	d.updated = true
	d.timestamp = time.Now()
	c.hooks[name] = hook
	byName, ok := c.hookcols[hook.Key]
	if !ok {
		byName = make(map[string]*Hook)
		c.hookcols[hook.Key] = byName
	}
	byName[name] = hook
	hook.Open()
	if !hook.expires.IsZero() {
		c.hookex.Push(hook)
	}

	switch msg.OutputType {
	case server.RESP:
		return resp.IntegerValue(1), d, nil
	case server.JSON:
		return server.OKMessage(msg, began), d, nil
	}
	return server.NOMessage, d, nil
}
// cmdDelHook removes the hook (or channel when chanCmd is true) with the
// given name. Exactly one argument, the name, is accepted.
//
// A hook is only removed when it exists and its kind matches chanCmd; in
// that case d.updated is set. d.timestamp is always set. For RESP output
// the reply is 1 when something was removed and 0 otherwise.
func (c *Controller) cmdDelHook(msg *server.Message, chanCmd bool) (
	res resp.Value, d commandDetailsT, err error,
) {
	began := time.Now()
	args := msg.Values[1:]

	var (
		name string
		ok   bool
	)
	args, name, ok = tokenval(args)
	if !ok || name == "" {
		return server.NOMessage, d, errInvalidNumberOfArguments
	}
	if len(args) != 0 {
		return server.NOMessage, d, errInvalidNumberOfArguments
	}

	target, found := c.hooks[name]
	if found && target.channel == chanCmd {
		target.Close()
		if byName, found := c.hookcols[target.Key]; found {
			delete(byName, target.Name)
		}
		delete(c.hooks, target.Name)
		d.updated = true
	}
	d.timestamp = time.Now()

	if msg.OutputType == server.JSON {
		return server.OKMessage(msg, began), d, nil
	}
	if msg.OutputType == server.RESP {
		removed := 0
		if d.updated {
			removed = 1
		}
		return resp.IntegerValue(removed), d, nil
	}
	return res, d, err
}
// cmdPDelHook removes every hook (or channel when channel is true) whose
// name matches the given glob pattern. Exactly one argument, the pattern,
// is accepted.
//
// d.updated is set when at least one hook was removed and d.timestamp is
// always set. For RESP output the reply is the number of removed hooks.
func (c *Controller) cmdPDelHook(msg *server.Message, channel bool) (
	res resp.Value, d commandDetailsT, err error,
) {
	began := time.Now()
	args := msg.Values[1:]

	var (
		pattern string
		ok      bool
	)
	args, pattern, ok = tokenval(args)
	if !ok || pattern == "" {
		return server.NOMessage, d, errInvalidNumberOfArguments
	}
	if len(args) != 0 {
		return server.NOMessage, d, errInvalidNumberOfArguments
	}

	removed := 0
	// Deleting entries from a map while ranging over it is allowed in Go.
	for hookName, target := range c.hooks {
		if target.channel != channel {
			continue
		}
		if matched, _ := glob.Match(pattern, hookName); !matched {
			continue
		}
		target.Close()
		if byName, found := c.hookcols[target.Key]; found {
			delete(byName, target.Name)
		}
		delete(c.hooks, target.Name)
		d.updated = true
		removed++
	}
	d.timestamp = time.Now()

	if msg.OutputType == server.JSON {
		return server.OKMessage(msg, began), d, nil
	}
	if msg.OutputType == server.RESP {
		return resp.IntegerValue(removed), d, nil
	}
	return res, d, err
}
// possiblyExpireHook looks up a hook by its name and, when the hook has an
// expiration time that has already passed, deletes it and records the
// matching "delhook"/"delchan" command through writeAOF. It does nothing
// when the hook is unknown, has no expiration, or has not expired yet.
//
// The controller mutex is held for the whole call. A failure to delete the
// hook or to write the command is treated as fatal and panics. This
// operation is called from an independent goroutine.
func (c *Controller) possiblyExpireHook(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	h, ok := c.hooks[name]
	if !ok || h.expires.IsZero() || !time.Now().After(h.expires) {
		return
	}

	// purge from database
	verb := "delhook"
	if h.channel {
		verb = "delchan"
	}
	msg := &server.Message{}
	msg.Values = resp.MultiBulkValue(verb, h.Name).Array()
	msg.Command = verb

	_, d, err := c.cmdDelHook(msg, h.channel)
	if err != nil {
		panic(err)
	}
	if err := c.writeAOF(resp.ArrayValue(msg.Values), &d); err != nil {
		panic(err)
	}
	log.Debugf("purged hook %v", h.Name)
}
// cmdHooks lists the hooks (or channels when channel is true) whose names
// match the given glob pattern, sorted by name. Exactly one argument, the
// pattern, is accepted.
//
// JSON output is an object with a "hooks" (or "chans") array. Each entry
// carries "name", "key", "command" and "meta"; hooks additionally carry
// "endpoints", which is omitted for channels. RESP output is an array with
// one [name, key, endpoints, command, metas] array per hook.
func (c *Controller) cmdHooks(msg *server.Message, channel bool) (
	res resp.Value, err error,
) {
	start := time.Now()
	vs := msg.Values[1:]
	var pattern string
	var ok bool
	if vs, pattern, ok = tokenval(vs); !ok || pattern == "" {
		return server.NOMessage, errInvalidNumberOfArguments
	}
	if len(vs) != 0 {
		return server.NOMessage, errInvalidNumberOfArguments
	}

	var hooks []*Hook
	for name, hook := range c.hooks {
		if hook.channel != channel {
			continue
		}
		match, _ := glob.Match(pattern, name)
		if match {
			hooks = append(hooks, hook)
		}
	}
	// Map iteration order is random; sort for a stable listing.
	sort.Sort(hooksByName(hooks))

	switch msg.OutputType {
	case server.JSON:
		buf := &bytes.Buffer{}
		buf.WriteString(`{"ok":true,`)
		if channel {
			buf.WriteString(`"chans":[`)
		} else {
			buf.WriteString(`"hooks":[`)
		}
		for i, hook := range hooks {
			if i > 0 {
				buf.WriteByte(',')
			}
			buf.WriteString(`{`)
			buf.WriteString(`"name":` + jsonString(hook.Name))
			buf.WriteString(`,"key":` + jsonString(hook.Key))
			if !channel {
				// The endpoints array is opened and closed inside this
				// branch so that channel entries, which omit it, do not
				// end up with a dangling ']' and stay valid JSON.
				buf.WriteString(`,"endpoints":[`)
				for j, ep := range hook.Endpoints {
					if j > 0 {
						buf.WriteByte(',')
					}
					buf.WriteString(jsonString(ep))
				}
				buf.WriteByte(']')
			}
			buf.WriteString(`,"command":[`)
			for j, v := range hook.Message.Values {
				if j > 0 {
					buf.WriteByte(',')
				}
				buf.WriteString(jsonString(v.String()))
			}
			buf.WriteString(`],"meta":{`)
			for j, meta := range hook.Metas {
				if j > 0 {
					buf.WriteByte(',')
				}
				buf.WriteString(jsonString(meta.Name))
				buf.WriteByte(':')
				buf.WriteString(jsonString(meta.Value))
			}
			buf.WriteString(`}}`)
		}
		buf.WriteString(`],"elapsed":"` +
			time.Since(start).String() + "\"}")
		return resp.StringValue(buf.String()), nil
	case server.RESP:
		var vals []resp.Value
		for _, hook := range hooks {
			var hvals []resp.Value
			hvals = append(hvals, resp.StringValue(hook.Name))
			hvals = append(hvals, resp.StringValue(hook.Key))
			var evals []resp.Value
			for _, ep := range hook.Endpoints {
				evals = append(evals, resp.StringValue(ep))
			}
			hvals = append(hvals, resp.ArrayValue(evals))
			hvals = append(hvals, resp.ArrayValue(hook.Message.Values))
			// Metas are flattened into alternating name/value entries.
			var metas []resp.Value
			for _, meta := range hook.Metas {
				metas = append(metas, resp.StringValue(meta.Name))
				metas = append(metas, resp.StringValue(meta.Value))
			}
			hvals = append(hvals, resp.ArrayValue(metas))
			vals = append(vals, resp.ArrayValue(hvals))
		}
		return resp.ArrayValue(vals), nil
	}
	return resp.SimpleStringValue(""), nil
}
// Hook represents a hook: a named fence whose notifications are delivered
// to one or more endpoints. The same type also represents channels, which
// are hooks with the channel flag set.
type Hook struct {
	// cond provides the lock taken by Open, Close, Signal and manager, and
	// is broadcast to wake the manager goroutine.
	cond *sync.Cond
	// Key is the key taken from the parsed search arguments; hooks are also
	// grouped by this value in the controller.
	Key string
	// Name is the name the hook is registered under.
	Name string
	// Endpoints lists the endpoint URLs tried, in order, for each entry.
	// Channels get a single "local://<name>" endpoint.
	Endpoints []string
	// Message is a copy of the creating message whose Values begin at the
	// search command.
	Message *server.Message
	// Fence holds the parsed search arguments of the hook.
	Fence *liveFenceSwitches
	// ScanWriter is the scan writer created for the hook's search.
	ScanWriter *scanWriter
	// Metas holds the META key/value pairs, sorted by name.
	Metas []FenceMeta
	// db is the database that proc reads queued log entries from. It is
	// only set for non-channel hooks.
	db *buntdb.DB
	// channel is true when the hook was created as a channel; Open, Close
	// and Signal do nothing for channels.
	channel bool
	// closed is set by Close and makes the manager goroutine exit.
	closed bool
	// opened is set by Open and keeps the manager from being started twice.
	opened bool
	// query is the JSON document {"hook":<name>} built by Open and used by
	// proc as the starting point of its index scan.
	query string
	// epm is the endpoint manager used to send entries to the endpoints.
	epm *endpoint.Manager
	// expires is the time the hook expires; the zero value means the hook
	// has no expiration.
	expires time.Time
}
// Expires returns the time at which the hook expires. A zero time means
// that no expiration was set for the hook.
// Required by the expire.Item interface.
func (h *Hook) Expires() time.Time {
	return h.expires
}
// Equals reports whether h and hook describe the same hook: identical key,
// name, expiration time, endpoints (same order), metas (same order) and
// command values.
func (h *Hook) Equals(hook *Hook) bool {
	switch {
	case h.Key != hook.Key, h.Name != hook.Name:
		return false
	case len(h.Endpoints) != len(hook.Endpoints):
		return false
	case len(h.Metas) != len(hook.Metas):
		return false
	case !h.expires.Equal(hook.expires):
		return false
	}
	for i := range h.Endpoints {
		if h.Endpoints[i] != hook.Endpoints[i] {
			return false
		}
	}
	for i := range h.Metas {
		mine, theirs := h.Metas[i], hook.Metas[i]
		if mine.Name != theirs.Name || mine.Value != theirs.Value {
			return false
		}
	}
	ours := resp.ArrayValue(h.Message.Values)
	other := resp.ArrayValue(hook.Message.Values)
	return ours.Equals(other)
}
// FenceMeta is a single meta name/value pair attached to a fence.
type FenceMeta struct {
	Name  string
	Value string
}

// hookMetaByName implements sort.Interface, ordering metas ascending by
// Name.
type hookMetaByName []FenceMeta

// Len reports the number of metas in the list.
func (m hookMetaByName) Len() int { return len(m) }

// Less reports whether the meta at index i has a name that sorts before
// the name of the meta at index j.
func (m hookMetaByName) Less(i, j int) bool {
	return m[j].Name > m[i].Name
}

// Swap exchanges the metas at indexes i and j.
func (m hookMetaByName) Swap(i, j int) {
	tmp := m[i]
	m[i] = m[j]
	m[j] = tmp
}
// Open starts the hook. On the first call it builds the hook's query
// document and launches the manager loop in its own goroutine; later calls
// do nothing. Channels have nothing to open, so Open is a no-op for them.
func (h *Hook) Open() {
	if h.channel {
		return
	}
	h.cond.L.Lock()
	defer h.cond.L.Unlock()
	if !h.opened {
		h.opened = true
		h.query = `{"hook":` + jsonString(h.Name) + `}`
		go h.manager()
	}
}
// Close closes the hook and wakes the manager loop so that it can stop.
// Only the first call has an effect. Channels have nothing to close, so
// Close is a no-op for them.
func (h *Hook) Close() {
	if h.channel {
		return
	}
	h.cond.L.Lock()
	defer h.cond.L.Unlock()
	if !h.closed {
		h.closed = true
		h.cond.Broadcast()
	}
}
// Signal wakes the hook's manager loop to tell it that there may be new
// entries in the queue. It may be called at any point. Channels have no
// manager, so Signal is a no-op for them.
func (h *Hook) Signal() {
	if !h.channel {
		h.cond.L.Lock()
		defer h.cond.L.Unlock()
		h.cond.Broadcast()
	}
}
// manager is a forever loop that calls proc whenever there's a signal.
// It ends when the "closed" flag is set. Open starts it in its own
// goroutine.
//
// Each round processes the queue until proc reports success, retrying
// every half second on failure, and then blocks on the condition variable
// until Signal or Close broadcasts.
func (h *Hook) manager() {
	for {
		h.cond.L.Lock()
		for {
			// the closed flag is checked under the lock before every
			// attempt so that Close stops the loop
			if h.closed {
				h.cond.L.Unlock()
				return
			}
			// proc runs while the hook lock is held
			if h.proc() {
				break
			}
			// release the lock while sleeping so that Close and Signal
			// are not blocked during the retry delay
			h.cond.L.Unlock()
			// proc failed. wait half a second and try again
			time.Sleep(time.Second / 2)
			h.cond.L.Lock()
		}
		// everything was handled; wait for the next broadcast. Wait
		// releases the lock while blocked and reacquires it on wake-up.
		h.cond.Wait()
		h.cond.L.Unlock()
	}
}
// proc processes queued hook log entries.
//
// In one transaction it collects every entry of the "hooks" index at or
// after h.query whose key has the hook log prefix, remembers each entry's
// remaining TTL and deletes it. The entries are then sent in order; each is
// offered to the endpoints in turn until one accepts it.
//
// When no endpoint accepts an entry, that entry and all following ones are
// written back with their remaining TTL (entries whose TTL has run out are
// dropped) and false is returned. Returning true indicates that all log
// entries have been successfully handled.
func (h *Hook) proc() (ok bool) {
	var keys, vals []string
	var ttls []time.Duration
	start := time.Now()
	err := h.db.Update(func(tx *buntdb.Tx) error {
		// get keys and vals
		// NOTE(review): the iterator never stops early, so every prefixed
		// entry at or after h.query is collected — confirm that entries
		// belonging to other hooks cannot show up in this range.
		err := tx.AscendGreaterOrEqual("hooks",
			h.query, func(key, val string) bool {
				if strings.HasPrefix(key, hookLogPrefix) {
					keys = append(keys, key)
					vals = append(vals, val)
				}
				return true
			},
		)
		if err != nil {
			return err
		}
		// delete the keys, remembering the TTL each one had left. A key
		// that is no longer found is not an error.
		for _, key := range keys {
			ttl, err := tx.TTL(key)
			if err != nil && err != buntdb.ErrNotFound {
				return err
			}
			ttls = append(ttls, ttl)
			_, err = tx.Delete(key)
			if err != nil && err != buntdb.ErrNotFound {
				return err
			}
		}
		return nil
	})
	if err != nil {
		log.Error(err)
		return false
	}
	// send each val. on failure reinsert that one and all of the following
	for i, key := range keys {
		val := vals[i]
		idx := stringToUint64(key[len(hookLogPrefix):])
		var sent bool
		for _, endpoint := range h.Endpoints {
			err := h.epm.Send(endpoint, val)
			if err != nil {
				log.Debugf("Endpoint connect/send error: %v: %v: %v",
					idx, endpoint, err)
				continue
			}
			log.Debugf("Endpoint send ok: %v: %v: %v", idx, endpoint, err)
			sent = true
			break
		}
		if !sent {
			// failed to send. try to reinsert the remaining.
			// if this fails we lose log entries.
			keys, vals, ttls = keys[i:], vals[i:], ttls[i:]
			err := h.db.Update(func(tx *buntdb.Tx) error {
				for j, key := range keys {
					// charge the time spent in this call against the TTL
					ttl := ttls[j] - time.Since(start)
					if ttl <= 0 {
						continue
					}
					opts := &buntdb.SetOptions{
						Expires: true,
						TTL:     ttl,
					}
					if _, _, err := tx.Set(key, vals[j], opts); err != nil {
						return err
					}
				}
				return nil
			})
			if err != nil {
				// the entries are lost at this point; report it instead
				// of dropping the error silently.
				log.Errorf("hook %v: reinserting unsent log entries: %v",
					h.Name, err)
			}
			return false
		}
	}
	return true
}