mirror of https://github.com/tidwall/tile38.git
697 lines
16 KiB
Go
697 lines
16 KiB
Go
package server
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/tidwall/buntdb"
|
|
"github.com/tidwall/gjson"
|
|
"github.com/tidwall/resp"
|
|
"github.com/tidwall/tile38/internal/endpoint"
|
|
"github.com/tidwall/tile38/internal/glob"
|
|
"github.com/tidwall/tile38/internal/log"
|
|
)
|
|
|
|
var hookLogSetDefaults = &buntdb.SetOptions{
|
|
Expires: true, // automatically delete after 30 seconds
|
|
TTL: time.Second * 30,
|
|
}
|
|
|
|
type hooksByName []*Hook
|
|
|
|
func (a hooksByName) Len() int {
|
|
return len(a)
|
|
}
|
|
|
|
func (a hooksByName) Less(i, j int) bool {
|
|
return a[i].Name < a[j].Name
|
|
}
|
|
|
|
func (a hooksByName) Swap(i, j int) {
|
|
a[i], a[j] = a[j], a[i]
|
|
}
|
|
|
|
func (s *Server) cmdSetHook(msg *Message, chanCmd bool) (
|
|
res resp.Value, d commandDetails, err error,
|
|
) {
|
|
start := time.Now()
|
|
vs := msg.Args[1:]
|
|
var name, urls, cmd string
|
|
var ok bool
|
|
if vs, name, ok = tokenval(vs); !ok || name == "" {
|
|
return NOMessage, d, errInvalidNumberOfArguments
|
|
}
|
|
var endpoints []string
|
|
if chanCmd {
|
|
endpoints = []string{"local://" + name}
|
|
} else {
|
|
if vs, urls, ok = tokenval(vs); !ok || urls == "" {
|
|
return NOMessage, d, errInvalidNumberOfArguments
|
|
}
|
|
for _, url := range strings.Split(urls, ",") {
|
|
url = strings.TrimSpace(url)
|
|
err := s.epc.Validate(url)
|
|
if err != nil {
|
|
log.Errorf("sethook: %v", err)
|
|
return resp.SimpleStringValue(""), d, errInvalidArgument(url)
|
|
}
|
|
endpoints = append(endpoints, url)
|
|
}
|
|
}
|
|
var commandvs []string
|
|
var cmdlc string
|
|
var types []string
|
|
var expires float64
|
|
var expiresSet bool
|
|
metaMap := make(map[string]string)
|
|
for {
|
|
commandvs = vs
|
|
if vs, cmd, ok = tokenval(vs); !ok || cmd == "" {
|
|
return NOMessage, d, errInvalidNumberOfArguments
|
|
}
|
|
cmdlc = strings.ToLower(cmd)
|
|
switch cmdlc {
|
|
default:
|
|
return NOMessage, d, errInvalidArgument(cmd)
|
|
case "meta":
|
|
var metakey string
|
|
var metaval string
|
|
if vs, metakey, ok = tokenval(vs); !ok || metakey == "" {
|
|
return NOMessage, d, errInvalidNumberOfArguments
|
|
}
|
|
if vs, metaval, ok = tokenval(vs); !ok || metaval == "" {
|
|
return NOMessage, d, errInvalidNumberOfArguments
|
|
}
|
|
metaMap[metakey] = metaval
|
|
continue
|
|
case "ex":
|
|
var s string
|
|
if vs, s, ok = tokenval(vs); !ok || s == "" {
|
|
return NOMessage, d, errInvalidNumberOfArguments
|
|
}
|
|
v, err := strconv.ParseFloat(s, 64)
|
|
if err != nil {
|
|
return NOMessage, d, errInvalidArgument(s)
|
|
}
|
|
expires = v
|
|
expiresSet = true
|
|
continue
|
|
case "nearby":
|
|
types = nearbyTypes
|
|
case "within", "intersects":
|
|
types = withinOrIntersectsTypes
|
|
}
|
|
break
|
|
}
|
|
args, err := s.cmdSearchArgs(true, cmdlc, vs, types)
|
|
defer args.Close()
|
|
if err != nil {
|
|
return NOMessage, d, err
|
|
}
|
|
if !args.fence {
|
|
return NOMessage, d, errors.New("missing FENCE argument")
|
|
}
|
|
args.cmd = cmdlc
|
|
cmsg := &Message{}
|
|
*cmsg = *msg
|
|
cmsg.Args = make([]string, len(commandvs))
|
|
for i := 0; i < len(commandvs); i++ {
|
|
cmsg.Args[i] = commandvs[i]
|
|
}
|
|
metas := make([]FenceMeta, 0, len(metaMap))
|
|
for key, val := range metaMap {
|
|
metas = append(metas, FenceMeta{key, val})
|
|
}
|
|
sort.Sort(hookMetaByName(metas))
|
|
|
|
hook := &Hook{
|
|
Key: args.key,
|
|
Name: name,
|
|
Endpoints: endpoints,
|
|
Fence: &args,
|
|
Message: cmsg,
|
|
epm: s.epc,
|
|
Metas: metas,
|
|
channel: chanCmd,
|
|
cond: sync.NewCond(&sync.Mutex{}),
|
|
counter: &s.statsTotalMsgsSent,
|
|
}
|
|
if expiresSet {
|
|
hook.expires =
|
|
time.Now().Add(time.Duration(expires * float64(time.Second)))
|
|
}
|
|
if !chanCmd {
|
|
hook.db = s.qdb
|
|
}
|
|
var wr bytes.Buffer
|
|
hook.ScanWriter, err = s.newScanWriter(
|
|
&wr, cmsg, args.key, args.output, args.precision, args.glob, false,
|
|
args.cursor, args.limit, args.wheres, args.whereins, args.whereevals,
|
|
args.nofields)
|
|
if err != nil {
|
|
|
|
return NOMessage, d, err
|
|
}
|
|
prevHook := s.hooks[name]
|
|
if prevHook != nil {
|
|
if prevHook.channel != chanCmd {
|
|
return NOMessage, d,
|
|
errors.New("hooks and channels cannot share the same name")
|
|
}
|
|
if prevHook.Equals(hook) {
|
|
// it was a match so we do nothing. But let's signal just
|
|
// for good measure.
|
|
prevHook.Signal()
|
|
if !hook.expires.IsZero() {
|
|
s.hookex.Push(hook)
|
|
}
|
|
switch msg.OutputType {
|
|
case JSON:
|
|
return OKMessage(msg, start), d, nil
|
|
case RESP:
|
|
return resp.IntegerValue(0), d, nil
|
|
}
|
|
}
|
|
prevHook.Close()
|
|
delete(s.hooks, name)
|
|
delete(s.hooksOut, name)
|
|
}
|
|
|
|
d.updated = true
|
|
d.timestamp = time.Now()
|
|
|
|
s.hooks[name] = hook
|
|
if hook.Fence.detect == nil || hook.Fence.detect["outside"] {
|
|
s.hooksOut[name] = hook
|
|
}
|
|
|
|
// remove previous hook from spatial index
|
|
if prevHook != nil && prevHook.Fence != nil && prevHook.Fence.obj != nil {
|
|
rect := prevHook.Fence.obj.Rect()
|
|
s.hookTree.Delete(
|
|
[2]float64{rect.Min.X, rect.Min.Y},
|
|
[2]float64{rect.Max.X, rect.Max.Y},
|
|
prevHook)
|
|
}
|
|
// add hook to spatial index
|
|
if hook != nil && hook.Fence != nil && hook.Fence.obj != nil {
|
|
rect := hook.Fence.obj.Rect()
|
|
s.hookTree.Insert(
|
|
[2]float64{rect.Min.X, rect.Min.Y},
|
|
[2]float64{rect.Max.X, rect.Max.Y},
|
|
hook)
|
|
}
|
|
|
|
hook.Open() // Opens a goroutine to notify the hook
|
|
if !hook.expires.IsZero() {
|
|
s.hookex.Push(hook)
|
|
}
|
|
switch msg.OutputType {
|
|
case JSON:
|
|
return OKMessage(msg, start), d, nil
|
|
case RESP:
|
|
return resp.IntegerValue(1), d, nil
|
|
}
|
|
return NOMessage, d, nil
|
|
}
|
|
|
|
func (s *Server) cmdDelHook(msg *Message, chanCmd bool) (
|
|
res resp.Value, d commandDetails, err error,
|
|
) {
|
|
start := time.Now()
|
|
vs := msg.Args[1:]
|
|
|
|
var name string
|
|
var ok bool
|
|
if vs, name, ok = tokenval(vs); !ok || name == "" {
|
|
return NOMessage, d, errInvalidNumberOfArguments
|
|
}
|
|
if len(vs) != 0 {
|
|
return NOMessage, d, errInvalidNumberOfArguments
|
|
}
|
|
if hook, ok := s.hooks[name]; ok && hook.channel == chanCmd {
|
|
hook.Close()
|
|
// remove hook from maps
|
|
delete(s.hooks, hook.Name)
|
|
delete(s.hooksOut, hook.Name)
|
|
// remove hook from spatial index
|
|
if hook != nil && hook.Fence != nil && hook.Fence.obj != nil {
|
|
rect := hook.Fence.obj.Rect()
|
|
s.hookTree.Delete(
|
|
[2]float64{rect.Min.X, rect.Min.Y},
|
|
[2]float64{rect.Max.X, rect.Max.Y},
|
|
hook)
|
|
}
|
|
d.updated = true
|
|
}
|
|
d.timestamp = time.Now()
|
|
|
|
switch msg.OutputType {
|
|
case JSON:
|
|
return OKMessage(msg, start), d, nil
|
|
case RESP:
|
|
if d.updated {
|
|
return resp.IntegerValue(1), d, nil
|
|
}
|
|
return resp.IntegerValue(0), d, nil
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *Server) cmdPDelHook(msg *Message, channel bool) (
|
|
res resp.Value, d commandDetails, err error,
|
|
) {
|
|
start := time.Now()
|
|
vs := msg.Args[1:]
|
|
|
|
var pattern string
|
|
var ok bool
|
|
if vs, pattern, ok = tokenval(vs); !ok || pattern == "" {
|
|
return NOMessage, d, errInvalidNumberOfArguments
|
|
}
|
|
if len(vs) != 0 {
|
|
return NOMessage, d, errInvalidNumberOfArguments
|
|
}
|
|
|
|
count := 0
|
|
for name, hook := range s.hooks {
|
|
if hook.channel != channel {
|
|
continue
|
|
}
|
|
match, _ := glob.Match(pattern, name)
|
|
if !match {
|
|
continue
|
|
}
|
|
hook.Close()
|
|
// remove hook from maps
|
|
delete(s.hooks, hook.Name)
|
|
delete(s.hooksOut, hook.Name)
|
|
// remove hook from spatial index
|
|
if hook != nil && hook.Fence != nil && hook.Fence.obj != nil {
|
|
rect := hook.Fence.obj.Rect()
|
|
s.hookTree.Delete(
|
|
[2]float64{rect.Min.X, rect.Min.Y},
|
|
[2]float64{rect.Max.X, rect.Max.Y},
|
|
hook)
|
|
}
|
|
d.updated = true
|
|
count++
|
|
}
|
|
d.timestamp = time.Now()
|
|
|
|
switch msg.OutputType {
|
|
case JSON:
|
|
return OKMessage(msg, start), d, nil
|
|
case RESP:
|
|
return resp.IntegerValue(count), d, nil
|
|
}
|
|
return
|
|
}
|
|
|
|
// possiblyExpireHook will evaluate a hook by it's name for expiration and
|
|
// purge it from the database if needed. This operation is called from an
|
|
// independent goroutine
|
|
func (s *Server) possiblyExpireHook(name string) {
|
|
s.mu.Lock()
|
|
if h, ok := s.hooks[name]; ok {
|
|
if !h.expires.IsZero() && time.Now().After(h.expires) {
|
|
// purge from database
|
|
msg := &Message{}
|
|
if h.channel {
|
|
msg.Args = []string{"delchan", h.Name}
|
|
} else {
|
|
msg.Args = []string{"delhook", h.Name}
|
|
}
|
|
_, d, err := s.cmdDelHook(msg, h.channel)
|
|
if err != nil {
|
|
s.mu.Unlock()
|
|
panic(err)
|
|
}
|
|
if err := s.writeAOF(msg.Args, &d); err != nil {
|
|
s.mu.Unlock()
|
|
panic(err)
|
|
}
|
|
log.Debugf("purged hook %v", h.Name)
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *Server) cmdHooks(msg *Message, channel bool) (
|
|
res resp.Value, err error,
|
|
) {
|
|
start := time.Now()
|
|
vs := msg.Args[1:]
|
|
|
|
var pattern string
|
|
var ok bool
|
|
|
|
if vs, pattern, ok = tokenval(vs); !ok || pattern == "" {
|
|
return NOMessage, errInvalidNumberOfArguments
|
|
}
|
|
if len(vs) != 0 {
|
|
return NOMessage, errInvalidNumberOfArguments
|
|
}
|
|
|
|
var hooks []*Hook
|
|
for name, hook := range s.hooks {
|
|
if hook.channel != channel {
|
|
continue
|
|
}
|
|
match, _ := glob.Match(pattern, name)
|
|
if match {
|
|
hooks = append(hooks, hook)
|
|
}
|
|
}
|
|
sort.Sort(hooksByName(hooks))
|
|
|
|
switch msg.OutputType {
|
|
case JSON:
|
|
buf := &bytes.Buffer{}
|
|
buf.WriteString(`{"ok":true,`)
|
|
if channel {
|
|
buf.WriteString(`"chans":[`)
|
|
} else {
|
|
buf.WriteString(`"hooks":[`)
|
|
}
|
|
for i, hook := range hooks {
|
|
if i > 0 {
|
|
buf.WriteByte(',')
|
|
}
|
|
buf.WriteString(`{`)
|
|
buf.WriteString(`"name":` + jsonString(hook.Name))
|
|
buf.WriteString(`,"key":` + jsonString(hook.Key))
|
|
if !channel {
|
|
buf.WriteString(`,"endpoints":[`)
|
|
for i, endpoint := range hook.Endpoints {
|
|
if i > 0 {
|
|
buf.WriteByte(',')
|
|
}
|
|
buf.WriteString(jsonString(endpoint))
|
|
}
|
|
buf.WriteString(`]`)
|
|
}
|
|
buf.WriteString(`,"command":[`)
|
|
for i, v := range hook.Message.Args {
|
|
if i > 0 {
|
|
buf.WriteString(`,`)
|
|
}
|
|
buf.WriteString(jsonString(v))
|
|
}
|
|
buf.WriteString(`],"meta":{`)
|
|
for i, meta := range hook.Metas {
|
|
if i > 0 {
|
|
buf.WriteString(`,`)
|
|
}
|
|
buf.WriteString(jsonString(meta.Name))
|
|
buf.WriteString(`:`)
|
|
buf.WriteString(jsonString(meta.Value))
|
|
}
|
|
buf.WriteString(`}}`)
|
|
}
|
|
buf.WriteString(`],"elapsed":"` +
|
|
time.Now().Sub(start).String() + "\"}")
|
|
return resp.StringValue(buf.String()), nil
|
|
case RESP:
|
|
var vals []resp.Value
|
|
for _, hook := range hooks {
|
|
var hvals []resp.Value
|
|
hvals = append(hvals, resp.StringValue(hook.Name))
|
|
hvals = append(hvals, resp.StringValue(hook.Key))
|
|
var evals []resp.Value
|
|
for _, endpoint := range hook.Endpoints {
|
|
evals = append(evals, resp.StringValue(endpoint))
|
|
}
|
|
hvals = append(hvals, resp.ArrayValue(evals))
|
|
avals := make([]resp.Value, len(hook.Message.Args))
|
|
for i := 0; i < len(hook.Message.Args); i++ {
|
|
avals[i] = resp.StringValue(hook.Message.Args[i])
|
|
}
|
|
hvals = append(hvals, resp.ArrayValue(avals))
|
|
var metas []resp.Value
|
|
for _, meta := range hook.Metas {
|
|
metas = append(metas, resp.StringValue(meta.Name))
|
|
metas = append(metas, resp.StringValue(meta.Value))
|
|
}
|
|
hvals = append(hvals, resp.ArrayValue(metas))
|
|
vals = append(vals, resp.ArrayValue(hvals))
|
|
}
|
|
return resp.ArrayValue(vals), nil
|
|
}
|
|
return resp.SimpleStringValue(""), nil
|
|
}
|
|
|
|
// Hook represents a hook.
|
|
type Hook struct {
|
|
cond *sync.Cond
|
|
Key string
|
|
Name string
|
|
Endpoints []string
|
|
Message *Message
|
|
Fence *liveFenceSwitches
|
|
ScanWriter *scanWriter
|
|
Metas []FenceMeta
|
|
db *buntdb.DB
|
|
channel bool
|
|
closed bool
|
|
opened bool
|
|
query string
|
|
epm *endpoint.Manager
|
|
expires time.Time
|
|
counter *aint // counter that grows when a message was sent
|
|
sig int
|
|
}
|
|
|
|
// Expires returns when the hook expires. Required by the expire.Item interface.
|
|
func (h *Hook) Expires() time.Time {
|
|
return h.expires
|
|
}
|
|
|
|
// Equals returns true if two hooks are equal
|
|
func (h *Hook) Equals(hook *Hook) bool {
|
|
if h.Key != hook.Key ||
|
|
h.Name != hook.Name ||
|
|
len(h.Endpoints) != len(hook.Endpoints) ||
|
|
len(h.Metas) != len(hook.Metas) {
|
|
return false
|
|
}
|
|
if !h.expires.Equal(hook.expires) {
|
|
return false
|
|
}
|
|
for i, endpoint := range h.Endpoints {
|
|
if endpoint != hook.Endpoints[i] {
|
|
return false
|
|
}
|
|
}
|
|
for i, meta := range h.Metas {
|
|
if meta.Name != hook.Metas[i].Name ||
|
|
meta.Value != hook.Metas[i].Value {
|
|
return false
|
|
}
|
|
}
|
|
if len(h.Message.Args) != len(hook.Message.Args) {
|
|
return false
|
|
}
|
|
for i := 0; i < len(h.Message.Args); i++ {
|
|
if h.Message.Args[i] != hook.Message.Args[i] {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// FenceMeta is a single meta key/value pair attached to a fence.
type FenceMeta struct {
	Name  string
	Value string
}

// hookMetaByName implements sort.Interface and orders meta pairs by
// ascending Name.
type hookMetaByName []FenceMeta

// Len reports how many meta pairs are in the collection.
func (metas hookMetaByName) Len() int { return len(metas) }

// Less reports whether the Name of the pair at index i sorts before the Name
// of the pair at index j.
func (metas hookMetaByName) Less(i, j int) bool {
	left, right := metas[i], metas[j]
	return left.Name < right.Name
}

// Swap exchanges the pairs at indexes i and j.
func (metas hookMetaByName) Swap(i, j int) {
	metas[j], metas[i] = metas[i], metas[j]
}
|
|
|
|
// Open is called when a hook is first created. It calls the manager
|
|
// function in a goroutine
|
|
func (h *Hook) Open() {
|
|
if h.channel {
|
|
// nothing to open for channels
|
|
return
|
|
}
|
|
h.cond.L.Lock()
|
|
defer h.cond.L.Unlock()
|
|
if h.opened {
|
|
return
|
|
}
|
|
h.opened = true
|
|
h.query = `{"hook":` + jsonString(h.Name) + `}`
|
|
go h.manager()
|
|
}
|
|
|
|
// Close closed the hook and stop the manager function
|
|
func (h *Hook) Close() {
|
|
if h.channel {
|
|
// nothing to close for channels
|
|
return
|
|
}
|
|
h.cond.L.Lock()
|
|
defer h.cond.L.Unlock()
|
|
if h.closed {
|
|
return
|
|
}
|
|
h.closed = true
|
|
h.cond.Broadcast()
|
|
}
|
|
|
|
// Signal can be called at any point to wake up the hook and
|
|
// notify the manager that there may be something new in the queue.
|
|
func (h *Hook) Signal() {
|
|
if h.channel {
|
|
// nothing to signal for channels
|
|
return
|
|
}
|
|
h.cond.L.Lock()
|
|
h.sig++
|
|
h.cond.Broadcast()
|
|
h.cond.L.Unlock()
|
|
}
|
|
|
|
// the manager is a forever loop that calls proc whenever there's a signal.
|
|
// it ends when the "closed" flag is set.
|
|
func (h *Hook) manager() {
|
|
// lock the hook to waiting on signals
|
|
h.cond.L.Lock()
|
|
defer h.cond.L.Unlock()
|
|
var sig int
|
|
for {
|
|
if h.closed {
|
|
// the hook has closed, end manager
|
|
return
|
|
}
|
|
sig = h.sig
|
|
// unlock/logk the hook and send outgoing messages
|
|
if !func() bool {
|
|
h.cond.L.Unlock()
|
|
defer h.cond.L.Lock()
|
|
return h.proc()
|
|
}() {
|
|
// a send failed, try again in a moment
|
|
time.Sleep(time.Second / 2)
|
|
continue
|
|
}
|
|
if sig != h.sig {
|
|
// there was another incoming signal
|
|
continue
|
|
}
|
|
// wait on signal
|
|
h.cond.Wait()
|
|
}
|
|
}
|
|
|
|
// proc processes queued hook logs.
|
|
// returning true will indicate that all log entries have been
|
|
// successfully handled.
|
|
func (h *Hook) proc() (ok bool) {
|
|
var keys, vals []string
|
|
var ttls []time.Duration
|
|
start := time.Now()
|
|
err := h.db.Update(func(tx *buntdb.Tx) error {
|
|
// get keys and vals
|
|
err := tx.AscendGreaterOrEqual("hooks",
|
|
h.query, func(key, val string) bool {
|
|
if strings.HasPrefix(key, hookLogPrefix) {
|
|
// Verify this hooks name matches the one in the notif
|
|
if h.Name == gjson.Get(val, "hook").String() {
|
|
keys = append(keys, key)
|
|
vals = append(vals, val)
|
|
}
|
|
}
|
|
return true
|
|
},
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// delete the keys
|
|
for _, key := range keys {
|
|
ttl, err := tx.TTL(key)
|
|
if err != nil {
|
|
if err != buntdb.ErrNotFound {
|
|
return err
|
|
}
|
|
}
|
|
ttls = append(ttls, ttl)
|
|
_, err = tx.Delete(key)
|
|
if err != nil {
|
|
if err != buntdb.ErrNotFound {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
log.Error(err)
|
|
return false
|
|
}
|
|
|
|
// send each val. on failure reinsert that one and all of the following
|
|
for i, key := range keys {
|
|
val := vals[i]
|
|
idx := stringToUint64(key[len(hookLogPrefix):])
|
|
var sent bool
|
|
for _, endpoint := range h.Endpoints {
|
|
err := h.epm.Send(endpoint, val)
|
|
if err != nil {
|
|
log.Debugf("Endpoint connect/send error: %v: %v: %v",
|
|
idx, endpoint, err)
|
|
continue
|
|
}
|
|
log.Debugf("Endpoint send ok: %v: %v: %v", idx, endpoint, err)
|
|
sent = true
|
|
h.counter.add(1)
|
|
break
|
|
}
|
|
if !sent {
|
|
// failed to send. try to reinsert the remaining.
|
|
// if this fails we lose log entries.
|
|
keys = keys[i:]
|
|
vals = vals[i:]
|
|
ttls = ttls[i:]
|
|
h.db.Update(func(tx *buntdb.Tx) error {
|
|
for i, key := range keys {
|
|
val := vals[i]
|
|
ttl := ttls[i] - time.Since(start)
|
|
if ttl > 0 {
|
|
opts := &buntdb.SetOptions{
|
|
Expires: true,
|
|
TTL: ttl,
|
|
}
|
|
_, _, err := tx.Set(key, val, opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|