diff --git a/internal/endpoint/endpoint.go b/internal/endpoint/endpoint.go index a7742026..22256d80 100644 --- a/internal/endpoint/endpoint.go +++ b/internal/endpoint/endpoint.go @@ -67,6 +67,8 @@ type Endpoint struct { Port int TopicName string SASL bool + SASLSHA256 bool + SASLSHA512 bool TLS bool CACertFile string CertFile string @@ -426,6 +428,10 @@ func parseEndpoint(s string) (Endpoint, error) { endpoint.Kafka.KeyFile = val[0] case "sasl": endpoint.Kafka.SASL, _ = strconv.ParseBool(val[0]) + case "sha256": + endpoint.Kafka.SASLSHA256, _ = strconv.ParseBool(val[0]) + case "sha512": + endpoint.Kafka.SASLSHA512, _ = strconv.ParseBool(val[0]) } } } diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index 77cf0348..d0a04f6a 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -84,8 +84,14 @@ func (conn *KafkaConn) Send(msg string) error { cfg.Net.SASL.User = os.Getenv("KAFKA_USERNAME") cfg.Net.SASL.Password = os.Getenv("KAFKA_PASSWORD") cfg.Net.SASL.Handshake = true - cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } - cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + if conn.ep.Kafka.SASLSHA256 { + cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} } + cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 + } + if conn.ep.Kafka.SASLSHA512 { + cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } + cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + } } cfg.Net.DialTimeout = time.Second