Merge branch 'kafka-key' of https://github.com/flix-tech/tile38 into flix-tech-kafka-key

This commit is contained in:
tidwall 2018-11-26 19:27:13 -07:00
commit e086e785cb
2 changed files with 13 additions and 6 deletions

View File

@ -63,7 +63,7 @@ type Endpoint struct {
Kafka struct { Kafka struct {
Host string Host string
Port int Port int
QueueName string TopicName string
} }
AMQP struct { AMQP struct {
URI string URI string
@ -157,8 +157,8 @@ func (epc *Manager) Validate(url string) error {
func (epc *Manager) Send(endpoint, msg string) error { func (epc *Manager) Send(endpoint, msg string) error {
for { for {
epc.mu.Lock() epc.mu.Lock()
conn, ok := epc.conns[endpoint] conn, exists := epc.conns[endpoint]
if !ok || conn.Expired() { if !exists || conn.Expired() {
ep, err := parseEndpoint(endpoint) ep, err := parseEndpoint(endpoint)
if err != nil { if err != nil {
epc.mu.Unlock() epc.mu.Unlock()
@ -370,14 +370,14 @@ func parseEndpoint(s string) (Endpoint, error) {
// Parsing Kafka queue name // Parsing Kafka queue name
if len(sp) > 1 { if len(sp) > 1 {
var err error var err error
endpoint.Kafka.QueueName, err = url.QueryUnescape(sp[1]) endpoint.Kafka.TopicName, err = url.QueryUnescape(sp[1])
if err != nil { if err != nil {
return endpoint, errors.New("invalid kafka topic name") return endpoint, errors.New("invalid kafka topic name")
} }
} }
// Throw error if we not provide any queue name // Throw error if we not provide any queue name
if endpoint.Kafka.QueueName == "" { if endpoint.Kafka.TopicName == "" {
return endpoint, errors.New("missing kafka topic name") return endpoint, errors.New("missing kafka topic name")
} }
} }

View File

@ -3,6 +3,7 @@ package endpoint
import ( import (
"errors" "errors"
"fmt" "fmt"
"github.com/tidwall/gjson"
"sync" "sync"
"time" "time"
@ -71,8 +72,14 @@ func (conn *KafkaConn) Send(msg string) error {
conn.conn = c conn.conn = c
} }
// parse json again to get out info for our kafka key
key := gjson.Get(msg, "key")
id := gjson.Get(msg, "id")
keyValue := fmt.Sprintf("%s-%s", key.String(), id.String())
message := &sarama.ProducerMessage{ message := &sarama.ProducerMessage{
Topic: conn.ep.Kafka.QueueName, Topic: conn.ep.Kafka.TopicName,
Key: sarama.StringEncoder(keyValue),
Value: sarama.StringEncoder(msg), Value: sarama.StringEncoder(msg),
} }