diff --git a/internal/endpoint/endpoint.go b/internal/endpoint/endpoint.go index 94676240..836d7a48 100644 --- a/internal/endpoint/endpoint.go +++ b/internal/endpoint/endpoint.go @@ -63,9 +63,13 @@ type Endpoint struct { Channel string } Kafka struct { - Host string - Port int - TopicName string + Host string + Port int + TopicName string + TLS bool + CACertFile string + CertFile string + KeyFile string } AMQP struct { URI string @@ -399,6 +403,29 @@ func parseEndpoint(s string) (Endpoint, error) { if endpoint.Kafka.TopicName == "" { return endpoint, errors.New("missing kafka topic name") } + + // Parsing additional params + if len(sqp) > 1 { + m, err := url.ParseQuery(sqp[1]) + if err != nil { + return endpoint, errors.New("invalid kafka url") + } + for key, val := range m { + if len(val) == 0 { + continue + } + switch key { + case "tls": + endpoint.Kafka.TLS, _ = strconv.ParseBool(val[0]) + case "cacert": + endpoint.Kafka.CACertFile = val[0] + case "cert": + endpoint.Kafka.CertFile = val[0] + case "key": + endpoint.Kafka.KeyFile = val[0] + } + } + } } if endpoint.Protocol == MQTT { diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index 44606bbc..9f0e1df2 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -1,13 +1,20 @@ package endpoint import ( + "crypto/tls" + "crypto/x509" "errors" "fmt" + "io/ioutil" + "os" "sync" "time" + lg "log" + "github.com/Shopify/sarama" "github.com/tidwall/gjson" + "github.com/tidwall/tile38/internal/log" ) const kafkaExpiresAfter = time.Second * 30 @@ -53,9 +60,25 @@ func (conn *KafkaConn) Send(msg string) error { } conn.t = time.Now() + if log.Level > 2 { + sarama.Logger = lg.New(os.Stdout, "[sarama] ", lg.LstdFlags) + } + uri := fmt.Sprintf("%s:%d", conn.ep.Kafka.Host, conn.ep.Kafka.Port) if conn.conn == nil { cfg := sarama.NewConfig() + cfg.ClientID = "Tile38" // otherwise defaults to sarama + + if conn.ep.Kafka.TLS { + log.Debugf("building kafka tls config") + tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile) + if err != nil { + return err + } + cfg.Net.TLS.Enable = true + cfg.Net.TLS.Config = tlsConfig + } + cfg.Net.DialTimeout = time.Second cfg.Net.ReadTimeout = time.Second * 5 cfg.Net.WriteTimeout = time.Second * 5 @@ -102,3 +125,26 @@ func newKafkaConn(ep Endpoint) *KafkaConn { t: time.Now(), } } + +func newKafkaTLSConfig(CertFile, KeyFile, CACertFile string) (*tls.Config, error) { + tlsConfig := tls.Config{} + + // Load client cert + cert, err := tls.LoadX509KeyPair(CertFile, KeyFile) + if err != nil { + return &tlsConfig, err + } + tlsConfig.Certificates = []tls.Certificate{cert} + + // Load CA cert + caCert, err := ioutil.ReadFile(CACertFile) + if err != nil { + return &tlsConfig, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + tlsConfig.RootCAs = caCertPool + + tlsConfig.BuildNameToCertificate() + return &tlsConfig, err +}