add kafka tls config

fix endpoint, add logging to tlsconfig creation

add logging if log.Level > 2
This commit is contained in:
Benjamin Ramser 2021-02-15 18:04:37 +01:00
parent d211858db6
commit 734d33365a
2 changed files with 76 additions and 3 deletions

View File

@ -64,6 +64,10 @@ type Endpoint struct {
Host string Host string
Port int Port int
TopicName string TopicName string
TLS bool
CACertFile string
CertFile string
KeyFile string
} }
AMQP struct { AMQP struct {
URI string URI string
@ -388,6 +392,29 @@ func parseEndpoint(s string) (Endpoint, error) {
if endpoint.Kafka.TopicName == "" { if endpoint.Kafka.TopicName == "" {
return endpoint, errors.New("missing kafka topic name") return endpoint, errors.New("missing kafka topic name")
} }
// Parsing additional params
if len(sqp) > 1 {
m, err := url.ParseQuery(sqp[1])
if err != nil {
return endpoint, errors.New("invalid kafka url")
}
for key, val := range m {
if len(val) == 0 {
continue
}
switch key {
case "tls":
endpoint.Kafka.TLS, _ = strconv.ParseBool(val[0])
case "cacert":
endpoint.Kafka.CACertFile = val[0]
case "cert":
endpoint.Kafka.CertFile = val[0]
case "key":
endpoint.Kafka.KeyFile = val[0]
}
}
}
} }
if endpoint.Protocol == MQTT { if endpoint.Protocol == MQTT {

View File

@ -1,13 +1,20 @@
package endpoint package endpoint
import ( import (
"crypto/tls"
"crypto/x509"
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"os"
"sync" "sync"
"time" "time"
lg "log"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
"github.com/tidwall/tile38/internal/log"
) )
const kafkaExpiresAfter = time.Second * 30 const kafkaExpiresAfter = time.Second * 30
@ -53,9 +60,25 @@ func (conn *KafkaConn) Send(msg string) error {
} }
conn.t = time.Now() conn.t = time.Now()
if log.Level > 2 {
sarama.Logger = lg.New(os.Stdout, "[sarama] ", lg.LstdFlags)
}
uri := fmt.Sprintf("%s:%d", conn.ep.Kafka.Host, conn.ep.Kafka.Port) uri := fmt.Sprintf("%s:%d", conn.ep.Kafka.Host, conn.ep.Kafka.Port)
if conn.conn == nil { if conn.conn == nil {
cfg := sarama.NewConfig() cfg := sarama.NewConfig()
cfg.ClientID = "Tile38" // otherwise defaults to sarama
if conn.ep.Kafka.TLS {
log.Debugf("building kafka tls config")
tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile)
if err != nil {
return err
}
cfg.Net.TLS.Enable = true
cfg.Net.TLS.Config = tlsConfig
}
cfg.Net.DialTimeout = time.Second cfg.Net.DialTimeout = time.Second
cfg.Net.ReadTimeout = time.Second * 5 cfg.Net.ReadTimeout = time.Second * 5
cfg.Net.WriteTimeout = time.Second * 5 cfg.Net.WriteTimeout = time.Second * 5
@ -102,3 +125,26 @@ func newKafkaConn(ep Endpoint) *KafkaConn {
t: time.Now(), t: time.Now(),
} }
} }
func newKafkaTLSConfig(CertFile, KeyFile, CACertFile string) (*tls.Config, error) {
tlsConfig := tls.Config{}
// Load client cert
cert, err := tls.LoadX509KeyPair(CertFile, KeyFile)
if err != nil {
return &tlsConfig, err
}
tlsConfig.Certificates = []tls.Certificate{cert}
// Load CA cert
caCert, err := ioutil.ReadFile(CACertFile)
if err != nil {
return &tlsConfig, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caCertPool
tlsConfig.BuildNameToCertificate()
return &tlsConfig, err
}