async hooks

This commit is contained in:
Josh Baker 2016-04-02 07:20:30 -07:00
parent cdc2bbee73
commit a3725c7a2a
7 changed files with 95 additions and 64 deletions

View File

@ -16,6 +16,8 @@ import (
"github.com/tidwall/tile38/controller/server"
)
const AsyncHooks = true
type errAOFHook struct {
err error
}
@ -92,16 +94,7 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error {
}
if c.config.FollowHost == "" {
// process hooks, for leader only
if hm, ok := c.hookcols[d.key]; ok {
for _, hook := range hm {
if err := c.DoHook(hook, d); err != nil {
if d.revert != nil {
d.revert()
}
return errAOFHook{err}
}
}
}
return c.processHooks(d)
}
}
data, err := value.MarshalRESP()
@ -126,7 +119,24 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error {
c.lcond.Broadcast()
c.lcond.L.Unlock()
}
return nil
}
func (c *Controller) processHooks(d *commandDetailsT) error {
if hm, ok := c.hookcols[d.key]; ok {
for _, hook := range hm {
if AsyncHooks {
go hook.Do(d)
} else {
if err := hook.Do(d); err != nil {
if d.revert != nil {
d.revert()
}
return errAOFHook{err}
}
}
}
}
return nil
}

View File

@ -35,10 +35,12 @@ type commandDetailsT struct {
value float64
obj geojson.Object
fields []float64
fmap map[string]int
oldObj geojson.Object
oldFields []float64
updated bool
revert func()
timestamp time.Time
}
func (col *collectionT) Less(item btree.Item) bool {

View File

@ -241,6 +241,7 @@ func (c *Controller) cmdDel(msg *server.Message) (res string, d commandDetailsT,
}
d.command = "del"
d.updated = found
d.timestamp = time.Now()
switch msg.OutputType {
case server.JSON:
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
@ -278,6 +279,7 @@ func (c *Controller) cmdDrop(msg *server.Message) (res string, d commandDetailsT
d.updated = false
}
d.command = "drop"
d.timestamp = time.Now()
switch msg.OutputType {
case server.JSON:
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
@ -303,6 +305,7 @@ func (c *Controller) cmdFlushDB(msg *server.Message) (res string, d commandDetai
c.hookcols = make(map[string]map[string]*Hook)
d.command = "flushdb"
d.updated = true
d.timestamp = time.Now()
switch msg.OutputType {
case server.JSON:
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
@ -530,6 +533,12 @@ func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT,
}
d.command = "set"
d.updated = true // perhaps we should do a diff on the previous object?
fmap := col.FieldMap()
d.fmap = make(map[string]int)
for key, idx := range fmap {
d.fmap[key] = idx
}
d.timestamp = time.Now()
switch msg.OutputType {
case server.JSON:
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
@ -584,19 +593,24 @@ func (c *Controller) cmdFset(msg *server.Message) (res string, d commandDetailsT
return
}
var ok bool
var updated bool
d.obj, d.fields, updated, ok = col.SetField(d.id, d.field, d.value)
d.obj, d.fields, d.updated, ok = col.SetField(d.id, d.field, d.value)
if !ok {
err = errIDNotFound
return
}
d.command = "fset"
d.updated = updated
d.timestamp = time.Now()
fmap := col.FieldMap()
d.fmap = make(map[string]int)
for key, idx := range fmap {
d.fmap[key] = idx
}
switch msg.OutputType {
case server.JSON:
res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}"
case server.RESP:
if updated {
if d.updated {
res = ":1\r\n"
} else {
res = ":0\r\n"

View File

@ -2,16 +2,17 @@ package controller
import (
"strings"
"time"
"github.com/tidwall/tile38/controller/server"
"github.com/tidwall/tile38/geojson"
)
func (c *Controller) FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT, mustLock bool) []string {
var tmfmt = "2006-01-02T15:04:05.999999999Z07:00"
func FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT) []string {
glob := fence.glob
if details.command == "drop" {
return []string{`{"cmd":"drop"}`}
return []string{`{"cmd":"drop","time":` + details.timestamp.Format(tmfmt) + `}`}
}
match := true
if glob != "" && glob != "*" {
@ -38,51 +39,45 @@ func (c *Controller) FenceMatch(hookName string, sw *scanWriter, fence *liveFenc
} else if !match1 && match2 {
match = true
detect = "enter"
if details.command == "fset" {
detect = "inside"
}
} else {
// Maybe the old object and new object create a line that crosses the fence.
// Must detect for that possibility.
if details.oldObj != nil {
ls := geojson.LineString{
Coordinates: []geojson.Position{
details.oldObj.CalculatedPoint(),
details.obj.CalculatedPoint(),
},
}
temp := false
if fence.cmd == "within" {
// because we are testing if the line croses the area we need to use
// "intersects" instead of "within".
fence.cmd = "intersects"
temp = true
}
if fenceMatchObject(fence, ls) {
match = true
detect = "cross"
}
if temp {
fence.cmd = "within"
if details.command != "fset" {
// Maybe the old object and new object create a line that crosses the fence.
// Must detect for that possibility.
if details.oldObj != nil {
ls := geojson.LineString{
Coordinates: []geojson.Position{
details.oldObj.CalculatedPoint(),
details.obj.CalculatedPoint(),
},
}
temp := false
if fence.cmd == "within" {
// because we are testing if the line croses the area we need to use
// "intersects" instead of "within".
fence.cmd = "intersects"
temp = true
}
if fenceMatchObject(fence, ls) {
match = true
detect = "cross"
}
if temp {
fence.cmd = "within"
}
}
}
}
}
if details.command == "del" {
return []string{`{"command":"del","id":` + jsonString(details.id) + `}`}
return []string{`{"command":"del","id":` + jsonString(details.id) + `,"time":` + details.timestamp.Format(tmfmt) + `}`}
}
var fmap map[string]int
if mustLock {
c.mu.RLock()
}
col := c.getCol(details.key)
if col != nil {
fmap = col.FieldMap()
}
if mustLock {
c.mu.RUnlock()
}
if fmap == nil {
if details.fmap == nil {
return nil
}
sw.fmap = fmap
sw.fmap = details.fmap
sw.fullFields = true
sw.msg.OutputType = server.JSON
sw.writeObject(details.id, details.obj, details.fields)
@ -98,24 +93,24 @@ func (c *Controller) FenceMatch(hookName string, sw *scanWriter, fence *liveFenc
res = `{"id":` + res + `}`
}
jskey := jsonString(details.key)
jstime := time.Now().Format("2006-01-02T15:04:05.999999999Z07:00")
jshookName := jsonString(hookName)
ores := res
msgs := make([]string, 0, 2)
if fence.detect == nil || fence.detect[detect] {
if strings.HasPrefix(ores, "{") {
res = `{"command":"` + details.command + `","detect":"` + detect + `","hook":` + jshookName + `,"time":"` + jstime + `","key":` + jskey + `,` + ores[1:]
res = `{"command":"` + details.command + `","detect":"` + detect + `","hook":` + jshookName + `,"time":"` + details.timestamp.Format(tmfmt) + `","key":` + jskey + `,` + ores[1:]
}
msgs = append(msgs, res)
}
switch detect {
case "enter":
if fence.detect == nil || fence.detect["inside"] {
msgs = append(msgs, `{"command":"`+details.command+`","detect":"inside","hook":`+jshookName+`,"time":"`+jstime+`","key":`+jskey+`,`+ores[1:])
msgs = append(msgs, `{"command":"`+details.command+`","detect":"inside","hook":`+jshookName+`,"time":"`+details.timestamp.Format(tmfmt)+`","key":`+jskey+`,`+ores[1:])
}
case "exit", "cross":
if fence.detect == nil || fence.detect["outside"] {
msgs = append(msgs, `{"command":"`+details.command+`","detect":"outside","hook":`+jshookName+`,"time":"`+jstime+`","key":`+jskey+`,`+ores[1:])
msgs = append(msgs, `{"command":"`+details.command+`","detect":"outside","hook":`+jshookName+`,"time":"`+details.timestamp.Format(tmfmt)+`","key":`+jskey+`,`+ores[1:])
}
}
return msgs

View File

@ -45,22 +45,22 @@ type Hook struct {
ScanWriter *scanWriter
}
func (c *Controller) DoHook(hook *Hook, details *commandDetailsT) error {
func (hook *Hook) Do(details *commandDetailsT) error {
var lerrs []error
msgs := c.FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, details, false)
msgs := FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, details)
nextMessage:
for _, msg := range msgs {
nextEndpoint:
for _, endpoint := range hook.Endpoints {
switch endpoint.Protocol {
case HTTP:
if err := c.sendHTTPMessage(endpoint, []byte(msg)); err != nil {
if err := sendHTTPMessage(endpoint, []byte(msg)); err != nil {
lerrs = append(lerrs, err)
continue nextEndpoint
}
continue nextMessage // sent
case Disque:
if err := c.sendDisqueMessage(endpoint, []byte(msg)); err != nil {
if err := sendDisqueMessage(endpoint, []byte(msg)); err != nil {
lerrs = append(lerrs, err)
continue nextEndpoint
}
@ -262,6 +262,7 @@ func (c *Controller) cmdSetHook(msg *server.Message) (res string, d commandDetai
delete(c.hooks, h.Name)
}
d.updated = true
d.timestamp = time.Now()
c.hooks[name] = hook
hm, ok := c.hookcols[hook.Key]
if !ok {
@ -297,6 +298,7 @@ func (c *Controller) cmdDelHook(msg *server.Message) (res string, d commandDetai
delete(c.hooks, h.Name)
d.updated = true
}
d.timestamp = time.Now()
switch msg.OutputType {
case server.JSON:
@ -386,7 +388,7 @@ func (c *Controller) cmdHooks(msg *server.Message) (res string, err error) {
return "", nil
}
func (c *Controller) sendHTTPMessage(endpoint Endpoint, msg []byte) error {
func sendHTTPMessage(endpoint Endpoint, msg []byte) error {
resp, err := http.Post(endpoint.Original, "application/json", bytes.NewBuffer(msg))
if err != nil {
return err
@ -398,7 +400,7 @@ func (c *Controller) sendHTTPMessage(endpoint Endpoint, msg []byte) error {
return nil
}
func (c *Controller) sendDisqueMessage(endpoint Endpoint, msg []byte) error {
func sendDisqueMessage(endpoint Endpoint, msg []byte) error {
addr := fmt.Sprintf("%s:%d", endpoint.Disque.Host, endpoint.Disque.Port)
conn, err := DialTimeout(addr, time.Second/4)
if err != nil {

View File

@ -160,7 +160,7 @@ func (c *Controller) goLive(inerr error, conn net.Conn, rd *server.AnyReaderWrit
}
fence := lb.fence
lb.cond.L.Unlock()
msgs := c.FenceMatch("", sw, fence, details, true)
msgs := FenceMatch("", sw, fence, details)
for _, msg := range msgs {
if err := writeMessage(conn, []byte(msg), true, connType, websocket); err != nil {
return nil // nil return is fine here

View File

@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"strconv"
"sync"
"github.com/tidwall/resp"
"github.com/tidwall/tile38/controller/collection"
@ -27,6 +28,7 @@ const (
)
type scanWriter struct {
mu sync.Mutex
wr *bytes.Buffer
msg *server.Message
col *collection.Collection
@ -100,6 +102,8 @@ func (sw *scanWriter) hasFieldsOutput() bool {
}
func (sw *scanWriter) writeHead() {
sw.mu.Lock()
defer sw.mu.Unlock()
switch sw.msg.OutputType {
case server.JSON:
if len(sw.farr) > 0 && sw.hasFieldsOutput() {
@ -131,6 +135,8 @@ func (sw *scanWriter) writeHead() {
}
func (sw *scanWriter) writeFoot(cursor uint64) {
sw.mu.Lock()
defer sw.mu.Unlock()
if !sw.hitLimit {
cursor = 0
}
@ -215,6 +221,8 @@ func (sw *scanWriter) fieldMatch(fields []float64, o geojson.Object) ([]float64,
}
func (sw *scanWriter) writeObject(id string, o geojson.Object, fields []float64) bool {
sw.mu.Lock()
defer sw.mu.Unlock()
keepGoing := true
if !sw.globEverything {
if sw.globSingle {