Fix Memory Leak in Kafka Producer

This commit addresses an issue where the sarama kafka library
leaks memory when a connection closes unless the metrics
configuration that was passed to new connection is also closed.

Fixes #613
This commit is contained in:
tidwall 2021-06-30 14:18:44 -07:00
parent df8d3d7b12
commit ef5a428591
1 changed files with 6 additions and 0 deletions

View File

@ -23,6 +23,7 @@ type KafkaConn struct {
mu sync.Mutex
ep Endpoint
conn sarama.SyncProducer
cfg *sarama.Config
ex bool
t time.Time
}
@ -46,6 +47,8 @@ func (conn *KafkaConn) close() {
if conn.conn != nil {
conn.conn.Close()
conn.conn = nil
conn.cfg.MetricRegistry.UnregisterAll()
conn.cfg = nil
}
}
@ -71,6 +74,7 @@ func (conn *KafkaConn) Send(msg string) error {
log.Debugf("building kafka tls config")
tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile)
if err != nil {
cfg.MetricRegistry.UnregisterAll()
return err
}
cfg.Net.TLS.Enable = true
@ -86,10 +90,12 @@ func (conn *KafkaConn) Send(msg string) error {
c, err := sarama.NewSyncProducer([]string{uri}, cfg)
if err != nil {
cfg.MetricRegistry.UnregisterAll()
return err
}
conn.conn = c
conn.cfg = cfg
}
// parse json again to get out info for our kafka key