diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index d0a04f6a..d84e0f72 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -24,6 +24,7 @@ type KafkaConn struct { mu sync.Mutex ep Endpoint conn sarama.SyncProducer + cfg *sarama.Config ex bool t time.Time } @@ -47,6 +48,8 @@ func (conn *KafkaConn) close() { if conn.conn != nil { conn.conn.Close() conn.conn = nil + conn.cfg.MetricRegistry.UnregisterAll() + conn.cfg = nil } } @@ -72,6 +75,7 @@ func (conn *KafkaConn) Send(msg string) error { log.Debugf("building kafka tls config") tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile) if err != nil { + cfg.MetricRegistry.UnregisterAll() return err } cfg.Net.TLS.Enable = true @@ -103,10 +107,12 @@ func (conn *KafkaConn) Send(msg string) error { c, err := sarama.NewSyncProducer([]string{uri}, cfg) if err != nil { + cfg.MetricRegistry.UnregisterAll() return err } conn.conn = c + conn.cfg = cfg } // parse json again to get out info for our kafka key