diff --git a/cmd/tile38-cli/main.go b/cmd/tile38-cli/main.go index d6daf1da..1d13d141 100644 --- a/cmd/tile38-cli/main.go +++ b/cmd/tile38-cli/main.go @@ -133,6 +133,10 @@ func refusedErrorString(addr string) string { var groupsM = make(map[string][]string) +func jsonOK(msg []byte) bool { + return gjson.GetBytes(msg, "ok").Bool() +} + func main() { if !parseArgs() { return @@ -335,13 +339,14 @@ func main() { output = "resp" } case "output json": - if strings.HasPrefix(string(msg), `{"ok":true`) { + if jsonOK(msg) { output = "json" } } mustOutput := true - if oneCommand == "" && !strings.HasPrefix(string(msg), `{"ok":true`) { + + if oneCommand == "" && !jsonOK(msg) { var cerr connError if err := json.Unmarshal(msg, &cerr); err == nil { fmt.Fprintln(os.Stderr, "(error) "+cerr.Err) @@ -361,6 +366,7 @@ func main() { } fmt.Fprintln(os.Stdout, string(msg)) } else { + msg = bytes.TrimSpace(msg) if raw { fmt.Fprintln(os.Stdout, string(msg)) } else { diff --git a/internal/endpoint/endpoint.go b/internal/endpoint/endpoint.go index cacf32e0..d612ccfb 100644 --- a/internal/endpoint/endpoint.go +++ b/internal/endpoint/endpoint.go @@ -63,7 +63,7 @@ type Endpoint struct { Kafka struct { Host string Port int - QueueName string + TopicName string } AMQP struct { URI string @@ -157,8 +157,8 @@ func (epc *Manager) Validate(url string) error { func (epc *Manager) Send(endpoint, msg string) error { for { epc.mu.Lock() - conn, ok := epc.conns[endpoint] - if !ok || conn.Expired() { + conn, exists := epc.conns[endpoint] + if !exists || conn.Expired() { ep, err := parseEndpoint(endpoint) if err != nil { epc.mu.Unlock() @@ -370,14 +370,14 @@ func parseEndpoint(s string) (Endpoint, error) { // Parsing Kafka queue name if len(sp) > 1 { var err error - endpoint.Kafka.QueueName, err = url.QueryUnescape(sp[1]) + endpoint.Kafka.TopicName, err = url.QueryUnescape(sp[1]) if err != nil { return endpoint, errors.New("invalid kafka topic name") } } // Throw error if we not provide any queue name - if endpoint.Kafka.QueueName == "" { + if endpoint.Kafka.TopicName == "" { return endpoint, errors.New("missing kafka topic name") } } diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index e450fe0d..eb6080a5 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -3,6 +3,7 @@ package endpoint import ( "errors" "fmt" + "github.com/tidwall/gjson" "sync" "time" @@ -71,8 +72,14 @@ func (conn *KafkaConn) Send(msg string) error { conn.conn = c } + // parse json again to get out info for our kafka key + key := gjson.Get(msg, "key") + id := gjson.Get(msg, "id") + keyValue := fmt.Sprintf("%s-%s", key.String(), id.String()) + message := &sarama.ProducerMessage{ - Topic: conn.ep.Kafka.QueueName, + Topic: conn.ep.Kafka.TopicName, + Key: sarama.StringEncoder(keyValue), Value: sarama.StringEncoder(msg), } diff --git a/internal/server/aof.go b/internal/server/aof.go index 73a70845..26c31158 100644 --- a/internal/server/aof.go +++ b/internal/server/aof.go @@ -211,6 +211,9 @@ func (server *Server) getQueueCandidates(d *commandDetails) []*Hook { []float64{rect.Max.X, rect.Max.Y}, func(_, _ []float64, value interface{}) bool { hook := value.(*Hook) + if hook.Key != d.key { + return true + } var found bool for _, candidate := range candidates { if candidate == hook { diff --git a/internal/server/hooks.go b/internal/server/hooks.go index 7b9d22c3..b473fe20 100644 --- a/internal/server/hooks.go +++ b/internal/server/hooks.go @@ -233,11 +233,9 @@ func (c *Server) cmdDelHook(msg *Message, chanCmd bool) ( } if hook, ok := c.hooks[name]; ok && hook.channel == chanCmd { hook.Close() + // remove hook from maps delete(c.hooks, hook.Name) delete(c.hooksOut, hook.Name) - - d.updated = true - // remove hook from spatial index if hook != nil && hook.Fence != nil && hook.Fence.obj != nil { rect := hook.Fence.obj.Rect() @@ -246,6 +244,7 @@ func (c *Server) cmdDelHook(msg *Message, chanCmd bool) ( []float64{rect.Max.X, rect.Max.Y}, hook) } + d.updated = true } d.timestamp = time.Now() @@ -277,16 +276,26 @@ func (c *Server) cmdPDelHook(msg *Message, channel bool) ( } count := 0 - for name, h := range c.hooks { - if h.channel != channel { + for name, hook := range c.hooks { + if hook.channel != channel { continue } match, _ := glob.Match(pattern, name) if !match { continue } - h.Close() - delete(c.hooks, h.Name) + hook.Close() + // remove hook from maps + delete(c.hooks, hook.Name) + delete(c.hooksOut, hook.Name) + // remove hook from spatial index + if hook != nil && hook.Fence != nil && hook.Fence.obj != nil { + rect := hook.Fence.obj.Rect() + c.hookTree.Delete( + []float64{rect.Min.X, rect.Min.Y}, + []float64{rect.Max.X, rect.Max.Y}, + hook) + } d.updated = true count++ }