refactor: kafka auth

This commit is contained in:
Benjamin Ramser 2021-08-03 14:46:40 +02:00
parent 6e52e3a7eb
commit 9ee91ca5c3
2 changed files with 65 additions and 36 deletions

View File

@ -66,10 +66,10 @@ type Endpoint struct {
Host string Host string
Port int Port int
TopicName string TopicName string
SASL bool Auth string
SSL bool
SASLSHA256 bool SASLSHA256 bool
SASLSHA512 bool SASLSHA512 bool
TLS bool
CACertFile string CACertFile string
CertFile string CertFile string
KeyFile string KeyFile string
@ -422,16 +422,16 @@ func parseEndpoint(s string) (Endpoint, error) {
continue continue
} }
switch key { switch key {
case "tls": case "auth":
endpoint.Kafka.TLS, _ = strconv.ParseBool(val[0]) endpoint.Kafka.Auth = val[0]
case "ssl":
endpoint.Kafka.SSL, _ = strconv.ParseBool(val[0])
case "cacert": case "cacert":
endpoint.Kafka.CACertFile = val[0] endpoint.Kafka.CACertFile = val[0]
case "cert": case "cert":
endpoint.Kafka.CertFile = val[0] endpoint.Kafka.CertFile = val[0]
case "key": case "key":
endpoint.Kafka.KeyFile = val[0] endpoint.Kafka.KeyFile = val[0]
case "sasl":
endpoint.Kafka.SASL, _ = strconv.ParseBool(val[0])
case "sha256": case "sha256":
endpoint.Kafka.SASLSHA256, _ = strconv.ParseBool(val[0]) endpoint.Kafka.SASLSHA256, _ = strconv.ParseBool(val[0])
case "sha512": case "sha512":

View File

@ -71,18 +71,30 @@ func (conn *KafkaConn) Send(msg string) error {
if conn.conn == nil { if conn.conn == nil {
cfg := sarama.NewConfig() cfg := sarama.NewConfig()
if conn.ep.Kafka.TLS { cfg.Net.DialTimeout = time.Second
log.Debugf("building kafka tls config") cfg.Net.ReadTimeout = time.Second * 5
tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile) cfg.Net.WriteTimeout = time.Second * 5
// Fix #333 : fix backward incompatibility introduced by sarama library
cfg.Producer.Return.Successes = true
cfg.Version = sarama.V0_10_0_0
switch conn.ep.Kafka.Auth {
case "sasl":
// Other than TLS authentication, SASL does not require SSL
if conn.ep.Kafka.SSL {
tlsConfig := tls.Config{}
log.Debugf("building kafka tls root config")
caCertPool, err := loadRootTLSCert(conn.ep.Kafka.CACertFile)
if err != nil { if err != nil {
cfg.MetricRegistry.UnregisterAll()
return err return err
} }
tlsConfig.RootCAs = &caCertPool
cfg.Net.TLS.Enable = true cfg.Net.TLS.Enable = true
cfg.Net.TLS.Config = tlsConfig cfg.Net.TLS.Config = &tlsConfig
} }
if conn.ep.Kafka.SASL {
log.Debugf("building kafka sasl config") log.Debugf("building kafka sasl config")
cfg.Net.SASL.Enable = true cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = os.Getenv("KAFKA_USERNAME") cfg.Net.SASL.User = os.Getenv("KAFKA_USERNAME")
@ -96,14 +108,29 @@ func (conn *KafkaConn) Send(msg string) error {
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
} }
case "tls":
tlsConfig := tls.Config{}
log.Debugf("building kafka tls client config")
certificates, err := loadClientTLSCert(conn.ep.Kafka.KeyFile, conn.ep.Kafka.CertFile)
if err != nil {
cfg.MetricRegistry.UnregisterAll()
return err
} }
cfg.Net.DialTimeout = time.Second // TLS authentication requires SSL
cfg.Net.ReadTimeout = time.Second * 5 // and Tile38 requires certificates to be validated
cfg.Net.WriteTimeout = time.Second * 5 caCertPool, err := loadRootTLSCert(conn.ep.Kafka.CACertFile)
// Fix #333 : fix backward incompatibility introduced by sarama library if err != nil {
cfg.Producer.Return.Successes = true return err
cfg.Version = sarama.V0_10_0_0 }
cfg.Net.TLS.Enable = true
tlsConfig.Certificates = certificates
tlsConfig.RootCAs = &caCertPool
cfg.Net.TLS.Config = &tlsConfig
}
c, err := sarama.NewSyncProducer([]string{uri}, cfg) c, err := sarama.NewSyncProducer([]string{uri}, cfg)
if err != nil { if err != nil {
@ -147,24 +174,26 @@ func newKafkaConn(ep Endpoint) *KafkaConn {
} }
} }
func newKafkaTLSConfig(CertFile, KeyFile, CACertFile string) (*tls.Config, error) { func loadClientTLSCert(KeyFile, CertFile string) ([]tls.Certificate, error) {
tlsConfig := tls.Config{} // load client cert
// Load client cert
cert, err := tls.LoadX509KeyPair(CertFile, KeyFile) cert, err := tls.LoadX509KeyPair(CertFile, KeyFile)
if err != nil {
return &tlsConfig, err
}
tlsConfig.Certificates = []tls.Certificate{cert}
if err != nil {
return []tls.Certificate{cert}, err
}
return []tls.Certificate{cert}, err
}
func loadRootTLSCert(CACertFile string) (x509.CertPool, error) {
// Load CA cert // Load CA cert
caCert, err := ioutil.ReadFile(CACertFile) caCert, err := ioutil.ReadFile(CACertFile)
if err != nil { if err != nil {
return &tlsConfig, err return x509.CertPool{}, err
} }
caCertPool := x509.NewCertPool() caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert) caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caCertPool return *caCertPool, err
return &tlsConfig, err
} }