diff --git a/internal/endpoint/endpoint.go b/internal/endpoint/endpoint.go index a0917f22..eee293e3 100644 --- a/internal/endpoint/endpoint.go +++ b/internal/endpoint/endpoint.go @@ -64,6 +64,7 @@ type Endpoint struct { Host string Port int TopicName string + SASL bool TLS bool CACertFile string CertFile string @@ -412,6 +413,8 @@ func parseEndpoint(s string) (Endpoint, error) { endpoint.Kafka.CertFile = val[0] case "key": endpoint.Kafka.KeyFile = val[0] + case "sasl": + endpoint.Kafka.SASL, _ = strconv.ParseBool(val[0]) } } } diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index 901f22f2..a7e11f3c 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -76,7 +76,9 @@ func (conn *KafkaConn) Send(msg string) error { } cfg.Net.TLS.Enable = true cfg.Net.TLS.Config = tlsConfig - } else { + } + + if conn.ep.Kafka.SASL { log.Debugf("building kafka sasl config") cfg.ClientID = "sasl_scram_client" cfg.Net.SASL.Enable = true