From e4b03ca17428b06b463f910dc3c220a2ae3b367f Mon Sep 17 00:00:00 2001 From: Benjamin Ramser Date: Thu, 8 Jul 2021 12:05:16 +0200 Subject: [PATCH] Add endpoint option for sasl --- internal/endpoint/endpoint.go | 3 +++ internal/endpoint/kafka.go | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/endpoint/endpoint.go b/internal/endpoint/endpoint.go index 836d7a48..a7742026 100644 --- a/internal/endpoint/endpoint.go +++ b/internal/endpoint/endpoint.go @@ -66,6 +66,7 @@ type Endpoint struct { Host string Port int TopicName string + SASL bool TLS bool CACertFile string CertFile string @@ -423,6 +424,8 @@ func parseEndpoint(s string) (Endpoint, error) { endpoint.Kafka.CertFile = val[0] case "key": endpoint.Kafka.KeyFile = val[0] + case "sasl": + endpoint.Kafka.SASL, _ = strconv.ParseBool(val[0]) } } } diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index 901f22f2..a7e11f3c 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -76,7 +76,9 @@ func (conn *KafkaConn) Send(msg string) error { } cfg.Net.TLS.Enable = true cfg.Net.TLS.Config = tlsConfig - } else { + } + + if conn.ep.Kafka.SASL { log.Debugf("building kafka sasl config") cfg.ClientID = "sasl_scram_client" cfg.Net.SASL.Enable = true