From 734d33365ae382b86639689a68003663902d8f7e Mon Sep 17 00:00:00 2001 From: Benjamin Ramser Date: Mon, 15 Feb 2021 18:04:37 +0100 Subject: [PATCH 1/3] add kafka tls config fix endpoint, add logging to tlsconfig creation add logging if log.Level > 2 --- internal/endpoint/endpoint.go | 33 ++++++++++++++++++++++--- internal/endpoint/kafka.go | 46 +++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 3 deletions(-) diff --git a/internal/endpoint/endpoint.go b/internal/endpoint/endpoint.go index 31609923..a0917f22 100644 --- a/internal/endpoint/endpoint.go +++ b/internal/endpoint/endpoint.go @@ -61,9 +61,13 @@ type Endpoint struct { Channel string } Kafka struct { - Host string - Port int - TopicName string + Host string + Port int + TopicName string + TLS bool + CACertFile string + CertFile string + KeyFile string } AMQP struct { URI string @@ -388,6 +392,29 @@ func parseEndpoint(s string) (Endpoint, error) { if endpoint.Kafka.TopicName == "" { return endpoint, errors.New("missing kafka topic name") } + + // Parsing additional params + if len(sqp) > 1 { + m, err := url.ParseQuery(sqp[1]) + if err != nil { + return endpoint, errors.New("invalid kafka url") + } + for key, val := range m { + if len(val) == 0 { + continue + } + switch key { + case "tls": + endpoint.Kafka.TLS, _ = strconv.ParseBool(val[0]) + case "cacert": + endpoint.Kafka.CACertFile = val[0] + case "cert": + endpoint.Kafka.CertFile = val[0] + case "key": + endpoint.Kafka.KeyFile = val[0] + } + } + } } if endpoint.Protocol == MQTT { diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index 44606bbc..9f0e1df2 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -1,13 +1,20 @@ package endpoint import ( + "crypto/tls" + "crypto/x509" "errors" "fmt" + "io/ioutil" + "os" "sync" "time" + lg "log" + "github.com/Shopify/sarama" "github.com/tidwall/gjson" + "github.com/tidwall/tile38/internal/log" ) const kafkaExpiresAfter = time.Second * 30 @@ -53,9 +60,25 @@ func (conn *KafkaConn) Send(msg string) error { } conn.t = time.Now() + if log.Level > 2 { + sarama.Logger = lg.New(os.Stdout, "[sarama] ", lg.LstdFlags) + } + uri := fmt.Sprintf("%s:%d", conn.ep.Kafka.Host, conn.ep.Kafka.Port) if conn.conn == nil { cfg := sarama.NewConfig() + cfg.ClientID = "Tile38" // otherwise defaults to sarama + + if conn.ep.Kafka.TLS { + log.Debugf("building kafka tls config") + tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile) + if err != nil { + return err + } + cfg.Net.TLS.Enable = true + cfg.Net.TLS.Config = tlsConfig + } + cfg.Net.DialTimeout = time.Second cfg.Net.ReadTimeout = time.Second * 5 cfg.Net.WriteTimeout = time.Second * 5 @@ -102,3 +125,26 @@ func newKafkaConn(ep Endpoint) *KafkaConn { t: time.Now(), } } + +func newKafkaTLSConfig(CertFile, KeyFile, CACertFile string) (*tls.Config, error) { + tlsConfig := tls.Config{} + + // Load client cert + cert, err := tls.LoadX509KeyPair(CertFile, KeyFile) + if err != nil { + return &tlsConfig, err + } + tlsConfig.Certificates = []tls.Certificate{cert} + + // Load CA cert + caCert, err := ioutil.ReadFile(CACertFile) + if err != nil { + return &tlsConfig, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + tlsConfig.RootCAs = caCertPool + + tlsConfig.BuildNameToCertificate() + return &tlsConfig, err +} From d7c0c5b8551e323619aea7126578e339adf057a7 Mon Sep 17 00:00:00 2001 From: Benjamin Ramser Date: Thu, 18 Feb 2021 22:35:49 +0100 Subject: [PATCH 2/3] refactor: dont set client id --- internal/endpoint/kafka.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index 9f0e1df2..310a0194 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -67,7 +67,6 @@ func (conn *KafkaConn) Send(msg string) error { uri := fmt.Sprintf("%s:%d", conn.ep.Kafka.Host, conn.ep.Kafka.Port) if conn.conn == nil { cfg := sarama.NewConfig() - cfg.ClientID = "Tile38" // otherwise defaults to sarama if conn.ep.Kafka.TLS { log.Debugf("building kafka tls config") From 9ac25647add88a3ee04d77b6a362d759e1d472e1 Mon Sep 17 00:00:00 2001 From: tidwall Date: Thu, 18 Feb 2021 15:34:01 -0700 Subject: [PATCH 3/3] Expose log output writer --- internal/endpoint/kafka.go | 3 +-- internal/log/log.go | 5 +++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index 9f0e1df2..4698e788 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io/ioutil" - "os" "sync" "time" @@ -61,7 +60,7 @@ func (conn *KafkaConn) Send(msg string) error { conn.t = time.Now() if log.Level > 2 { - sarama.Logger = lg.New(os.Stdout, "[sarama] ", lg.LstdFlags) + sarama.Logger = lg.New(log.Output(), "[sarama] ", 0) } uri := fmt.Sprintf("%s:%d", conn.ep.Kafka.Host, conn.ep.Kafka.Port) diff --git a/internal/log/log.go b/internal/log/log.go index 80abb7ea..eaebf044 100644 --- a/internal/log/log.go +++ b/internal/log/log.go @@ -32,6 +32,11 @@ func init() { SetOutput(os.Stderr) } +// Output retuns the output writer +func Output() io.Writer { + return wr +} + func log(level int, tag, color string, formatted bool, format string, args ...interface{}) { if Level < level { return