fixed shared string issue

This commit is contained in:
Josh Baker 2016-04-02 14:13:20 -07:00
parent a3725c7a2a
commit 6eb21b890c
9 changed files with 145 additions and 70 deletions

View File

@ -94,7 +94,9 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error {
}
if c.config.FollowHost == "" {
// process hooks, for leader only
return c.processHooks(d)
if err := c.processHooks(d); err != nil {
return err
}
}
}
data, err := value.MarshalRESP()

59
controller/disque.go Normal file
View File

@ -0,0 +1,59 @@
package controller
import (
"errors"
"fmt"
"strings"
"sync"
"time"
)
// TODO: add one connection pool per endpoint. Use Redigo.
// The current implementation is too slow.
var endpointDisqueMu sync.Mutex
type endpointDisqueConn struct {
mu sync.Mutex
}
var endpointDisqueM = make(map[string]*endpointDisqueConn)
func sendDisqueMessage(endpoint Endpoint, msg []byte) error {
endpointDisqueMu.Lock()
conn, ok := endpointDisqueM[endpoint.Original]
if !ok {
conn = &endpointDisqueConn{
//client: &http.Client{Transport: &http.Transport{}},
}
endpointDisqueM[endpoint.Original] = conn
}
endpointDisqueMu.Unlock()
conn.mu.Lock()
defer conn.mu.Unlock()
addr := fmt.Sprintf("%s:%d", endpoint.Disque.Host, endpoint.Disque.Port)
dconn, err := DialTimeout(addr, time.Second/4)
if err != nil {
return err
}
defer dconn.Close()
options := []interface{}{endpoint.Disque.QueueName, msg, 0}
replicate := endpoint.Disque.Options.Replicate
if replicate > 0 {
options = append(options, "REPLICATE")
options = append(options, endpoint.Disque.Options.Replicate)
}
v, err := dconn.Do("ADDJOB", options...)
if err != nil {
return err
}
if v.Error() != nil {
return v.Error()
}
id := v.String()
p := strings.Split(id, "-")
if len(p) != 4 {
return errors.New("invalid disque reply")
}
return nil
}

View File

@ -10,9 +10,11 @@ import (
var tmfmt = "2006-01-02T15:04:05.999999999Z07:00"
func FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, details *commandDetailsT) []string {
jshookName := jsonString(hookName)
jstime := jsonString(details.timestamp.Format(tmfmt))
glob := fence.glob
if details.command == "drop" {
return []string{`{"cmd":"drop","time":` + details.timestamp.Format(tmfmt) + `}`}
return []string{`{"cmd":"drop","hook":` + jshookName + `,"time":` + jstime + `}`}
}
match := true
if glob != "" && glob != "*" {
@ -22,7 +24,11 @@ func FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai
return nil
}
if details.obj == nil || (details.command == "fset" && sw.nofields) {
sw.mu.Lock()
nofields := sw.nofields
sw.mu.Unlock()
if details.obj == nil || (details.command == "fset" && nofields) {
return nil
}
match = false
@ -72,45 +78,51 @@ func FenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, detai
}
}
if details.command == "del" {
return []string{`{"command":"del","id":` + jsonString(details.id) + `,"time":` + details.timestamp.Format(tmfmt) + `}`}
return []string{`{"command":"del","hook":` + jshookName + `,"id":` + jsonString(details.id) + `,"time":` + jstime + `}`}
}
if details.fmap == nil {
return nil
}
sw.mu.Lock()
sw.fmap = details.fmap
sw.fullFields = true
sw.msg.OutputType = server.JSON
sw.writeObject(details.id, details.obj, details.fields)
sw.writeObject(details.id, details.obj, details.fields, true)
if sw.wr.Len() == 0 {
sw.mu.Unlock()
return nil
}
res := sw.wr.String()
resb := make([]byte, len(res))
copy(resb, res)
res = string(resb)
sw.wr.Reset()
if strings.HasPrefix(res, ",") {
res = res[1:]
}
if sw.output == outputIDs {
res = `{"id":` + res + `}`
}
jskey := jsonString(details.key)
sw.mu.Unlock()
jshookName := jsonString(hookName)
if strings.HasPrefix(res, ",") {
res = res[1:]
}
jskey := jsonString(details.key)
ores := res
msgs := make([]string, 0, 2)
if fence.detect == nil || fence.detect[detect] {
if strings.HasPrefix(ores, "{") {
res = `{"command":"` + details.command + `","detect":"` + detect + `","hook":` + jshookName + `,"time":"` + details.timestamp.Format(tmfmt) + `","key":` + jskey + `,` + ores[1:]
res = `{"command":"` + details.command + `","detect":"` + detect + `","hook":` + jshookName + `,"key":` + jskey + `,"time":` + jstime + `,` + ores[1:]
}
msgs = append(msgs, res)
}
switch detect {
case "enter":
if fence.detect == nil || fence.detect["inside"] {
msgs = append(msgs, `{"command":"`+details.command+`","detect":"inside","hook":`+jshookName+`,"time":"`+details.timestamp.Format(tmfmt)+`","key":`+jskey+`,`+ores[1:])
msgs = append(msgs, `{"command":"`+details.command+`","detect":"inside","hook":`+jshookName+`,"key":`+jskey+`,"time":`+jstime+`,`+ores[1:])
}
case "exit", "cross":
if fence.detect == nil || fence.detect["outside"] {
msgs = append(msgs, `{"command":"`+details.command+`","detect":"outside","hook":`+jshookName+`,"time":"`+details.timestamp.Format(tmfmt)+`","key":`+jskey+`,`+ores[1:])
msgs = append(msgs, `{"command":"`+details.command+`","detect":"outside","hook":`+jshookName+`,"key":`+jskey+`,"time":`+jstime+`,`+ores[1:])
}
}
return msgs

View File

@ -3,8 +3,6 @@ package controller
import (
"bytes"
"errors"
"fmt"
"net/http"
"net/url"
"sort"
"strconv"
@ -69,16 +67,16 @@ nextMessage:
}
}
if len(lerrs) == 0 {
// log.Notice("YAY")
return nil
}
var errmsgs []string
for _, err := range lerrs {
errmsgs = append(errmsgs, err.Error())
}
if len(errmsgs) > 0 {
return errors.New("not sent: " + strings.Join(errmsgs, ","))
}
return errors.New("not sent")
err := errors.New("not sent: " + strings.Join(errmsgs, ","))
log.Error(err)
return err
}
type hooksByName []*Hook
@ -387,43 +385,3 @@ func (c *Controller) cmdHooks(msg *server.Message) (res string, err error) {
}
return "", nil
}
func sendHTTPMessage(endpoint Endpoint, msg []byte) error {
resp, err := http.Post(endpoint.Original, "application/json", bytes.NewBuffer(msg))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("enpoint returned status code %d", resp.StatusCode)
}
return nil
}
func sendDisqueMessage(endpoint Endpoint, msg []byte) error {
addr := fmt.Sprintf("%s:%d", endpoint.Disque.Host, endpoint.Disque.Port)
conn, err := DialTimeout(addr, time.Second/4)
if err != nil {
return err
}
defer conn.Close()
options := []interface{}{endpoint.Disque.QueueName, msg, 0}
replicate := endpoint.Disque.Options.Replicate
if replicate > 0 {
options = append(options, "REPLICATE")
options = append(options, endpoint.Disque.Options.Replicate)
}
v, err := conn.Do("ADDJOB", options...)
if err != nil {
return err
}
if v.Error() != nil {
return v.Error()
}
id := v.String()
p := strings.Split(id, "-")
if len(p) != 4 {
return errors.New("invalid disque reply")
}
return nil
}

43
controller/http.go Normal file
View File

@ -0,0 +1,43 @@
package controller
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"sync"
)
var endpointHTTPMu sync.Mutex
type endpointHTTPConn struct {
mu sync.Mutex
client *http.Client
}
var endpointHTTPM = make(map[string]*endpointHTTPConn)
func sendHTTPMessage(endpoint Endpoint, msg []byte) error {
endpointHTTPMu.Lock()
conn, ok := endpointHTTPM[endpoint.Original]
if !ok {
conn = &endpointHTTPConn{
client: &http.Client{Transport: &http.Transport{}},
}
endpointHTTPM[endpoint.Original] = conn
}
endpointHTTPMu.Unlock()
conn.mu.Lock()
defer conn.mu.Unlock()
res, err := conn.client.Post(endpoint.Original, "application/json", bytes.NewBuffer(msg))
if err != nil {
return err
}
io.Copy(ioutil.Discard, res.Body)
res.Body.Close()
if res.StatusCode != 200 {
return fmt.Errorf("endpoint returned status code %d", res.StatusCode)
}
return nil
}

View File

@ -8,7 +8,6 @@ import (
"net"
"sync"
"github.com/tidwall/tile38/client"
"github.com/tidwall/tile38/controller/log"
"github.com/tidwall/tile38/controller/server"
)
@ -139,7 +138,7 @@ func (c *Controller) goLive(inerr error, conn net.Conn, rd *server.AnyReaderWrit
var livemsg []byte
switch outputType {
case server.JSON:
livemsg = []byte(client.LiveJSON)
livemsg = []byte(`{"ok":true,"live":true}`)
case server.RESP:
livemsg = []byte("+OK\r\n")
}

View File

@ -59,16 +59,16 @@ func (c *Controller) cmdScan(msg *server.Message) (res string, err error) {
greaterGlob := sw.glob[:len(sw.glob)-1]
if globIsGlob(greaterGlob) {
s.cursor = sw.col.Scan(s.cursor, func(id string, o geojson.Object, fields []float64) bool {
return sw.writeObject(id, o, fields)
return sw.writeObject(id, o, fields, false)
})
} else {
s.cursor = sw.col.ScanGreaterOrEqual(sw.glob, s.cursor, func(id string, o geojson.Object, fields []float64) bool {
return sw.writeObject(id, o, fields)
return sw.writeObject(id, o, fields, false)
})
}
} else {
s.cursor = sw.col.Scan(s.cursor, func(id string, o geojson.Object, fields []float64) bool {
return sw.writeObject(id, o, fields)
return sw.writeObject(id, o, fields, false)
})
}
}

View File

@ -220,9 +220,11 @@ func (sw *scanWriter) fieldMatch(fields []float64, o geojson.Object) ([]float64,
return sw.fvals, true
}
func (sw *scanWriter) writeObject(id string, o geojson.Object, fields []float64) bool {
sw.mu.Lock()
defer sw.mu.Unlock()
func (sw *scanWriter) writeObject(id string, o geojson.Object, fields []float64, noLock bool) bool {
if !noLock {
sw.mu.Lock()
defer sw.mu.Unlock()
}
keepGoing := true
if !sw.globEverything {
if sw.globSingle {

View File

@ -239,7 +239,7 @@ func (c *Controller) cmdNearby(msg *server.Message) (res string, err error) {
sw.writeHead()
if sw.col != nil {
s.cursor = sw.col.Nearby(s.cursor, s.sparse, s.lat, s.lon, s.meters, func(id string, o geojson.Object, fields []float64) bool {
return sw.writeObject(id, o, fields)
return sw.writeObject(id, o, fields, false)
})
}
sw.writeFoot(s.cursor)
@ -279,13 +279,13 @@ func (c *Controller) cmdWithinOrIntersects(cmd string, msg *server.Message) (res
if cmd == "within" {
s.cursor = sw.col.Within(s.cursor, s.sparse, s.o, s.minLat, s.minLon, s.maxLat, s.maxLon,
func(id string, o geojson.Object, fields []float64) bool {
return sw.writeObject(id, o, fields)
return sw.writeObject(id, o, fields, false)
},
)
} else if cmd == "intersects" {
s.cursor = sw.col.Intersects(s.cursor, s.sparse, s.o, s.minLat, s.minLon, s.maxLat, s.maxLon,
func(id string, o geojson.Object, fields []float64) bool {
return sw.writeObject(id, o, fields)
return sw.writeObject(id, o, fields, false)
},
)
}