Merge branch 'master' into Ext-server-stats

This commit is contained in:
tidwall 2018-11-29 15:22:48 -08:00
commit f795567f9c
5 changed files with 40 additions and 15 deletions

View File

@ -133,6 +133,10 @@ func refusedErrorString(addr string) string {
var groupsM = make(map[string][]string) var groupsM = make(map[string][]string)
func jsonOK(msg []byte) bool {
return gjson.GetBytes(msg, "ok").Bool()
}
func main() { func main() {
if !parseArgs() { if !parseArgs() {
return return
@ -335,13 +339,14 @@ func main() {
output = "resp" output = "resp"
} }
case "output json": case "output json":
if strings.HasPrefix(string(msg), `{"ok":true`) { if jsonOK(msg) {
output = "json" output = "json"
} }
} }
mustOutput := true mustOutput := true
if oneCommand == "" && !strings.HasPrefix(string(msg), `{"ok":true`) {
if oneCommand == "" && !jsonOK(msg) {
var cerr connError var cerr connError
if err := json.Unmarshal(msg, &cerr); err == nil { if err := json.Unmarshal(msg, &cerr); err == nil {
fmt.Fprintln(os.Stderr, "(error) "+cerr.Err) fmt.Fprintln(os.Stderr, "(error) "+cerr.Err)
@ -361,6 +366,7 @@ func main() {
} }
fmt.Fprintln(os.Stdout, string(msg)) fmt.Fprintln(os.Stdout, string(msg))
} else { } else {
msg = bytes.TrimSpace(msg)
if raw { if raw {
fmt.Fprintln(os.Stdout, string(msg)) fmt.Fprintln(os.Stdout, string(msg))
} else { } else {

View File

@ -63,7 +63,7 @@ type Endpoint struct {
Kafka struct { Kafka struct {
Host string Host string
Port int Port int
QueueName string TopicName string
} }
AMQP struct { AMQP struct {
URI string URI string
@ -157,8 +157,8 @@ func (epc *Manager) Validate(url string) error {
func (epc *Manager) Send(endpoint, msg string) error { func (epc *Manager) Send(endpoint, msg string) error {
for { for {
epc.mu.Lock() epc.mu.Lock()
conn, ok := epc.conns[endpoint] conn, exists := epc.conns[endpoint]
if !ok || conn.Expired() { if !exists || conn.Expired() {
ep, err := parseEndpoint(endpoint) ep, err := parseEndpoint(endpoint)
if err != nil { if err != nil {
epc.mu.Unlock() epc.mu.Unlock()
@ -370,14 +370,14 @@ func parseEndpoint(s string) (Endpoint, error) {
// Parsing Kafka queue name // Parsing Kafka queue name
if len(sp) > 1 { if len(sp) > 1 {
var err error var err error
endpoint.Kafka.QueueName, err = url.QueryUnescape(sp[1]) endpoint.Kafka.TopicName, err = url.QueryUnescape(sp[1])
if err != nil { if err != nil {
return endpoint, errors.New("invalid kafka topic name") return endpoint, errors.New("invalid kafka topic name")
} }
} }
// Throw error if we not provide any queue name // Throw error if we not provide any queue name
if endpoint.Kafka.QueueName == "" { if endpoint.Kafka.TopicName == "" {
return endpoint, errors.New("missing kafka topic name") return endpoint, errors.New("missing kafka topic name")
} }
} }

View File

@ -3,6 +3,7 @@ package endpoint
import ( import (
"errors" "errors"
"fmt" "fmt"
"github.com/tidwall/gjson"
"sync" "sync"
"time" "time"
@ -71,8 +72,14 @@ func (conn *KafkaConn) Send(msg string) error {
conn.conn = c conn.conn = c
} }
// parse json again to get out info for our kafka key
key := gjson.Get(msg, "key")
id := gjson.Get(msg, "id")
keyValue := fmt.Sprintf("%s-%s", key.String(), id.String())
message := &sarama.ProducerMessage{ message := &sarama.ProducerMessage{
Topic: conn.ep.Kafka.QueueName, Topic: conn.ep.Kafka.TopicName,
Key: sarama.StringEncoder(keyValue),
Value: sarama.StringEncoder(msg), Value: sarama.StringEncoder(msg),
} }

View File

@ -211,6 +211,9 @@ func (server *Server) getQueueCandidates(d *commandDetails) []*Hook {
[]float64{rect.Max.X, rect.Max.Y}, []float64{rect.Max.X, rect.Max.Y},
func(_, _ []float64, value interface{}) bool { func(_, _ []float64, value interface{}) bool {
hook := value.(*Hook) hook := value.(*Hook)
if hook.Key != d.key {
return true
}
var found bool var found bool
for _, candidate := range candidates { for _, candidate := range candidates {
if candidate == hook { if candidate == hook {

View File

@ -233,11 +233,9 @@ func (c *Server) cmdDelHook(msg *Message, chanCmd bool) (
} }
if hook, ok := c.hooks[name]; ok && hook.channel == chanCmd { if hook, ok := c.hooks[name]; ok && hook.channel == chanCmd {
hook.Close() hook.Close()
// remove hook from maps
delete(c.hooks, hook.Name) delete(c.hooks, hook.Name)
delete(c.hooksOut, hook.Name) delete(c.hooksOut, hook.Name)
d.updated = true
// remove hook from spatial index // remove hook from spatial index
if hook != nil && hook.Fence != nil && hook.Fence.obj != nil { if hook != nil && hook.Fence != nil && hook.Fence.obj != nil {
rect := hook.Fence.obj.Rect() rect := hook.Fence.obj.Rect()
@ -246,6 +244,7 @@ func (c *Server) cmdDelHook(msg *Message, chanCmd bool) (
[]float64{rect.Max.X, rect.Max.Y}, []float64{rect.Max.X, rect.Max.Y},
hook) hook)
} }
d.updated = true
} }
d.timestamp = time.Now() d.timestamp = time.Now()
@ -277,16 +276,26 @@ func (c *Server) cmdPDelHook(msg *Message, channel bool) (
} }
count := 0 count := 0
for name, h := range c.hooks { for name, hook := range c.hooks {
if h.channel != channel { if hook.channel != channel {
continue continue
} }
match, _ := glob.Match(pattern, name) match, _ := glob.Match(pattern, name)
if !match { if !match {
continue continue
} }
h.Close() hook.Close()
delete(c.hooks, h.Name) // remove hook from maps
delete(c.hooks, hook.Name)
delete(c.hooksOut, hook.Name)
// remove hook from spatial index
if hook != nil && hook.Fence != nil && hook.Fence.obj != nil {
rect := hook.Fence.obj.Rect()
c.hookTree.Delete(
[]float64{rect.Min.X, rect.Min.Y},
[]float64{rect.Max.X, rect.Max.Y},
hook)
}
d.updated = true d.updated = true
count++ count++
} }