mirror of https://github.com/tidwall/tile38.git
add kafka tls config
fix endpoint, add logging to tlsconfig creation add logging if log.Level > 2
This commit is contained in:
parent
5f0e69ca7a
commit
248c3d8b72
|
@ -63,9 +63,13 @@ type Endpoint struct {
|
||||||
Channel string
|
Channel string
|
||||||
}
|
}
|
||||||
Kafka struct {
|
Kafka struct {
|
||||||
Host string
|
Host string
|
||||||
Port int
|
Port int
|
||||||
TopicName string
|
TopicName string
|
||||||
|
TLS bool
|
||||||
|
CACertFile string
|
||||||
|
CertFile string
|
||||||
|
KeyFile string
|
||||||
}
|
}
|
||||||
AMQP struct {
|
AMQP struct {
|
||||||
URI string
|
URI string
|
||||||
|
@ -399,6 +403,29 @@ func parseEndpoint(s string) (Endpoint, error) {
|
||||||
if endpoint.Kafka.TopicName == "" {
|
if endpoint.Kafka.TopicName == "" {
|
||||||
return endpoint, errors.New("missing kafka topic name")
|
return endpoint, errors.New("missing kafka topic name")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Parsing additional params
|
||||||
|
if len(sqp) > 1 {
|
||||||
|
m, err := url.ParseQuery(sqp[1])
|
||||||
|
if err != nil {
|
||||||
|
return endpoint, errors.New("invalid kafka url")
|
||||||
|
}
|
||||||
|
for key, val := range m {
|
||||||
|
if len(val) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
switch key {
|
||||||
|
case "tls":
|
||||||
|
endpoint.Kafka.TLS, _ = strconv.ParseBool(val[0])
|
||||||
|
case "cacert":
|
||||||
|
endpoint.Kafka.CACertFile = val[0]
|
||||||
|
case "cert":
|
||||||
|
endpoint.Kafka.CertFile = val[0]
|
||||||
|
case "key":
|
||||||
|
endpoint.Kafka.KeyFile = val[0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if endpoint.Protocol == MQTT {
|
if endpoint.Protocol == MQTT {
|
||||||
|
|
|
@ -1,13 +1,20 @@
|
||||||
package endpoint
|
package endpoint
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
lg "log"
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
|
"github.com/tidwall/tile38/internal/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
const kafkaExpiresAfter = time.Second * 30
|
const kafkaExpiresAfter = time.Second * 30
|
||||||
|
@ -53,9 +60,25 @@ func (conn *KafkaConn) Send(msg string) error {
|
||||||
}
|
}
|
||||||
conn.t = time.Now()
|
conn.t = time.Now()
|
||||||
|
|
||||||
|
if log.Level > 2 {
|
||||||
|
sarama.Logger = lg.New(os.Stdout, "[sarama] ", lg.LstdFlags)
|
||||||
|
}
|
||||||
|
|
||||||
uri := fmt.Sprintf("%s:%d", conn.ep.Kafka.Host, conn.ep.Kafka.Port)
|
uri := fmt.Sprintf("%s:%d", conn.ep.Kafka.Host, conn.ep.Kafka.Port)
|
||||||
if conn.conn == nil {
|
if conn.conn == nil {
|
||||||
cfg := sarama.NewConfig()
|
cfg := sarama.NewConfig()
|
||||||
|
cfg.ClientID = "Tile38" // otherwise defaults to sarama
|
||||||
|
|
||||||
|
if conn.ep.Kafka.TLS {
|
||||||
|
log.Debugf("building kafka tls config")
|
||||||
|
tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cfg.Net.TLS.Enable = true
|
||||||
|
cfg.Net.TLS.Config = tlsConfig
|
||||||
|
}
|
||||||
|
|
||||||
cfg.Net.DialTimeout = time.Second
|
cfg.Net.DialTimeout = time.Second
|
||||||
cfg.Net.ReadTimeout = time.Second * 5
|
cfg.Net.ReadTimeout = time.Second * 5
|
||||||
cfg.Net.WriteTimeout = time.Second * 5
|
cfg.Net.WriteTimeout = time.Second * 5
|
||||||
|
@ -102,3 +125,26 @@ func newKafkaConn(ep Endpoint) *KafkaConn {
|
||||||
t: time.Now(),
|
t: time.Now(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newKafkaTLSConfig(CertFile, KeyFile, CACertFile string) (*tls.Config, error) {
|
||||||
|
tlsConfig := tls.Config{}
|
||||||
|
|
||||||
|
// Load client cert
|
||||||
|
cert, err := tls.LoadX509KeyPair(CertFile, KeyFile)
|
||||||
|
if err != nil {
|
||||||
|
return &tlsConfig, err
|
||||||
|
}
|
||||||
|
tlsConfig.Certificates = []tls.Certificate{cert}
|
||||||
|
|
||||||
|
// Load CA cert
|
||||||
|
caCert, err := ioutil.ReadFile(CACertFile)
|
||||||
|
if err != nil {
|
||||||
|
return &tlsConfig, err
|
||||||
|
}
|
||||||
|
caCertPool := x509.NewCertPool()
|
||||||
|
caCertPool.AppendCertsFromPEM(caCert)
|
||||||
|
tlsConfig.RootCAs = caCertPool
|
||||||
|
|
||||||
|
tlsConfig.BuildNameToCertificate()
|
||||||
|
return &tlsConfig, err
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue