Fix Memory Leak in Kafka Producer

This commit addresses an issue where the sarama kafka library
leaks memory when a connection closes unless the metrics
configuration that was passed to new connection is also closed.

Fixes #613
This commit is contained in:
tidwall 2021-06-30 14:18:44 -07:00
parent 1497663b6d
commit aea7d77de5
1 changed files with 6 additions and 0 deletions

View File

@ -24,6 +24,7 @@ type KafkaConn struct {
mu sync.Mutex mu sync.Mutex
ep Endpoint ep Endpoint
conn sarama.SyncProducer conn sarama.SyncProducer
cfg *sarama.Config
ex bool ex bool
t time.Time t time.Time
} }
@ -47,6 +48,8 @@ func (conn *KafkaConn) close() {
if conn.conn != nil { if conn.conn != nil {
conn.conn.Close() conn.conn.Close()
conn.conn = nil conn.conn = nil
conn.cfg.MetricRegistry.UnregisterAll()
conn.cfg = nil
} }
} }
@ -72,6 +75,7 @@ func (conn *KafkaConn) Send(msg string) error {
log.Debugf("building kafka tls config") log.Debugf("building kafka tls config")
tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile) tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile)
if err != nil { if err != nil {
cfg.MetricRegistry.UnregisterAll()
return err return err
} }
cfg.Net.TLS.Enable = true cfg.Net.TLS.Enable = true
@ -103,10 +107,12 @@ func (conn *KafkaConn) Send(msg string) error {
c, err := sarama.NewSyncProducer([]string{uri}, cfg) c, err := sarama.NewSyncProducer([]string{uri}, cfg)
if err != nil { if err != nil {
cfg.MetricRegistry.UnregisterAll()
return err return err
} }
conn.conn = c conn.conn = c
conn.cfg = cfg
} }
// parse json again to get out info for our kafka key // parse json again to get out info for our kafka key