tile38/internal/server/hooks.go

736 lines
17 KiB
Go
Raw Normal View History

package server
2016-03-19 17:16:19 +03:00
import (
"bytes"
"errors"
"sort"
2018-08-14 03:36:02 +03:00
"strconv"
2016-03-19 17:16:19 +03:00
"strings"
2016-09-11 17:49:48 +03:00
"sync"
"sync/atomic"
2016-03-19 17:16:19 +03:00
"time"
2016-09-11 17:49:48 +03:00
"github.com/tidwall/buntdb"
2019-02-23 01:58:13 +03:00
"github.com/tidwall/gjson"
2016-03-29 00:16:21 +03:00
"github.com/tidwall/resp"
"github.com/tidwall/tile38/internal/endpoint"
"github.com/tidwall/tile38/internal/glob"
"github.com/tidwall/tile38/internal/log"
2016-03-19 17:16:19 +03:00
)
2018-04-18 01:40:11 +03:00
var hookLogSetDefaults = &buntdb.SetOptions{
Expires: true, // automatically delete after 30 seconds
TTL: time.Second * 30,
2016-03-19 17:16:19 +03:00
}
func byHookName(a, b interface{}) bool {
return a.(*Hook).Name < b.(*Hook).Name
2016-03-19 17:16:19 +03:00
}
func (s *Server) cmdSetHook(msg *Message) (
2018-11-24 01:53:33 +03:00
res resp.Value, d commandDetails, err error,
2018-08-14 03:05:30 +03:00
) {
channel := msg.Command() == "setchan"
2016-03-29 22:29:15 +03:00
start := time.Now()
vs := msg.Args[1:]
2016-09-11 17:49:48 +03:00
var name, urls, cmd string
2016-03-29 22:29:15 +03:00
var ok bool
if vs, name, ok = tokenval(vs); !ok || name == "" {
return NOMessage, d, errInvalidNumberOfArguments
2016-03-19 17:16:19 +03:00
}
2016-09-11 17:49:48 +03:00
var endpoints []string
if channel {
2018-08-14 03:05:30 +03:00
endpoints = []string{"local://" + name}
} else {
if vs, urls, ok = tokenval(vs); !ok || urls == "" {
return NOMessage, d, errInvalidNumberOfArguments
2018-08-14 03:05:30 +03:00
}
for _, url := range strings.Split(urls, ",") {
url = strings.TrimSpace(url)
err := s.epc.Validate(url)
2018-08-14 03:05:30 +03:00
if err != nil {
log.Errorf("sethook: %v", err)
return resp.SimpleStringValue(""), d, errInvalidArgument(url)
}
endpoints = append(endpoints, url)
2016-03-20 18:24:20 +03:00
}
2016-03-19 17:16:19 +03:00
}
var commandvs []string
var cmdlc string
var types map[string]bool
2018-08-14 03:36:02 +03:00
var expires float64
var expiresSet bool
metaMap := make(map[string]string)
for {
commandvs = vs
if vs, cmd, ok = tokenval(vs); !ok || cmd == "" {
return NOMessage, d, errInvalidNumberOfArguments
}
cmdlc = strings.ToLower(cmd)
switch cmdlc {
default:
return NOMessage, d, errInvalidArgument(cmd)
case "meta":
var metakey string
var metaval string
if vs, metakey, ok = tokenval(vs); !ok || metakey == "" {
return NOMessage, d, errInvalidNumberOfArguments
}
if vs, metaval, ok = tokenval(vs); !ok || metaval == "" {
return NOMessage, d, errInvalidNumberOfArguments
}
metaMap[metakey] = metaval
continue
2018-08-14 03:36:02 +03:00
case "ex":
var s string
if vs, s, ok = tokenval(vs); !ok || s == "" {
return NOMessage, d, errInvalidNumberOfArguments
2018-08-14 03:36:02 +03:00
}
v, err := strconv.ParseFloat(s, 64)
if err != nil {
return NOMessage, d, errInvalidArgument(s)
2018-08-14 03:36:02 +03:00
}
expires = v
expiresSet = true
continue
case "nearby":
types = nearbyTypes
case "within", "intersects":
types = withinOrIntersectsTypes
}
break
2016-03-19 17:16:19 +03:00
}
args, err := s.cmdSearchArgs(true, cmdlc, vs, types)
if args.usingLua() {
defer args.Close()
}
2016-03-19 17:16:19 +03:00
if err != nil {
return NOMessage, d, err
2016-03-19 17:16:19 +03:00
}
if !args.fence {
return NOMessage, d, errors.New("missing FENCE argument")
2016-03-19 17:16:19 +03:00
}
args.cmd = cmdlc
cmsg := &Message{}
2016-03-29 22:29:15 +03:00
*cmsg = *msg
cmsg.Args = make([]string, len(commandvs))
2022-09-13 03:06:27 +03:00
copy(cmsg.Args, commandvs)
metas := make([]FenceMeta, 0, len(metaMap))
for key, val := range metaMap {
metas = append(metas, FenceMeta{key, val})
}
sort.Sort(hookMetaByName(metas))
2016-03-19 17:16:19 +03:00
hook := &Hook{
Key: args.key,
2016-03-20 18:24:20 +03:00
Name: name,
Endpoints: endpoints,
Fence: &args,
2016-03-29 22:29:15 +03:00
Message: cmsg,
epm: s.epc,
Metas: metas,
channel: channel,
2018-08-14 03:05:30 +03:00
cond: sync.NewCond(&sync.Mutex{}),
counter: &s.statsTotalMsgsSent,
2018-08-14 03:05:30 +03:00
}
2018-08-14 03:36:02 +03:00
if expiresSet {
hook.expires =
time.Now().Add(time.Duration(expires * float64(time.Second)))
}
if !channel {
hook.db = s.qdb
2016-03-19 17:16:19 +03:00
}
var wr bytes.Buffer
hook.ScanWriter, err = s.newScanWriter(
&wr, cmsg, args.key, args.output, args.precision, args.globs, false,
args.cursor, args.limit, args.wheres, args.whereins, args.whereevals,
args.nofields)
2016-03-19 17:16:19 +03:00
if err != nil {
2018-08-14 03:36:02 +03:00
return NOMessage, d, err
2016-03-19 17:16:19 +03:00
}
prevHook, _ := s.hooks.Get(&Hook{Name: name}).(*Hook)
2018-11-24 04:15:14 +03:00
if prevHook != nil {
if prevHook.channel != channel {
return NOMessage, d,
2018-08-14 03:05:30 +03:00
errors.New("hooks and channels cannot share the same name")
}
2018-11-24 04:15:14 +03:00
if prevHook.Equals(hook) {
// it was a match so we do nothing. But let's signal just
// for good measure.
2018-11-24 04:15:14 +03:00
prevHook.Signal()
2018-08-14 03:36:02 +03:00
if !hook.expires.IsZero() {
2021-09-12 19:55:58 +03:00
s.hookExpires.Set(hook)
2018-08-14 03:36:02 +03:00
}
switch msg.OutputType {
case JSON:
return OKMessage(msg, start), d, nil
case RESP:
2017-10-05 18:20:40 +03:00
return resp.IntegerValue(0), d, nil
2016-03-29 22:29:15 +03:00
}
}
2018-11-24 04:15:14 +03:00
prevHook.Close()
s.hooks.Delete(prevHook)
s.hooksOut.Delete(prevHook)
2021-09-12 19:55:58 +03:00
if !prevHook.expires.IsZero() {
s.hookExpires.Delete(prevHook)
}
s.groupDisconnectHook(name)
2016-03-19 17:16:19 +03:00
}
2018-08-14 03:36:02 +03:00
2016-03-29 22:29:15 +03:00
d.updated = true
2016-04-02 17:20:30 +03:00
d.timestamp = time.Now()
2018-11-24 04:15:14 +03:00
s.hooks.Set(hook)
2018-11-24 04:15:14 +03:00
if hook.Fence.detect == nil || hook.Fence.detect["outside"] {
s.hooksOut.Set(hook)
2016-03-19 17:16:19 +03:00
}
2018-11-24 04:15:14 +03:00
// remove previous hook from spatial index
if prevHook != nil && prevHook.Fence != nil && prevHook.Fence.obj != nil {
rect := prevHook.Fence.obj.Rect()
s.hookTree.Delete(
2019-09-13 04:42:53 +03:00
[2]float64{rect.Min.X, rect.Min.Y},
[2]float64{rect.Max.X, rect.Max.Y},
2018-11-24 04:15:14 +03:00
prevHook)
if prevHook.Fence.detect["cross"] {
s.hookCross.Delete(
[2]float64{rect.Min.X, rect.Min.Y},
[2]float64{rect.Max.X, rect.Max.Y},
prevHook)
}
2018-11-24 04:15:14 +03:00
}
// add hook to spatial index
if hook != nil && hook.Fence != nil && hook.Fence.obj != nil {
rect := hook.Fence.obj.Rect()
s.hookTree.Insert(
2019-09-13 04:42:53 +03:00
[2]float64{rect.Min.X, rect.Min.Y},
[2]float64{rect.Max.X, rect.Max.Y},
2018-11-24 04:15:14 +03:00
hook)
if hook.Fence.detect["cross"] {
s.hookCross.Insert(
[2]float64{rect.Min.X, rect.Min.Y},
[2]float64{rect.Max.X, rect.Max.Y},
hook)
}
2018-11-24 04:15:14 +03:00
}
2019-01-09 10:23:53 +03:00
hook.Open() // Opens a goroutine to notify the hook
2018-08-14 03:36:02 +03:00
if !hook.expires.IsZero() {
2021-09-12 19:55:58 +03:00
s.hookExpires.Set(hook)
2018-08-14 03:36:02 +03:00
}
2016-03-29 22:29:15 +03:00
switch msg.OutputType {
case JSON:
return OKMessage(msg, start), d, nil
case RESP:
2017-10-05 18:20:40 +03:00
return resp.IntegerValue(1), d, nil
2016-03-29 22:29:15 +03:00
}
return NOMessage, d, nil
2016-03-19 17:16:19 +03:00
}
2021-09-12 19:55:58 +03:00
func byHookExpires(a, b interface{}) bool {
ha := a.(*Hook)
hb := b.(*Hook)
if ha.expires.Before(hb.expires) {
return true
}
if ha.expires.After(hb.expires) {
return false
}
return ha.Name < hb.Name
}
func (s *Server) cmdDelHook(msg *Message) (
2018-11-24 01:53:33 +03:00
res resp.Value, d commandDetails, err error,
2018-08-14 03:05:30 +03:00
) {
channel := msg.Command() == "delchan"
2016-03-29 22:29:15 +03:00
start := time.Now()
vs := msg.Args[1:]
2016-03-29 22:29:15 +03:00
2016-03-19 17:16:19 +03:00
var name string
2016-03-29 22:29:15 +03:00
var ok bool
if vs, name, ok = tokenval(vs); !ok || name == "" {
return NOMessage, d, errInvalidNumberOfArguments
2016-03-19 17:16:19 +03:00
}
2016-03-29 22:29:15 +03:00
if len(vs) != 0 {
return NOMessage, d, errInvalidNumberOfArguments
2016-03-19 17:16:19 +03:00
}
hook, _ := s.hooks.Get(&Hook{Name: name}).(*Hook)
if hook != nil && hook.channel == channel {
2018-11-24 04:15:14 +03:00
hook.Close()
// remove hook from maps
s.hooks.Delete(hook)
s.hooksOut.Delete(hook)
2021-09-12 19:55:58 +03:00
if !hook.expires.IsZero() {
s.hookExpires.Delete(hook)
}
// remove any hook / object connections
s.groupDisconnectHook(hook.Name)
2018-11-24 04:15:14 +03:00
// remove hook from spatial index
if hook.Fence != nil && hook.Fence.obj != nil {
2018-11-24 04:15:14 +03:00
rect := hook.Fence.obj.Rect()
s.hookTree.Delete(
2019-09-13 04:42:53 +03:00
[2]float64{rect.Min.X, rect.Min.Y},
[2]float64{rect.Max.X, rect.Max.Y},
2018-11-24 04:15:14 +03:00
hook)
if hook.Fence.detect["cross"] {
s.hookCross.Delete(
[2]float64{rect.Min.X, rect.Min.Y},
[2]float64{rect.Max.X, rect.Max.Y},
hook)
}
2018-11-24 04:15:14 +03:00
}
d.updated = true
2016-03-29 22:29:15 +03:00
}
2016-04-02 17:20:30 +03:00
d.timestamp = time.Now()
2016-03-29 22:29:15 +03:00
switch msg.OutputType {
case JSON:
return OKMessage(msg, start), d, nil
case RESP:
2016-03-29 22:29:15 +03:00
if d.updated {
2017-10-05 18:20:40 +03:00
return resp.IntegerValue(1), d, nil
2016-03-29 22:29:15 +03:00
}
2017-10-05 18:20:40 +03:00
return resp.IntegerValue(0), d, nil
2016-03-19 17:16:19 +03:00
}
return
}
func (s *Server) cmdPDelHook(msg *Message) (
2018-11-24 01:53:33 +03:00
res resp.Value, d commandDetails, err error,
2018-08-14 03:05:30 +03:00
) {
channel := msg.Command() == "pdelchan"
2016-09-12 05:20:53 +03:00
start := time.Now()
vs := msg.Args[1:]
2016-09-12 05:20:53 +03:00
var pattern string
var ok bool
if vs, pattern, ok = tokenval(vs); !ok || pattern == "" {
return NOMessage, d, errInvalidNumberOfArguments
2016-09-12 05:20:53 +03:00
}
if len(vs) != 0 {
return NOMessage, d, errInvalidNumberOfArguments
2016-09-12 05:20:53 +03:00
}
count := 0
var hooks []*Hook
s.forEachHookByPattern(pattern, channel, func(hook *Hook) bool {
hooks = append(hooks, hook)
return true
})
for _, hook := range hooks {
if hook.channel != channel {
2018-08-14 03:05:30 +03:00
continue
}
hook.Close()
// remove hook from maps
s.hooks.Delete(hook)
s.hooksOut.Delete(hook)
2021-09-12 19:55:58 +03:00
if !hook.expires.IsZero() {
s.hookExpires.Delete(hook)
}
// remove any hook / object connections
s.groupDisconnectHook(hook.Name)
// remove hook from spatial index
if hook.Fence != nil && hook.Fence.obj != nil {
rect := hook.Fence.obj.Rect()
s.hookTree.Delete(
2019-09-13 04:42:53 +03:00
[2]float64{rect.Min.X, rect.Min.Y},
[2]float64{rect.Max.X, rect.Max.Y},
hook)
if hook.Fence.detect["cross"] {
s.hookCross.Delete(
[2]float64{rect.Min.X, rect.Min.Y},
[2]float64{rect.Max.X, rect.Max.Y},
hook)
}
}
2018-08-14 03:05:30 +03:00
d.updated = true
count++
2016-09-12 05:20:53 +03:00
}
d.timestamp = time.Now()
switch msg.OutputType {
case JSON:
return OKMessage(msg, start), d, nil
case RESP:
2017-10-05 18:20:40 +03:00
return resp.IntegerValue(count), d, nil
2016-09-12 05:20:53 +03:00
}
return
}
func (s *Server) forEachHookByPattern(
pattern string, channel bool, iter func(hook *Hook) bool,
) {
g := glob.Parse(pattern, false)
hasUpperLimit := g.Limits[1] != ""
s.hooks.Ascend(&Hook{Name: g.Limits[0]}, func(v interface{}) bool {
hook := v.(*Hook)
if hasUpperLimit && hook.Name > g.Limits[1] {
return false
}
if hook.channel == channel {
match, _ := glob.Match(pattern, hook.Name)
if match {
return iter(hook)
}
}
return true
})
}
func (s *Server) cmdHooks(msg *Message) (
2018-08-14 03:05:30 +03:00
res resp.Value, err error,
) {
channel := msg.Command() == "chans"
2016-03-19 17:16:19 +03:00
start := time.Now()
vs := msg.Args[1:]
2016-03-19 17:16:19 +03:00
var pattern string
2016-03-29 22:29:15 +03:00
var ok bool
2017-10-05 18:20:40 +03:00
2016-03-29 22:29:15 +03:00
if vs, pattern, ok = tokenval(vs); !ok || pattern == "" {
return NOMessage, errInvalidNumberOfArguments
2016-03-19 17:16:19 +03:00
}
2016-03-29 22:29:15 +03:00
if len(vs) != 0 {
return NOMessage, errInvalidNumberOfArguments
2016-03-19 17:16:19 +03:00
}
2016-03-29 22:29:15 +03:00
switch msg.OutputType {
case JSON:
2016-03-29 22:29:15 +03:00
buf := &bytes.Buffer{}
2018-08-14 03:05:30 +03:00
buf.WriteString(`{"ok":true,`)
if channel {
buf.WriteString(`"chans":[`)
} else {
buf.WriteString(`"hooks":[`)
}
var i int
s.forEachHookByPattern(pattern, channel, func(hook *Hook) bool {
2021-09-12 19:03:05 +03:00
var ttl = -1
if !hook.expires.IsZero() {
ttl = int(hook.expires.Sub(start).Seconds())
if ttl < 0 {
ttl = 0
}
}
2016-03-20 18:24:20 +03:00
if i > 0 {
buf.WriteByte(',')
}
2016-03-29 22:29:15 +03:00
buf.WriteString(`{`)
buf.WriteString(`"name":` + jsonString(hook.Name))
buf.WriteString(`,"key":` + jsonString(hook.Key))
2021-09-12 19:03:05 +03:00
buf.WriteString(`,"ttl":` + strconv.Itoa(ttl))
2018-08-14 03:05:30 +03:00
if !channel {
buf.WriteString(`,"endpoints":[`)
for i, endpoint := range hook.Endpoints {
if i > 0 {
buf.WriteByte(',')
}
buf.WriteString(jsonString(endpoint))
2016-03-29 22:29:15 +03:00
}
buf.WriteString(`]`)
2016-03-29 22:29:15 +03:00
}
buf.WriteString(`,"command":[`)
for i, v := range hook.Message.Args {
2016-03-29 22:29:15 +03:00
if i > 0 {
buf.WriteString(`,`)
}
buf.WriteString(jsonString(v))
2016-03-29 22:29:15 +03:00
}
buf.WriteString(`],"meta":{`)
for i, meta := range hook.Metas {
if i > 0 {
buf.WriteString(`,`)
}
buf.WriteString(jsonString(meta.Name))
buf.WriteString(`:`)
buf.WriteString(jsonString(meta.Value))
}
buf.WriteString(`}}`)
i++
return true
})
2018-08-14 03:05:30 +03:00
buf.WriteString(`],"elapsed":"` +
time.Since(start).String() + "\"}")
2017-10-05 18:20:40 +03:00
return resp.StringValue(buf.String()), nil
case RESP:
2016-03-29 22:29:15 +03:00
var vals []resp.Value
s.forEachHookByPattern(pattern, channel, func(hook *Hook) bool {
2016-03-29 22:29:15 +03:00
var hvals []resp.Value
hvals = append(hvals, resp.StringValue(hook.Name))
hvals = append(hvals, resp.StringValue(hook.Key))
var evals []resp.Value
for _, endpoint := range hook.Endpoints {
2016-09-11 17:49:48 +03:00
evals = append(evals, resp.StringValue(endpoint))
2016-03-29 22:29:15 +03:00
}
hvals = append(hvals, resp.ArrayValue(evals))
avals := make([]resp.Value, len(hook.Message.Args))
for i := 0; i < len(hook.Message.Args); i++ {
avals[i] = resp.StringValue(hook.Message.Args[i])
}
hvals = append(hvals, resp.ArrayValue(avals))
var metas []resp.Value
for _, meta := range hook.Metas {
metas = append(metas, resp.StringValue(meta.Name))
metas = append(metas, resp.StringValue(meta.Value))
}
hvals = append(hvals, resp.ArrayValue(metas))
2016-03-29 22:29:15 +03:00
vals = append(vals, resp.ArrayValue(hvals))
return true
})
2017-10-05 18:20:40 +03:00
return resp.ArrayValue(vals), nil
2016-03-19 17:16:19 +03:00
}
2017-10-05 18:20:40 +03:00
return resp.SimpleStringValue(""), nil
2016-03-19 17:16:19 +03:00
}
2016-09-11 17:49:48 +03:00
// Hook represents a hook.
type Hook struct {
cond *sync.Cond
Key string
Name string
Endpoints []string
Message *Message
2016-09-11 17:49:48 +03:00
Fence *liveFenceSwitches
ScanWriter *scanWriter
Metas []FenceMeta
2016-09-11 17:49:48 +03:00
db *buntdb.DB
2018-08-14 03:05:30 +03:00
channel bool
2016-09-11 17:49:48 +03:00
closed bool
opened bool
query string
2018-04-19 19:25:39 +03:00
epm *endpoint.Manager
2018-08-14 03:05:30 +03:00
expires time.Time
counter *atomic.Int64 // counter that grows when a message was sent
sig int
2016-09-11 17:49:48 +03:00
}
2018-08-14 03:36:02 +03:00
// Expires returns when the hook expires. Required by the expire.Item interface.
func (h *Hook) Expires() time.Time {
return h.expires
}
2018-08-14 03:05:30 +03:00
// Equals returns true if two hooks are equal
func (h *Hook) Equals(hook *Hook) bool {
if h.Key != hook.Key ||
h.Name != hook.Name ||
len(h.Endpoints) != len(hook.Endpoints) ||
len(h.Metas) != len(hook.Metas) {
return false
}
2018-08-14 03:36:02 +03:00
if !h.expires.Equal(hook.expires) {
return false
}
for i, endpoint := range h.Endpoints {
if endpoint != hook.Endpoints[i] {
return false
}
}
for i, meta := range h.Metas {
if meta.Name != hook.Metas[i].Name ||
meta.Value != hook.Metas[i].Value {
return false
}
}
if len(h.Message.Args) != len(hook.Message.Args) {
return false
}
for i := 0; i < len(h.Message.Args); i++ {
if h.Message.Args[i] != hook.Message.Args[i] {
return false
}
}
return true
}
2018-08-14 03:05:30 +03:00
// FenceMeta is a meta key/value pair for fences
type FenceMeta struct {
Name, Value string
}
type hookMetaByName []FenceMeta
func (arr hookMetaByName) Len() int {
return len(arr)
}
func (arr hookMetaByName) Less(a, b int) bool {
return arr[a].Name < arr[b].Name
}
func (arr hookMetaByName) Swap(a, b int) {
arr[a], arr[b] = arr[b], arr[a]
}
2016-09-11 17:49:48 +03:00
// Open is called when a hook is first created. It calls the manager
// function in a goroutine
func (h *Hook) Open() {
2018-08-14 03:05:30 +03:00
if h.channel {
// nothing to open for channels
return
}
h.cond.L.Lock()
defer h.cond.L.Unlock()
2016-09-11 17:49:48 +03:00
if h.opened {
return
}
h.opened = true
2018-08-14 03:05:30 +03:00
h.query = `{"hook":` + jsonString(h.Name) + `}`
2016-09-11 17:49:48 +03:00
go h.manager()
}
// Close closed the hook and stop the manager function
func (h *Hook) Close() {
2018-08-14 03:05:30 +03:00
if h.channel {
// nothing to close for channels
return
}
h.cond.L.Lock()
defer h.cond.L.Unlock()
2016-09-11 17:49:48 +03:00
if h.closed {
return
}
h.closed = true
h.cond.Broadcast()
}
// Signal can be called at any point to wake up the hook and
// notify the manager that there may be something new in the queue.
func (h *Hook) Signal() {
2018-08-14 03:05:30 +03:00
if h.channel {
// nothing to signal for channels
return
}
h.cond.L.Lock()
h.sig++
2016-09-11 17:49:48 +03:00
h.cond.Broadcast()
2018-08-14 03:05:30 +03:00
h.cond.L.Unlock()
2016-09-11 17:49:48 +03:00
}
// the manager is a forever loop that calls proc whenever there's a signal.
// it ends when the "closed" flag is set.
func (h *Hook) manager() {
// lock the hook to waiting on signals
h.cond.L.Lock()
defer h.cond.L.Unlock()
var sig int
2016-09-11 17:49:48 +03:00
for {
if h.closed {
// the hook has closed, end manager
return
}
sig = h.sig
// unlock/logk the hook and send outgoing messages
if !func() bool {
2018-08-14 03:05:30 +03:00
h.cond.L.Unlock()
defer h.cond.L.Lock()
return h.proc()
}() {
// a send failed, try again in a moment
2018-08-14 03:05:30 +03:00
time.Sleep(time.Second / 2)
continue
}
if sig != h.sig {
// there was another incoming signal
continue
2016-09-11 17:49:48 +03:00
}
// wait on signal
2016-09-11 17:49:48 +03:00
h.cond.Wait()
}
}
2016-09-12 05:01:24 +03:00
// proc processes queued hook logs.
// returning true will indicate that all log entries have been
// successfully handled.
2016-09-11 17:49:48 +03:00
func (h *Hook) proc() (ok bool) {
var keys, vals []string
var ttls []time.Duration
2018-04-18 01:40:11 +03:00
start := time.Now()
2016-09-11 17:49:48 +03:00
err := h.db.Update(func(tx *buntdb.Tx) error {
// get keys and vals
2018-08-14 03:05:30 +03:00
err := tx.AscendGreaterOrEqual("hooks",
h.query, func(key, val string) bool {
if strings.HasPrefix(key, hookLogPrefix) {
2019-02-23 01:58:13 +03:00
// Verify this hooks name matches the one in the notif
if h.Name == gjson.Get(val, "hook").String() {
keys = append(keys, key)
vals = append(vals, val)
}
2018-08-14 03:05:30 +03:00
}
return true
},
)
2016-09-11 17:49:48 +03:00
if err != nil {
return err
}
2016-09-12 05:01:24 +03:00
2016-09-11 17:49:48 +03:00
// delete the keys
for _, key := range keys {
2018-04-18 01:40:11 +03:00
ttl, err := tx.TTL(key)
if err != nil {
if err != buntdb.ErrNotFound {
return err
2016-09-11 17:49:48 +03:00
}
}
2018-04-18 01:40:11 +03:00
ttls = append(ttls, ttl)
2016-09-11 17:49:48 +03:00
_, err = tx.Delete(key)
if err != nil {
if err != buntdb.ErrNotFound {
return err
}
}
}
return nil
})
if err != nil {
log.Error(err)
return false
}
// send each val. on failure reinsert that one and all of the following
for i, key := range keys {
val := vals[i]
idx := stringToUint64(key[len(hookLogPrefix):])
var sent bool
for _, endpoint := range h.Endpoints {
err := h.epm.Send(endpoint, val)
if err != nil {
2018-08-14 03:05:30 +03:00
log.Debugf("Endpoint connect/send error: %v: %v: %v",
idx, endpoint, err)
2016-09-11 17:49:48 +03:00
continue
}
2017-08-03 13:05:24 +03:00
log.Debugf("Endpoint send ok: %v: %v: %v", idx, endpoint, err)
2016-09-11 17:49:48 +03:00
sent = true
h.counter.Add(1)
2016-09-11 17:49:48 +03:00
break
}
if !sent {
2018-08-14 03:05:30 +03:00
// failed to send. try to reinsert the remaining.
// if this fails we lose log entries.
2016-09-11 17:49:48 +03:00
keys = keys[i:]
vals = vals[i:]
2018-04-18 01:40:11 +03:00
ttls = ttls[i:]
2016-09-11 17:49:48 +03:00
h.db.Update(func(tx *buntdb.Tx) error {
for i, key := range keys {
val := vals[i]
2018-04-18 01:40:11 +03:00
ttl := ttls[i] - time.Since(start)
if ttl > 0 {
opts := &buntdb.SetOptions{
2016-09-11 17:49:48 +03:00
Expires: true,
2018-04-18 01:40:11 +03:00
TTL: ttl,
}
_, _, err := tx.Set(key, val, opts)
if err != nil {
return err
2016-09-11 17:49:48 +03:00
}
}
}
return nil
})
return false
}
}
return true
}