diff --git a/internal/endpoint/endpoint.go b/internal/endpoint/endpoint.go index cacf32e0..d612ccfb 100644 --- a/internal/endpoint/endpoint.go +++ b/internal/endpoint/endpoint.go @@ -63,7 +63,7 @@ type Endpoint struct { Kafka struct { Host string Port int - QueueName string + TopicName string } AMQP struct { URI string @@ -157,8 +157,8 @@ func (epc *Manager) Validate(url string) error { func (epc *Manager) Send(endpoint, msg string) error { for { epc.mu.Lock() - conn, ok := epc.conns[endpoint] - if !ok || conn.Expired() { + conn, exists := epc.conns[endpoint] + if !exists || conn.Expired() { ep, err := parseEndpoint(endpoint) if err != nil { epc.mu.Unlock() @@ -370,14 +370,14 @@ func parseEndpoint(s string) (Endpoint, error) { // Parsing Kafka queue name if len(sp) > 1 { var err error - endpoint.Kafka.QueueName, err = url.QueryUnescape(sp[1]) + endpoint.Kafka.TopicName, err = url.QueryUnescape(sp[1]) if err != nil { return endpoint, errors.New("invalid kafka topic name") } } // Throw error if we not provide any queue name - if endpoint.Kafka.QueueName == "" { + if endpoint.Kafka.TopicName == "" { return endpoint, errors.New("missing kafka topic name") } } diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index e450fe0d..eb6080a5 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -3,6 +3,7 @@ package endpoint import ( "errors" "fmt" + "github.com/tidwall/gjson" "sync" "time" @@ -71,8 +72,14 @@ func (conn *KafkaConn) Send(msg string) error { conn.conn = c } + // parse json again to get out info for our kafka key + key := gjson.Get(msg, "key") + id := gjson.Get(msg, "id") + keyValue := fmt.Sprintf("%s-%s", key.String(), id.String()) + message := &sarama.ProducerMessage{ - Topic: conn.ep.Kafka.QueueName, + Topic: conn.ep.Kafka.TopicName, + Key: sarama.StringEncoder(keyValue), Value: sarama.StringEncoder(msg), }