From 3862f70cac74d961113d73672ace37eaf45a31a3 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Sun, 11 Sep 2016 07:49:48 -0700 Subject: [PATCH] refactor hooks and endpoints --- controller/aof.go | 58 +++++- controller/aofshrink.go | 2 +- controller/controller.go | 33 ++++ controller/endpoint.go | 186 +++++++++++++++++++ controller/hooks.go | 378 +++++++++++++++++++++++---------------- controller/json.go | 14 +- 6 files changed, 513 insertions(+), 158 deletions(-) create mode 100644 controller/endpoint.go diff --git a/controller/aof.go b/controller/aof.go index 5f7361ba..c588419f 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/tidwall/buntdb" "github.com/tidwall/resp" "github.com/tidwall/tile38/controller/log" "github.com/tidwall/tile38/controller/server" @@ -94,7 +95,7 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error { } if c.config.FollowHost == "" { // process hooks, for leader only - if err := c.processHooks(d); err != nil { + if err := c.queueHooks(d); err != nil { return err } } @@ -124,15 +125,66 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error { return nil } -func (c *Controller) processHooks(d *commandDetailsT) error { +func (c *Controller) queueHooks(d *commandDetailsT) error { + // big list of all of the messages + var hmsgs []string + var hooks []*Hook + // find the hook by the key if hm, ok := c.hookcols[d.key]; ok { for _, hook := range hm { - go hook.Do(d) + // match the fence + msgs := FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, d) + if len(msgs) > 0 { + // append each msg to the big list + hmsgs = append(hmsgs, msgs...) 
+ hooks = append(hooks, hook) + } } } + if len(hmsgs) == 0 { + return nil + } + + // queue the message in the buntdb database + err := c.qdb.Update(func(tx *buntdb.Tx) error { + for _, msg := range hmsgs { + c.qidx++ // increment the log id + key := hookLogPrefix + uint64ToString(c.qidx) + _, _, err := tx.Set(key, msg, hookLogSetDefaults()) + if err != nil { + return err + } + log.Debugf("queued hook: %d", c.qidx) + } + _, _, err := tx.Set("hook:idx", uint64ToString(c.qidx), nil) + if err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + // all the messages have been queued. + // notify the hooks + for _, hook := range hooks { + hook.Signal() + } return nil } +// Converts string to an integer +func stringToUint64(s string) uint64 { + n, _ := strconv.ParseUint(s, 10, 64) + return n +} + +// Converts a uint to a string +func uint64ToString(u uint64) string { + s := strings.Repeat("0", 20) + strconv.FormatUint(u, 10) + return s[len(s)-20:] +} + type liveAOFSwitches struct { pos int64 } diff --git a/controller/aofshrink.go b/controller/aofshrink.go index 0fa38100..45a13963 100644 --- a/controller/aofshrink.go +++ b/controller/aofshrink.go @@ -237,7 +237,7 @@ func (c *Controller) aofshrink() { values := make([]resp.Value, 0, 3+len(hook.Message.Values)) endpoints := make([]string, len(hook.Endpoints)) for i, endpoint := range hook.Endpoints { - endpoints[i] = endpoint.Original + endpoints[i] = endpoint } values = append(values, resp.StringValue("sethook"), resp.StringValue(name), resp.StringValue(strings.Join(endpoints, ","))) values = append(values, hook.Message.Values...) 
diff --git a/controller/controller.go b/controller/controller.go index dbf4c8b8..92ebb9c1 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -15,6 +15,7 @@ import ( "time" "github.com/tidwall/btree" + "github.com/tidwall/buntdb" "github.com/tidwall/resp" "github.com/tidwall/tile38/controller/collection" "github.com/tidwall/tile38/controller/log" @@ -25,6 +26,8 @@ import ( var errOOM = errors.New("OOM command not allowed when used memory > 'maxmemory'") +const hookLogPrefix = "hook:log:" + type collectionT struct { Key string Collection *collection.Collection @@ -54,6 +57,8 @@ type Controller struct { host string port int f *os.File + qdb *buntdb.DB // hook queue log + qidx uint64 // hook queue log last idx cols *btree.BTree aofsz int dir string @@ -73,6 +78,8 @@ type Controller struct { conns map[*server.Conn]bool started time.Time + epc *EndpointManager + statsTotalConns int statsTotalCommands int statsExpired int @@ -106,6 +113,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener) error expires: make(map[string]map[string]time.Time), started: time.Now(), conns: make(map[*server.Conn]bool), + epc: NewEndpointCollection(), } if err := os.MkdirAll(dir, 0700); err != nil { return err @@ -113,6 +121,31 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener) error if err := c.loadConfig(); err != nil { return err } + // load the queue before the aof + qdb, err := buntdb.Open(path.Join(dir, "queue.db")) + if err != nil { + return err + } + var qidx uint64 + if err := qdb.View(func(tx *buntdb.Tx) error { + val, err := tx.Get("hook:idx") + if err != nil { + if err == buntdb.ErrNotFound { + return nil + } + return err + } + qidx = stringToUint64(val) + return nil + }); err != nil { + return err + } + err = qdb.CreateIndex("hooks", hookLogPrefix+"*", buntdb.IndexJSONCaseSensitive("hook")) + if err != nil { + return err + } + c.qdb = qdb + c.qidx = qidx if err := c.migrateAOF(); err != nil { return err } 
diff --git a/controller/endpoint.go b/controller/endpoint.go new file mode 100644 index 00000000..c891c82f --- /dev/null +++ b/controller/endpoint.go @@ -0,0 +1,186 @@ +package controller + +import ( + "errors" + "net/url" + "strconv" + "strings" + "sync" +) + +// EndpointProtocol is the type of protocol that the endpoint represents. +type EndpointProtocol string + +const ( + HTTP = EndpointProtocol("http") // HTTP + Disque = EndpointProtocol("disque") // Disque +) + +type EndpointManager struct { + mu sync.RWMutex // this is intentionally exposed + Endpoints map[string]*Endpoint +} + +func NewEndpointCollection() *EndpointManager { + return &EndpointManager{ + Endpoints: make(map[string]*Endpoint), + } +} + +// Validate checks that the url can be parsed as an endpoint. The +// lookup-or-create logic is currently commented out. +func (epc *EndpointManager) Validate(url string) error { + _, err := parseEndpoint(url) + return err + /* + pendpoint := epc.Endpoints[url] + if pendpoint == nil { + pendpoint = endpoint + epc.Endpoints[url] = pendpoint + } + return pendpoint, nil + */ +} + +// We use a retain/release on endpoints. +// The calls are directed to the collection instead of +// endpoint itself to avoid having a circular reference to +// the collection. +func (epc *EndpointManager) Open(endpoint string) { + epc.mu.Lock() + defer epc.mu.Unlock() + /* + ep.referenceCount++ + if ep.referenceCount == 1 { + ep.Open() + } + */ +} + +func (epc *EndpointManager) Close(endpoint string) { + epc.mu.Lock() + defer epc.mu.Unlock() + /* + ep.referenceCount-- + if ep.referenceCount < 0 { + panic("reference count below zero") + } + if ep.referenceCount == 0 { + ep.Close() + delete(epc.Endpoints, ep.Original) + } + */ +} +func (epc *EndpointManager) Send(endpoint, val string) error { + epc.mu.Lock() + defer epc.mu.Unlock() + return errors.New("unavailable") + return nil +} + +// Endpoint represents an endpoint.
+type Endpoint struct { + Protocol EndpointProtocol + Original string + Disque struct { + Host string + Port int + QueueName string + Options struct { + Replicate int + } + } +} + +/* +func (ep *Endpoint) Open() { + ep.mu.Lock() + defer ep.mu.Unlock() + println("open " + ep.Original) + // Even though open is called we should wait until the a messages + // is sent before establishing a network connection. +} + +func (ep *Endpoint) Close() { + ep.mu.Lock() + defer ep.mu.Unlock() + println("close " + ep.Original) + // Make sure to forece close the network connection here. +} + +func (ep *Endpoint) Send() error { + return nil +} +*/ +func parseEndpoint(s string) (Endpoint, error) { + var endpoint Endpoint + endpoint.Original = s + switch { + default: + return endpoint, errors.New("unknown scheme") + case strings.HasPrefix(s, "http:"): + endpoint.Protocol = HTTP + case strings.HasPrefix(s, "https:"): + endpoint.Protocol = HTTP + case strings.HasPrefix(s, "disque:"): + endpoint.Protocol = Disque + } + s = s[strings.Index(s, ":")+1:] + if !strings.HasPrefix(s, "//") { + return endpoint, errors.New("missing the two slashes") + } + sqp := strings.Split(s[2:], "?") + sp := strings.Split(sqp[0], "/") + s = sp[0] + if s == "" { + return endpoint, errors.New("missing host") + } + if endpoint.Protocol == Disque { + dp := strings.Split(s, ":") + switch len(dp) { + default: + return endpoint, errors.New("invalid disque url") + case 1: + endpoint.Disque.Host = dp[0] + endpoint.Disque.Port = 7711 + case 2: + endpoint.Disque.Host = dp[0] + n, err := strconv.ParseUint(dp[1], 10, 16) + if err != nil { + return endpoint, errors.New("invalid disque url") + } + endpoint.Disque.Port = int(n) + } + if len(sp) > 1 { + var err error + endpoint.Disque.QueueName, err = url.QueryUnescape(sp[1]) + if err != nil { + return endpoint, errors.New("invalid disque queue name") + } + } + if len(sqp) > 1 { + m, err := url.ParseQuery(sqp[1]) + if err != nil { + return endpoint, errors.New("invalid disque 
url") + } + for key, val := range m { + if len(val) == 0 { + continue + } + switch key { + case "replicate": + n, err := strconv.ParseUint(val[0], 10, 8) + if err != nil { + return endpoint, errors.New("invalid disque replicate value") + } + endpoint.Disque.Options.Replicate = int(n) + } + } + } + if endpoint.Disque.QueueName == "" { + return endpoint, errors.New("missing disque queue name") + } + + } + return endpoint, nil +} diff --git a/controller/hooks.go b/controller/hooks.go index a9dd2492..1a33e22a 100644 --- a/controller/hooks.go +++ b/controller/hooks.go @@ -2,86 +2,30 @@ package controller import ( "bytes" + "encoding/json" "errors" - "net/url" "sort" - "strconv" "strings" + "sync" "time" + "github.com/tidwall/buntdb" "github.com/tidwall/resp" "github.com/tidwall/tile38/controller/glob" "github.com/tidwall/tile38/controller/log" "github.com/tidwall/tile38/controller/server" ) -// EndpointProtocol is the type of protocol that the endpoint represents. -type EndpointProtocol string +const hookLogTTL = time.Second * 30 -const ( - HTTP = EndpointProtocol("http") // HTTP - Disque = EndpointProtocol("disque") // Disque -) - -// Endpoint represents an endpoint. -type Endpoint struct { - Protocol EndpointProtocol - Original string - Disque struct { - Host string - Port int - QueueName string - Options struct { - Replicate int +func hookLogSetDefaults() *buntdb.SetOptions { + if hookLogTTL > 0 { + return &buntdb.SetOptions{ + Expires: true, // automatically delete after 30 seconds + TTL: hookLogTTL, } } -} - -// Hook represents a hook. -type Hook struct { - Key string - Name string - Endpoints []Endpoint - Message *server.Message - Fence *liveFenceSwitches - ScanWriter *scanWriter -} - -// Do performs a hook. 
-func (hook *Hook) Do(details *commandDetailsT) error { - var lerrs []error - msgs := FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, details) -nextMessage: - for _, msg := range msgs { - nextEndpoint: - for _, endpoint := range hook.Endpoints { - switch endpoint.Protocol { - case HTTP: - if err := sendHTTPMessage(endpoint, []byte(msg)); err != nil { - lerrs = append(lerrs, err) - continue nextEndpoint - } - continue nextMessage // sent - case Disque: - if err := sendDisqueMessage(endpoint, []byte(msg)); err != nil { - lerrs = append(lerrs, err) - continue nextEndpoint - } - continue nextMessage // sent - } - } - } - if len(lerrs) == 0 { - // log.Notice("YAY") - return nil - } - var errmsgs []string - for _, err := range lerrs { - errmsgs = append(errmsgs, err.Error()) - } - err := errors.New("not sent: " + strings.Join(errmsgs, ",")) - log.Error(err) - return err + return nil } type hooksByName []*Hook @@ -98,100 +42,27 @@ func (a hooksByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func parseEndpoint(s string) (Endpoint, error) { - var endpoint Endpoint - endpoint.Original = s - switch { - default: - return endpoint, errors.New("unknown scheme") - case strings.HasPrefix(s, "http:"): - endpoint.Protocol = HTTP - case strings.HasPrefix(s, "https:"): - endpoint.Protocol = HTTP - case strings.HasPrefix(s, "disque:"): - endpoint.Protocol = Disque - } - s = s[strings.Index(s, ":")+1:] - if !strings.HasPrefix(s, "//") { - return endpoint, errors.New("missing the two slashes") - } - sqp := strings.Split(s[2:], "?") - sp := strings.Split(sqp[0], "/") - s = sp[0] - if s == "" { - return endpoint, errors.New("missing host") - } - if endpoint.Protocol == Disque { - - dp := strings.Split(s, ":") - switch len(dp) { - default: - return endpoint, errors.New("invalid disque url") - case 1: - endpoint.Disque.Host = dp[0] - endpoint.Disque.Port = 7711 - case 2: - endpoint.Disque.Host = dp[0] - n, err := strconv.ParseUint(dp[1], 10, 16) - if err != nil { - return endpoint, 
errors.New("invalid disque url") - } - endpoint.Disque.Port = int(n) - } - if len(sp) > 1 { - var err error - endpoint.Disque.QueueName, err = url.QueryUnescape(sp[1]) - if err != nil { - return endpoint, errors.New("invalid disque queue name") - } - } - if len(sqp) > 1 { - m, err := url.ParseQuery(sqp[1]) - if err != nil { - return endpoint, errors.New("invalid disque url") - } - for key, val := range m { - if len(val) == 0 { - continue - } - switch key { - case "replicate": - n, err := strconv.ParseUint(val[0], 10, 8) - if err != nil { - return endpoint, errors.New("invalid disque replicate value") - } - endpoint.Disque.Options.Replicate = int(n) - } - } - } - if endpoint.Disque.QueueName == "" { - return endpoint, errors.New("missing disque queue name") - } - - } - return endpoint, nil -} - func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetailsT, err error) { start := time.Now() vs := msg.Values[1:] - var name, values, cmd string + var name, urls, cmd string var ok bool if vs, name, ok = tokenval(vs); !ok || name == "" { return "", d, errInvalidNumberOfArguments } - if vs, values, ok = tokenval(vs); !ok || values == "" { + if vs, urls, ok = tokenval(vs); !ok || urls == "" { return "", d, errInvalidNumberOfArguments } - var endpoints []Endpoint - for _, value := range strings.Split(values, ",") { - endpoint, err := parseEndpoint(value) + var endpoints []string + for _, url := range strings.Split(urls, ",") { + url = strings.TrimSpace(url) + err := c.epc.Validate(url) if err != nil { log.Errorf("sethook: %v", err) - return "", d, errInvalidArgument(value) + return "", d, errInvalidArgument(url) } - endpoints = append(endpoints, endpoint) + endpoints = append(endpoints, url) } commandvs := vs @@ -229,7 +100,11 @@ func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetai Endpoints: endpoints, Fence: &s, Message: cmsg, + db: c.qdb, + epm: c.epc, } + hook.cond = sync.NewCond(&hook.mu) + var wr bytes.Buffer 
hook.ScanWriter, err = c.newScanWriter(&wr, cmsg, s.key, s.output, s.precision, s.glob, false, s.limit, s.wheres, s.nofields) if err != nil { @@ -242,12 +117,15 @@ func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetai if len(h.Endpoints) == len(hook.Endpoints) { match := true for i, endpoint := range h.Endpoints { - if endpoint.Original != hook.Endpoints[i].Original { + if endpoint != hook.Endpoints[i] { match = false break } } if match && resp.ArrayValue(h.Message.Values).Equals(resp.ArrayValue(hook.Message.Values)) { + // it was a match so we do nothing. But let's signal just + // for good measure. + h.Signal() switch msg.OutputType { case server.JSON: return server.OKMessage(msg, start), d, nil @@ -257,7 +135,7 @@ func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetai } } } - + h.Close() // delete the previous hook if hm, ok := c.hookcols[h.Key]; ok { delete(hm, h.Name) @@ -273,6 +151,7 @@ func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetai c.hookcols[hook.Key] = hm } hm[name] = hook + hook.Open() switch msg.OutputType { case server.JSON: return server.OKMessage(msg, start), d, nil @@ -295,6 +174,7 @@ func (c *Controller) cmdDelHook(msg *server.Message) (res string, d commandDetai return "", d, errInvalidNumberOfArguments } if h, ok := c.hooks[name]; ok { + h.Close() if hm, ok := c.hookcols[h.Key]; ok { delete(hm, h.Name) } @@ -353,7 +233,7 @@ func (c *Controller) cmdHooks(msg *server.Message) (res string, err error) { if i > 0 { buf.WriteByte(',') } - buf.WriteString(jsonString(endpoint.Original)) + buf.WriteString(jsonString(endpoint)) } buf.WriteString(`],"command":[`) for i, v := range hook.Message.Values { @@ -375,7 +255,7 @@ func (c *Controller) cmdHooks(msg *server.Message) (res string, err error) { hvals = append(hvals, resp.StringValue(hook.Key)) var evals []resp.Value for _, endpoint := range hook.Endpoints { - evals = append(evals, resp.StringValue(endpoint.Original)) + 
evals = append(evals, resp.StringValue(endpoint)) } hvals = append(hvals, resp.ArrayValue(evals)) hvals = append(hvals, resp.ArrayValue(hook.Message.Values)) @@ -389,3 +269,199 @@ func (c *Controller) cmdHooks(msg *server.Message) (res string, err error) { } return "", nil } + +// Hook represents a hook. +type Hook struct { + mu sync.Mutex + cond *sync.Cond + Key string + Name string + Endpoints []string + Message *server.Message + Fence *liveFenceSwitches + ScanWriter *scanWriter + db *buntdb.DB + closed bool + opened bool + query string + epm *EndpointManager +} + +// Open is called when a hook is first created. It calls the manager +// function in a goroutine +func (h *Hook) Open() { + h.mu.Lock() + defer h.mu.Unlock() + if h.opened { + return + } + h.opened = true + b, _ := json.Marshal(h.Name) + h.query = `{"hook":` + string(b) + `}` + go h.manager() +} + +// Close closes the hook and stops the manager function +func (h *Hook) Close() { + h.mu.Lock() + defer h.mu.Unlock() + if h.closed { + return + } + h.closed = true + h.cond.Broadcast() +} + +// Signal can be called at any point to wake up the hook and +// notify the manager that there may be something new in the queue. +func (h *Hook) Signal() { + h.mu.Lock() + defer h.mu.Unlock() + h.cond.Broadcast() +} + +// the manager is a forever loop that calls proc whenever there's a signal. +// it ends when the "closed" flag is set. +func (h *Hook) manager() { + for { + h.mu.Lock() + for { + if h.closed { + h.mu.Unlock() + return + } + if h.proc() { + break + } + h.mu.Unlock() + time.Sleep(time.Second / 4) + h.mu.Lock() + } + h.cond.Wait() + h.mu.Unlock() + } +} + +// proc processes queued hook logs. returning false will indicate that +// a log could not be processed and proc() should be called again shortly.
+func (h *Hook) proc() (ok bool) { + var keys, vals []string + var ttls []time.Duration + err := h.db.Update(func(tx *buntdb.Tx) error { + // get keys and vals + err := tx.AscendGreaterOrEqual("hooks", h.query, func(key, val string) bool { + if strings.HasPrefix(key, hookLogPrefix) { + keys = append(keys, key) + vals = append(vals, val) + } + return true + }) + if err != nil { + return err + } + // delete the keys + for _, key := range keys { + if hookLogTTL > 0 { + ttl, err := tx.TTL(key) + if err != nil { + if err != buntdb.ErrNotFound { + return err + } + } + ttls = append(ttls, ttl) + } + _, err = tx.Delete(key) + if err != nil { + if err != buntdb.ErrNotFound { + return err + } + } + } + return nil + }) + if err != nil { + log.Error(err) + return false + } + + // send each val. on failure reinsert that one and all of the following + for i, key := range keys { + val := vals[i] + idx := stringToUint64(key[len(hookLogPrefix):]) + var sent bool + for _, endpoint := range h.Endpoints { + err := h.epm.Send(endpoint, val) + if err != nil { + log.Debugf("could not send log: %v: %v: %v", idx, endpoint, err) + continue + } + sent = true + break + } + if !sent { + // failed to send. try to reinsert the remaining. if this fails we lose log entries. + keys = keys[i:] + vals = vals[i:] + if hookLogTTL > 0 { + ttls = ttls[i:] + } + h.db.Update(func(tx *buntdb.Tx) error { + for i, key := range keys { + val := vals[i] + var opts *buntdb.SetOptions + if hookLogTTL > 0 { + opts = &buntdb.SetOptions{ + Expires: true, + TTL: ttls[i], + } + } + _, _, err := tx.Set(key, val, opts) + if err != nil { + return err + } + } + return nil + }) + return false + } + } + return true +} + +/* +// Do performs a hook. 
+func (hook *Hook) Do(details *commandDetailsT) error { + var lerrs []error + msgs := FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, details) +nextMessage: + for _, msg := range msgs { + nextEndpoint: + for _, endpoint := range hook.Endpoints { + switch endpoint.Protocol { + case HTTP: + if err := sendHTTPMessage(endpoint, []byte(msg)); err != nil { + lerrs = append(lerrs, err) + continue nextEndpoint + } + continue nextMessage // sent + case Disque: + if err := sendDisqueMessage(endpoint, []byte(msg)); err != nil { + lerrs = append(lerrs, err) + continue nextEndpoint + } + continue nextMessage // sent + } + } + } + if len(lerrs) == 0 { + // log.Notice("YAY") + return nil + } + var errmsgs []string + for _, err := range lerrs { + errmsgs = append(errmsgs, err.Error()) + } + err := errors.New("not sent: " + strings.Join(errmsgs, ",")) + log.Error(err) + return err +}*/ diff --git a/controller/json.go b/controller/json.go index dd1bb148..0c8fc35a 100644 --- a/controller/json.go +++ b/controller/json.go @@ -1,13 +1,21 @@ package controller -import "encoding/json" +import ( + "encoding/json" + + "github.com/tidwall/cast" +) func jsonString(s string) string { for i := 0; i < len(s); i++ { - if s[i] < 32 || s[i] > 126 { + if s[i] < ' ' || s[i] == '\\' || s[i] == '"' || s[i] > 126 { d, _ := json.Marshal(s) return string(d) } } - return `"` + s + `"` + b := make([]byte, len(s)+2) + b[0] = '"' + copy(b[1:], cast.ToBytes(s)) + b[len(b)-1] = '"' + return cast.ToString(b) }