Merge branch 'iwpnd-kafka-tls'

This commit is contained in:
tidwall 2021-02-18 15:34:34 -07:00
commit 53af1e2306
3 changed files with 79 additions and 3 deletions

View File

@ -61,9 +61,13 @@ type Endpoint struct {
Channel string Channel string
} }
Kafka struct { Kafka struct {
Host string Host string
Port int Port int
TopicName string TopicName string
TLS bool
CACertFile string
CertFile string
KeyFile string
} }
AMQP struct { AMQP struct {
URI string URI string
@ -388,6 +392,29 @@ func parseEndpoint(s string) (Endpoint, error) {
if endpoint.Kafka.TopicName == "" { if endpoint.Kafka.TopicName == "" {
return endpoint, errors.New("missing kafka topic name") return endpoint, errors.New("missing kafka topic name")
} }
// Parsing additional params
if len(sqp) > 1 {
m, err := url.ParseQuery(sqp[1])
if err != nil {
return endpoint, errors.New("invalid kafka url")
}
for key, val := range m {
if len(val) == 0 {
continue
}
switch key {
case "tls":
endpoint.Kafka.TLS, _ = strconv.ParseBool(val[0])
case "cacert":
endpoint.Kafka.CACertFile = val[0]
case "cert":
endpoint.Kafka.CertFile = val[0]
case "key":
endpoint.Kafka.KeyFile = val[0]
}
}
}
} }
if endpoint.Protocol == MQTT { if endpoint.Protocol == MQTT {

View File

@ -1,13 +1,19 @@
package endpoint package endpoint
import ( import (
"crypto/tls"
"crypto/x509"
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"sync" "sync"
"time" "time"
lg "log"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
"github.com/tidwall/tile38/internal/log"
) )
const kafkaExpiresAfter = time.Second * 30 const kafkaExpiresAfter = time.Second * 30
@ -53,9 +59,24 @@ func (conn *KafkaConn) Send(msg string) error {
} }
conn.t = time.Now() conn.t = time.Now()
if log.Level > 2 {
sarama.Logger = lg.New(log.Output(), "[sarama] ", 0)
}
uri := fmt.Sprintf("%s:%d", conn.ep.Kafka.Host, conn.ep.Kafka.Port) uri := fmt.Sprintf("%s:%d", conn.ep.Kafka.Host, conn.ep.Kafka.Port)
if conn.conn == nil { if conn.conn == nil {
cfg := sarama.NewConfig() cfg := sarama.NewConfig()
if conn.ep.Kafka.TLS {
log.Debugf("building kafka tls config")
tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile)
if err != nil {
return err
}
cfg.Net.TLS.Enable = true
cfg.Net.TLS.Config = tlsConfig
}
cfg.Net.DialTimeout = time.Second cfg.Net.DialTimeout = time.Second
cfg.Net.ReadTimeout = time.Second * 5 cfg.Net.ReadTimeout = time.Second * 5
cfg.Net.WriteTimeout = time.Second * 5 cfg.Net.WriteTimeout = time.Second * 5
@ -102,3 +123,26 @@ func newKafkaConn(ep Endpoint) *KafkaConn {
t: time.Now(), t: time.Now(),
} }
} }
func newKafkaTLSConfig(CertFile, KeyFile, CACertFile string) (*tls.Config, error) {
tlsConfig := tls.Config{}
// Load client cert
cert, err := tls.LoadX509KeyPair(CertFile, KeyFile)
if err != nil {
return &tlsConfig, err
}
tlsConfig.Certificates = []tls.Certificate{cert}
// Load CA cert
caCert, err := ioutil.ReadFile(CACertFile)
if err != nil {
return &tlsConfig, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caCertPool
tlsConfig.BuildNameToCertificate()
return &tlsConfig, err
}

View File

@ -32,6 +32,11 @@ func init() {
SetOutput(os.Stderr) SetOutput(os.Stderr)
} }
// Output retuns the output writer
func Output() io.Writer {
return wr
}
func log(level int, tag, color string, formatted bool, format string, args ...interface{}) { func log(level int, tag, color string, formatted bool, format string, args ...interface{}) {
if Level < level { if Level < level {
return return