mirror of https://github.com/tidwall/tile38.git
refactor hooks and endpoints
This commit is contained in:
parent
5d48c99612
commit
3862f70cac
|
@ -11,6 +11,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tidwall/buntdb"
|
||||
"github.com/tidwall/resp"
|
||||
"github.com/tidwall/tile38/controller/log"
|
||||
"github.com/tidwall/tile38/controller/server"
|
||||
|
@ -94,7 +95,7 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error {
|
|||
}
|
||||
if c.config.FollowHost == "" {
|
||||
// process hooks, for leader only
|
||||
if err := c.processHooks(d); err != nil {
|
||||
if err := c.queueHooks(d); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -124,15 +125,66 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) processHooks(d *commandDetailsT) error {
|
||||
func (c *Controller) queueHooks(d *commandDetailsT) error {
|
||||
// big list of all of the messages
|
||||
var hmsgs []string
|
||||
var hooks []*Hook
|
||||
// find the hook by the key
|
||||
if hm, ok := c.hookcols[d.key]; ok {
|
||||
for _, hook := range hm {
|
||||
go hook.Do(d)
|
||||
// match the fence
|
||||
msgs := FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, d)
|
||||
if len(msgs) > 0 {
|
||||
// append each msg to the big list
|
||||
hmsgs = append(hmsgs, msgs...)
|
||||
hooks = append(hooks, hook)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(hmsgs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// queue the message in the buntdb database
|
||||
err := c.qdb.Update(func(tx *buntdb.Tx) error {
|
||||
for _, msg := range hmsgs {
|
||||
c.qidx++ // increment the log id
|
||||
key := hookLogPrefix + uint64ToString(c.qidx)
|
||||
_, _, err := tx.Set(key, msg, hookLogSetDefaults())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debugf("queued hook: %d", c.qidx)
|
||||
}
|
||||
_, _, err := tx.Set("hook:idx", uint64ToString(c.qidx), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// all the messages have been queued.
|
||||
// notify the hooks
|
||||
for _, hook := range hooks {
|
||||
hook.Signal()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Converts string to an integer
|
||||
func stringToUint64(s string) uint64 {
|
||||
n, _ := strconv.ParseUint(s, 10, 64)
|
||||
return n
|
||||
}
|
||||
|
||||
// Converts a uint to a string
|
||||
func uint64ToString(u uint64) string {
|
||||
s := strings.Repeat("0", 20) + strconv.FormatUint(u, 10)
|
||||
return s[len(s)-20:]
|
||||
}
|
||||
|
||||
type liveAOFSwitches struct {
|
||||
pos int64
|
||||
}
|
||||
|
|
|
@ -237,7 +237,7 @@ func (c *Controller) aofshrink() {
|
|||
values := make([]resp.Value, 0, 3+len(hook.Message.Values))
|
||||
endpoints := make([]string, len(hook.Endpoints))
|
||||
for i, endpoint := range hook.Endpoints {
|
||||
endpoints[i] = endpoint.Original
|
||||
endpoints[i] = endpoint
|
||||
}
|
||||
values = append(values, resp.StringValue("sethook"), resp.StringValue(name), resp.StringValue(strings.Join(endpoints, ",")))
|
||||
values = append(values, hook.Message.Values...)
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/tidwall/btree"
|
||||
"github.com/tidwall/buntdb"
|
||||
"github.com/tidwall/resp"
|
||||
"github.com/tidwall/tile38/controller/collection"
|
||||
"github.com/tidwall/tile38/controller/log"
|
||||
|
@ -25,6 +26,8 @@ import (
|
|||
|
||||
var errOOM = errors.New("OOM command not allowed when used memory > 'maxmemory'")
|
||||
|
||||
const hookLogPrefix = "hook:log:"
|
||||
|
||||
type collectionT struct {
|
||||
Key string
|
||||
Collection *collection.Collection
|
||||
|
@ -54,6 +57,8 @@ type Controller struct {
|
|||
host string
|
||||
port int
|
||||
f *os.File
|
||||
qdb *buntdb.DB // hook queue log
|
||||
qidx uint64 // hook queue log last idx
|
||||
cols *btree.BTree
|
||||
aofsz int
|
||||
dir string
|
||||
|
@ -73,6 +78,8 @@ type Controller struct {
|
|||
conns map[*server.Conn]bool
|
||||
started time.Time
|
||||
|
||||
epc *EndpointManager
|
||||
|
||||
statsTotalConns int
|
||||
statsTotalCommands int
|
||||
statsExpired int
|
||||
|
@ -106,6 +113,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener) error
|
|||
expires: make(map[string]map[string]time.Time),
|
||||
started: time.Now(),
|
||||
conns: make(map[*server.Conn]bool),
|
||||
epc: NewEndpointCollection(),
|
||||
}
|
||||
if err := os.MkdirAll(dir, 0700); err != nil {
|
||||
return err
|
||||
|
@ -113,6 +121,31 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener) error
|
|||
if err := c.loadConfig(); err != nil {
|
||||
return err
|
||||
}
|
||||
// load the queue before the aof
|
||||
qdb, err := buntdb.Open(path.Join(dir, "queue.db"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var qidx uint64
|
||||
if err := qdb.View(func(tx *buntdb.Tx) error {
|
||||
val, err := tx.Get("hook:idx")
|
||||
if err != nil {
|
||||
if err == buntdb.ErrNotFound {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
qidx = stringToUint64(val)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
err = qdb.CreateIndex("hooks", hookLogPrefix+"*", buntdb.IndexJSONCaseSensitive("hook"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.qdb = qdb
|
||||
c.qidx = qidx
|
||||
if err := c.migrateAOF(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,186 @@
|
|||
package controller
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// EndpointProtocol is the type of protocol that the endpoint represents.
|
||||
type EndpointProtocol string
|
||||
|
||||
const (
|
||||
HTTP = EndpointProtocol("http") // HTTP
|
||||
Disque = EndpointProtocol("disque") // Disque
|
||||
)
|
||||
|
||||
type EndpointManager struct {
|
||||
mu sync.RWMutex // this is intentionally exposed
|
||||
Endpoints map[string]*Endpoint
|
||||
}
|
||||
|
||||
func NewEndpointCollection() *EndpointManager {
|
||||
return &EndpointManager{
|
||||
Endpoints: make(map[string]*Endpoint),
|
||||
}
|
||||
}
|
||||
|
||||
// Get finds an endpoint based on its url. If the enpoint does not
|
||||
// exist a new only is created.
|
||||
func (epc *EndpointManager) Validate(url string) error {
|
||||
_, err := parseEndpoint(url)
|
||||
return err
|
||||
/*
|
||||
pendpoint := epc.Endpoints[url]
|
||||
if pendpoint == nil {
|
||||
pendpoint = endpoint
|
||||
epc.Endpoints[url] = pendpoint
|
||||
}
|
||||
return pendpoint, nil
|
||||
*/
|
||||
}
|
||||
|
||||
// We use a retain/relase on endoints.
|
||||
// The calls are directed to the collection instead of
|
||||
// endpoint itself to avoid having a circular reference to
|
||||
// the collection.
|
||||
func (epc *EndpointManager) Open(endpoint string) {
|
||||
epc.mu.Lock()
|
||||
defer epc.mu.Unlock()
|
||||
/*
|
||||
ep.referenceCount++
|
||||
if ep.referenceCount == 1 {
|
||||
ep.Open()
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
func (epc *EndpointManager) Close(endpoint string) {
|
||||
epc.mu.Lock()
|
||||
defer epc.mu.Unlock()
|
||||
/*
|
||||
ep.referenceCount--
|
||||
if ep.referenceCount < 0 {
|
||||
panic("reference count below zero")
|
||||
}
|
||||
if ep.referenceCount == 0 {
|
||||
ep.Close()
|
||||
delete(epc.Endpoints, ep.Original)
|
||||
}
|
||||
*/
|
||||
}
|
||||
func (epc *EndpointManager) Send(endpoint, val string) error {
|
||||
epc.mu.Lock()
|
||||
defer epc.mu.Unlock()
|
||||
return errors.New("unavailable")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Endpoint represents an endpoint.
|
||||
type Endpoint struct {
|
||||
Protocol EndpointProtocol
|
||||
Original string
|
||||
Disque struct {
|
||||
Host string
|
||||
Port int
|
||||
QueueName string
|
||||
Options struct {
|
||||
Replicate int
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
func (ep *Endpoint) Open() {
|
||||
ep.mu.Lock()
|
||||
defer ep.mu.Unlock()
|
||||
println("open " + ep.Original)
|
||||
// Even though open is called we should wait until the a messages
|
||||
// is sent before establishing a network connection.
|
||||
}
|
||||
|
||||
func (ep *Endpoint) Close() {
|
||||
ep.mu.Lock()
|
||||
defer ep.mu.Unlock()
|
||||
println("close " + ep.Original)
|
||||
// Make sure to forece close the network connection here.
|
||||
}
|
||||
|
||||
func (ep *Endpoint) Send() error {
|
||||
return nil
|
||||
}
|
||||
*/
|
||||
func parseEndpoint(s string) (Endpoint, error) {
|
||||
var endpoint Endpoint
|
||||
endpoint.Original = s
|
||||
switch {
|
||||
default:
|
||||
return endpoint, errors.New("unknown scheme")
|
||||
case strings.HasPrefix(s, "http:"):
|
||||
endpoint.Protocol = HTTP
|
||||
case strings.HasPrefix(s, "https:"):
|
||||
endpoint.Protocol = HTTP
|
||||
case strings.HasPrefix(s, "disque:"):
|
||||
endpoint.Protocol = Disque
|
||||
}
|
||||
s = s[strings.Index(s, ":")+1:]
|
||||
if !strings.HasPrefix(s, "//") {
|
||||
return endpoint, errors.New("missing the two slashes")
|
||||
}
|
||||
sqp := strings.Split(s[2:], "?")
|
||||
sp := strings.Split(sqp[0], "/")
|
||||
s = sp[0]
|
||||
if s == "" {
|
||||
return endpoint, errors.New("missing host")
|
||||
}
|
||||
if endpoint.Protocol == Disque {
|
||||
dp := strings.Split(s, ":")
|
||||
switch len(dp) {
|
||||
default:
|
||||
return endpoint, errors.New("invalid disque url")
|
||||
case 1:
|
||||
endpoint.Disque.Host = dp[0]
|
||||
endpoint.Disque.Port = 7711
|
||||
case 2:
|
||||
endpoint.Disque.Host = dp[0]
|
||||
n, err := strconv.ParseUint(dp[1], 10, 16)
|
||||
if err != nil {
|
||||
return endpoint, errors.New("invalid disque url")
|
||||
}
|
||||
endpoint.Disque.Port = int(n)
|
||||
}
|
||||
if len(sp) > 1 {
|
||||
var err error
|
||||
endpoint.Disque.QueueName, err = url.QueryUnescape(sp[1])
|
||||
if err != nil {
|
||||
return endpoint, errors.New("invalid disque queue name")
|
||||
}
|
||||
}
|
||||
if len(sqp) > 1 {
|
||||
m, err := url.ParseQuery(sqp[1])
|
||||
if err != nil {
|
||||
return endpoint, errors.New("invalid disque url")
|
||||
}
|
||||
for key, val := range m {
|
||||
if len(val) == 0 {
|
||||
continue
|
||||
}
|
||||
switch key {
|
||||
case "replicate":
|
||||
n, err := strconv.ParseUint(val[0], 10, 8)
|
||||
if err != nil {
|
||||
return endpoint, errors.New("invalid disque replicate value")
|
||||
}
|
||||
endpoint.Disque.Options.Replicate = int(n)
|
||||
}
|
||||
}
|
||||
}
|
||||
if endpoint.Disque.QueueName == "" {
|
||||
return endpoint, errors.New("missing disque queue name")
|
||||
}
|
||||
|
||||
}
|
||||
return endpoint, nil
|
||||
}
|
|
@ -2,86 +2,30 @@ package controller
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tidwall/buntdb"
|
||||
"github.com/tidwall/resp"
|
||||
"github.com/tidwall/tile38/controller/glob"
|
||||
"github.com/tidwall/tile38/controller/log"
|
||||
"github.com/tidwall/tile38/controller/server"
|
||||
)
|
||||
|
||||
// EndpointProtocol is the type of protocol that the endpoint represents.
|
||||
type EndpointProtocol string
|
||||
const hookLogTTL = time.Second * 30
|
||||
|
||||
const (
|
||||
HTTP = EndpointProtocol("http") // HTTP
|
||||
Disque = EndpointProtocol("disque") // Disque
|
||||
)
|
||||
|
||||
// Endpoint represents an endpoint.
|
||||
type Endpoint struct {
|
||||
Protocol EndpointProtocol
|
||||
Original string
|
||||
Disque struct {
|
||||
Host string
|
||||
Port int
|
||||
QueueName string
|
||||
Options struct {
|
||||
Replicate int
|
||||
func hookLogSetDefaults() *buntdb.SetOptions {
|
||||
if hookLogTTL > 0 {
|
||||
return &buntdb.SetOptions{
|
||||
Expires: true, // automatically delete after 30 seconds
|
||||
TTL: hookLogTTL,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Hook represents a hook.
|
||||
type Hook struct {
|
||||
Key string
|
||||
Name string
|
||||
Endpoints []Endpoint
|
||||
Message *server.Message
|
||||
Fence *liveFenceSwitches
|
||||
ScanWriter *scanWriter
|
||||
}
|
||||
|
||||
// Do performs a hook.
|
||||
func (hook *Hook) Do(details *commandDetailsT) error {
|
||||
var lerrs []error
|
||||
msgs := FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, details)
|
||||
nextMessage:
|
||||
for _, msg := range msgs {
|
||||
nextEndpoint:
|
||||
for _, endpoint := range hook.Endpoints {
|
||||
switch endpoint.Protocol {
|
||||
case HTTP:
|
||||
if err := sendHTTPMessage(endpoint, []byte(msg)); err != nil {
|
||||
lerrs = append(lerrs, err)
|
||||
continue nextEndpoint
|
||||
}
|
||||
continue nextMessage // sent
|
||||
case Disque:
|
||||
if err := sendDisqueMessage(endpoint, []byte(msg)); err != nil {
|
||||
lerrs = append(lerrs, err)
|
||||
continue nextEndpoint
|
||||
}
|
||||
continue nextMessage // sent
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(lerrs) == 0 {
|
||||
// log.Notice("YAY")
|
||||
return nil
|
||||
}
|
||||
var errmsgs []string
|
||||
for _, err := range lerrs {
|
||||
errmsgs = append(errmsgs, err.Error())
|
||||
}
|
||||
err := errors.New("not sent: " + strings.Join(errmsgs, ","))
|
||||
log.Error(err)
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
type hooksByName []*Hook
|
||||
|
@ -98,100 +42,27 @@ func (a hooksByName) Swap(i, j int) {
|
|||
a[i], a[j] = a[j], a[i]
|
||||
}
|
||||
|
||||
func parseEndpoint(s string) (Endpoint, error) {
|
||||
var endpoint Endpoint
|
||||
endpoint.Original = s
|
||||
switch {
|
||||
default:
|
||||
return endpoint, errors.New("unknown scheme")
|
||||
case strings.HasPrefix(s, "http:"):
|
||||
endpoint.Protocol = HTTP
|
||||
case strings.HasPrefix(s, "https:"):
|
||||
endpoint.Protocol = HTTP
|
||||
case strings.HasPrefix(s, "disque:"):
|
||||
endpoint.Protocol = Disque
|
||||
}
|
||||
s = s[strings.Index(s, ":")+1:]
|
||||
if !strings.HasPrefix(s, "//") {
|
||||
return endpoint, errors.New("missing the two slashes")
|
||||
}
|
||||
sqp := strings.Split(s[2:], "?")
|
||||
sp := strings.Split(sqp[0], "/")
|
||||
s = sp[0]
|
||||
if s == "" {
|
||||
return endpoint, errors.New("missing host")
|
||||
}
|
||||
if endpoint.Protocol == Disque {
|
||||
|
||||
dp := strings.Split(s, ":")
|
||||
switch len(dp) {
|
||||
default:
|
||||
return endpoint, errors.New("invalid disque url")
|
||||
case 1:
|
||||
endpoint.Disque.Host = dp[0]
|
||||
endpoint.Disque.Port = 7711
|
||||
case 2:
|
||||
endpoint.Disque.Host = dp[0]
|
||||
n, err := strconv.ParseUint(dp[1], 10, 16)
|
||||
if err != nil {
|
||||
return endpoint, errors.New("invalid disque url")
|
||||
}
|
||||
endpoint.Disque.Port = int(n)
|
||||
}
|
||||
if len(sp) > 1 {
|
||||
var err error
|
||||
endpoint.Disque.QueueName, err = url.QueryUnescape(sp[1])
|
||||
if err != nil {
|
||||
return endpoint, errors.New("invalid disque queue name")
|
||||
}
|
||||
}
|
||||
if len(sqp) > 1 {
|
||||
m, err := url.ParseQuery(sqp[1])
|
||||
if err != nil {
|
||||
return endpoint, errors.New("invalid disque url")
|
||||
}
|
||||
for key, val := range m {
|
||||
if len(val) == 0 {
|
||||
continue
|
||||
}
|
||||
switch key {
|
||||
case "replicate":
|
||||
n, err := strconv.ParseUint(val[0], 10, 8)
|
||||
if err != nil {
|
||||
return endpoint, errors.New("invalid disque replicate value")
|
||||
}
|
||||
endpoint.Disque.Options.Replicate = int(n)
|
||||
}
|
||||
}
|
||||
}
|
||||
if endpoint.Disque.QueueName == "" {
|
||||
return endpoint, errors.New("missing disque queue name")
|
||||
}
|
||||
|
||||
}
|
||||
return endpoint, nil
|
||||
}
|
||||
|
||||
func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetailsT, err error) {
|
||||
start := time.Now()
|
||||
|
||||
vs := msg.Values[1:]
|
||||
var name, values, cmd string
|
||||
var name, urls, cmd string
|
||||
var ok bool
|
||||
if vs, name, ok = tokenval(vs); !ok || name == "" {
|
||||
return "", d, errInvalidNumberOfArguments
|
||||
}
|
||||
if vs, values, ok = tokenval(vs); !ok || values == "" {
|
||||
if vs, urls, ok = tokenval(vs); !ok || urls == "" {
|
||||
return "", d, errInvalidNumberOfArguments
|
||||
}
|
||||
var endpoints []Endpoint
|
||||
for _, value := range strings.Split(values, ",") {
|
||||
endpoint, err := parseEndpoint(value)
|
||||
var endpoints []string
|
||||
for _, url := range strings.Split(urls, ",") {
|
||||
url = strings.TrimSpace(url)
|
||||
err := c.epc.Validate(url)
|
||||
if err != nil {
|
||||
log.Errorf("sethook: %v", err)
|
||||
return "", d, errInvalidArgument(value)
|
||||
return "", d, errInvalidArgument(url)
|
||||
}
|
||||
endpoints = append(endpoints, endpoint)
|
||||
endpoints = append(endpoints, url)
|
||||
}
|
||||
|
||||
commandvs := vs
|
||||
|
@ -229,7 +100,11 @@ func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetai
|
|||
Endpoints: endpoints,
|
||||
Fence: &s,
|
||||
Message: cmsg,
|
||||
db: c.qdb,
|
||||
epm: c.epc,
|
||||
}
|
||||
hook.cond = sync.NewCond(&hook.mu)
|
||||
|
||||
var wr bytes.Buffer
|
||||
hook.ScanWriter, err = c.newScanWriter(&wr, cmsg, s.key, s.output, s.precision, s.glob, false, s.limit, s.wheres, s.nofields)
|
||||
if err != nil {
|
||||
|
@ -242,12 +117,15 @@ func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetai
|
|||
if len(h.Endpoints) == len(hook.Endpoints) {
|
||||
match := true
|
||||
for i, endpoint := range h.Endpoints {
|
||||
if endpoint.Original != hook.Endpoints[i].Original {
|
||||
if endpoint != hook.Endpoints[i] {
|
||||
match = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if match && resp.ArrayValue(h.Message.Values).Equals(resp.ArrayValue(hook.Message.Values)) {
|
||||
// it was a match so we do nothing. But let's signal just
|
||||
// for good measure.
|
||||
h.Signal()
|
||||
switch msg.OutputType {
|
||||
case server.JSON:
|
||||
return server.OKMessage(msg, start), d, nil
|
||||
|
@ -257,7 +135,7 @@ func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetai
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
h.Close()
|
||||
// delete the previous hook
|
||||
if hm, ok := c.hookcols[h.Key]; ok {
|
||||
delete(hm, h.Name)
|
||||
|
@ -273,6 +151,7 @@ func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetai
|
|||
c.hookcols[hook.Key] = hm
|
||||
}
|
||||
hm[name] = hook
|
||||
hook.Open()
|
||||
switch msg.OutputType {
|
||||
case server.JSON:
|
||||
return server.OKMessage(msg, start), d, nil
|
||||
|
@ -295,6 +174,7 @@ func (c *Controller) cmdDelHook(msg *server.Message) (res string, d commandDetai
|
|||
return "", d, errInvalidNumberOfArguments
|
||||
}
|
||||
if h, ok := c.hooks[name]; ok {
|
||||
h.Close()
|
||||
if hm, ok := c.hookcols[h.Key]; ok {
|
||||
delete(hm, h.Name)
|
||||
}
|
||||
|
@ -353,7 +233,7 @@ func (c *Controller) cmdHooks(msg *server.Message) (res string, err error) {
|
|||
if i > 0 {
|
||||
buf.WriteByte(',')
|
||||
}
|
||||
buf.WriteString(jsonString(endpoint.Original))
|
||||
buf.WriteString(jsonString(endpoint))
|
||||
}
|
||||
buf.WriteString(`],"command":[`)
|
||||
for i, v := range hook.Message.Values {
|
||||
|
@ -375,7 +255,7 @@ func (c *Controller) cmdHooks(msg *server.Message) (res string, err error) {
|
|||
hvals = append(hvals, resp.StringValue(hook.Key))
|
||||
var evals []resp.Value
|
||||
for _, endpoint := range hook.Endpoints {
|
||||
evals = append(evals, resp.StringValue(endpoint.Original))
|
||||
evals = append(evals, resp.StringValue(endpoint))
|
||||
}
|
||||
hvals = append(hvals, resp.ArrayValue(evals))
|
||||
hvals = append(hvals, resp.ArrayValue(hook.Message.Values))
|
||||
|
@ -389,3 +269,199 @@ func (c *Controller) cmdHooks(msg *server.Message) (res string, err error) {
|
|||
}
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// Hook represents a hook.
|
||||
type Hook struct {
|
||||
mu sync.Mutex
|
||||
cond *sync.Cond
|
||||
Key string
|
||||
Name string
|
||||
Endpoints []string
|
||||
Message *server.Message
|
||||
Fence *liveFenceSwitches
|
||||
ScanWriter *scanWriter
|
||||
db *buntdb.DB
|
||||
closed bool
|
||||
opened bool
|
||||
query string
|
||||
epm *EndpointManager
|
||||
}
|
||||
|
||||
// Open is called when a hook is first created. It calls the manager
|
||||
// function in a goroutine
|
||||
func (h *Hook) Open() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
if h.opened {
|
||||
return
|
||||
}
|
||||
h.opened = true
|
||||
b, _ := json.Marshal(h.Name)
|
||||
h.query = `{"hook":` + string(b) + `}`
|
||||
go h.manager()
|
||||
}
|
||||
|
||||
// Close closed the hook and stop the manager function
|
||||
func (h *Hook) Close() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
if h.closed {
|
||||
return
|
||||
}
|
||||
h.closed = true
|
||||
h.cond.Broadcast()
|
||||
}
|
||||
|
||||
// Signal can be called at any point to wake up the hook and
|
||||
// notify the manager that there may be something new in the queue.
|
||||
func (h *Hook) Signal() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.cond.Broadcast()
|
||||
}
|
||||
|
||||
// the manager is a forever loop that calls proc whenever there's a signal.
|
||||
// it ends when the "closed" flag is set.
|
||||
func (h *Hook) manager() {
|
||||
for {
|
||||
h.mu.Lock()
|
||||
for {
|
||||
if h.closed {
|
||||
h.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if h.proc() {
|
||||
break
|
||||
}
|
||||
h.mu.Unlock()
|
||||
time.Sleep(time.Second / 4)
|
||||
h.mu.Lock()
|
||||
}
|
||||
h.cond.Wait()
|
||||
h.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// proc processes queued hook logs. returning true will indicate that
|
||||
// there's more to do and proc() should be called again asap.
|
||||
func (h *Hook) proc() (ok bool) {
|
||||
var keys, vals []string
|
||||
var ttls []time.Duration
|
||||
err := h.db.Update(func(tx *buntdb.Tx) error {
|
||||
// get keys and vals
|
||||
err := tx.AscendGreaterOrEqual("hooks", h.query, func(key, val string) bool {
|
||||
if strings.HasPrefix(key, hookLogPrefix) {
|
||||
keys = append(keys, key)
|
||||
vals = append(vals, val)
|
||||
}
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// delete the keys
|
||||
for _, key := range keys {
|
||||
if hookLogTTL > 0 {
|
||||
ttl, err := tx.TTL(key)
|
||||
if err != nil {
|
||||
if err != buntdb.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
}
|
||||
ttls = append(ttls, ttl)
|
||||
}
|
||||
_, err = tx.Delete(key)
|
||||
if err != nil {
|
||||
if err != buntdb.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return false
|
||||
}
|
||||
|
||||
// send each val. on failure reinsert that one and all of the following
|
||||
for i, key := range keys {
|
||||
val := vals[i]
|
||||
idx := stringToUint64(key[len(hookLogPrefix):])
|
||||
var sent bool
|
||||
for _, endpoint := range h.Endpoints {
|
||||
err := h.epm.Send(endpoint, val)
|
||||
if err != nil {
|
||||
log.Debugf("could not send log: %v: %v: %v", idx, endpoint, err)
|
||||
continue
|
||||
}
|
||||
sent = true
|
||||
break
|
||||
}
|
||||
if !sent {
|
||||
// failed to send. try to reinsert the remaining. if this fails we lose log entries.
|
||||
keys = keys[i:]
|
||||
vals = vals[i:]
|
||||
if hookLogTTL > 0 {
|
||||
ttls = ttls[i:]
|
||||
}
|
||||
h.db.Update(func(tx *buntdb.Tx) error {
|
||||
for i, key := range keys {
|
||||
val := vals[i]
|
||||
var opts *buntdb.SetOptions
|
||||
if hookLogTTL > 0 {
|
||||
opts = &buntdb.SetOptions{
|
||||
Expires: true,
|
||||
TTL: ttls[i],
|
||||
}
|
||||
}
|
||||
_, _, err := tx.Set(key, val, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
/*
|
||||
// Do performs a hook.
|
||||
func (hook *Hook) Do(details *commandDetailsT) error {
|
||||
var lerrs []error
|
||||
msgs := FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, details)
|
||||
nextMessage:
|
||||
for _, msg := range msgs {
|
||||
nextEndpoint:
|
||||
for _, endpoint := range hook.Endpoints {
|
||||
switch endpoint.Protocol {
|
||||
case HTTP:
|
||||
if err := sendHTTPMessage(endpoint, []byte(msg)); err != nil {
|
||||
lerrs = append(lerrs, err)
|
||||
continue nextEndpoint
|
||||
}
|
||||
continue nextMessage // sent
|
||||
case Disque:
|
||||
if err := sendDisqueMessage(endpoint, []byte(msg)); err != nil {
|
||||
lerrs = append(lerrs, err)
|
||||
continue nextEndpoint
|
||||
}
|
||||
continue nextMessage // sent
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(lerrs) == 0 {
|
||||
// log.Notice("YAY")
|
||||
return nil
|
||||
}
|
||||
var errmsgs []string
|
||||
for _, err := range lerrs {
|
||||
errmsgs = append(errmsgs, err.Error())
|
||||
}
|
||||
err := errors.New("not sent: " + strings.Join(errmsgs, ","))
|
||||
log.Error(err)
|
||||
return err
|
||||
}*/
|
||||
|
|
|
@ -1,13 +1,21 @@
|
|||
package controller
|
||||
|
||||
import "encoding/json"
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/tidwall/cast"
|
||||
)
|
||||
|
||||
func jsonString(s string) string {
|
||||
for i := 0; i < len(s); i++ {
|
||||
if s[i] < 32 || s[i] > 126 {
|
||||
if s[i] < ' ' || s[i] == '\\' || s[i] == '"' || s[i] > 126 {
|
||||
d, _ := json.Marshal(s)
|
||||
return string(d)
|
||||
}
|
||||
}
|
||||
return `"` + s + `"`
|
||||
b := make([]byte, len(s)+2)
|
||||
b[0] = '"'
|
||||
copy(b[1:], cast.ToBytes(s))
|
||||
b[len(b)-1] = '"'
|
||||
return cast.ToString(b)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue