From ef5a428591890840f9a32af41f613d3a2172564d Mon Sep 17 00:00:00 2001 From: tidwall Date: Wed, 30 Jun 2021 14:18:44 -0700 Subject: [PATCH] Fix Memory Leak in Kafka Producer This commit addresses an issue where the sarama kafka library leaks memory when a connection closes unless the metrics configuration that was passed to new connection is also closed. Fixes #613 --- internal/endpoint/kafka.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index 5df14a67..ffc233bf 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -23,6 +23,7 @@ type KafkaConn struct { mu sync.Mutex ep Endpoint conn sarama.SyncProducer + cfg *sarama.Config ex bool t time.Time } @@ -46,6 +47,8 @@ func (conn *KafkaConn) close() { if conn.conn != nil { conn.conn.Close() conn.conn = nil + conn.cfg.MetricRegistry.UnregisterAll() + conn.cfg = nil } } @@ -71,6 +74,7 @@ func (conn *KafkaConn) Send(msg string) error { log.Debugf("building kafka tls config") tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile) if err != nil { + cfg.MetricRegistry.UnregisterAll() return err } cfg.Net.TLS.Enable = true @@ -86,10 +90,12 @@ func (conn *KafkaConn) Send(msg string) error { c, err := sarama.NewSyncProducer([]string{uri}, cfg) if err != nil { + cfg.MetricRegistry.UnregisterAll() return err } conn.conn = c + conn.cfg = cfg } // parse json again to get out info for our kafka key