From df8d3d7b12e843879927ba7cd5613054e88ab22a Mon Sep 17 00:00:00 2001
From: tidwall
Date: Sun, 13 Jun 2021 07:53:27 -0700
Subject: [PATCH 1/2] Close follower files before finishing aofshrink

fixes #449
---
 internal/server/aof.go       | 25 ++++++++++++++-----------
 internal/server/aofshrink.go | 16 ++++++++++++----
 internal/server/server.go    |  4 ++--
 3 files changed, 28 insertions(+), 17 deletions(-)

diff --git a/internal/server/aof.go b/internal/server/aof.go
index c8ed7b6c..73cdfcf3 100644
--- a/internal/server/aof.go
+++ b/internal/server/aof.go
@@ -483,8 +483,16 @@ func (s *Server) cmdAOF(msg *Message) (res resp.Value, err error) {
 }
 
 func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Message) error {
+	s.mu.RLock()
+	f, err := os.Open(s.aof.Name())
+	s.mu.RUnlock()
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
 	s.mu.Lock()
-	s.aofconnM[conn] = true
+	s.aofconnM[conn] = f
 	s.mu.Unlock()
 	defer func() {
 		s.mu.Lock()
@@ -497,13 +505,6 @@ func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Mess
 		return err
 	}
 
-	s.mu.RLock()
-	f, err := os.Open(s.aof.Name())
-	s.mu.RUnlock()
-	if err != nil {
-		return err
-	}
-	defer f.Close()
 	if _, err := f.Seek(pos, 0); err != nil {
 		return err
 	}
@@ -552,11 +553,13 @@ func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Mess
 	// The reader needs to be OK with the eof not
 	for {
 		n, err := f.Read(b)
-		if err != io.EOF && n > 0 {
-			if err != nil {
+		if n > 0 {
+			if _, err := conn.Write(b[:n]); err != nil {
 				return err
 			}
-			if _, err := conn.Write(b[:n]); err != nil {
+		}
+		if err != io.EOF {
+			if err != nil {
 				return err
 			}
 			continue
diff --git a/internal/server/aofshrink.go b/internal/server/aofshrink.go
index be01af89..7aa7b331 100644
--- a/internal/server/aofshrink.go
+++ b/internal/server/aofshrink.go
@@ -237,6 +237,18 @@ func (server *Server) aofshrink() {
 			server.mu.Lock()
 			defer server.mu.Unlock()
 
+			// kill all followers connections and close their files. This
+			// ensures that there is only one opened AOF at a time which is
+			// what Windows requires in order to perform the Rename function
+			// below.
+			for conn, f := range server.aofconnM {
+				conn.Close()
+				f.Close()
+			}
+
+			// send a broadcast to all sleeping followers
+			server.fcond.Broadcast()
+
 			// flush the aof buffer
 			server.flushAOF(false)
 
@@ -291,10 +303,6 @@ func (server *Server) aofshrink() {
 
 			os.Remove(core.AppendFileName + "-bak") // ignore error
 
-			// kill all followers connections
-			for conn := range server.aofconnM {
-				conn.Close()
-			}
 			return nil
 		}()
 	}()
diff --git a/internal/server/server.go b/internal/server/server.go
index a9b72498..2f531d78 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -123,7 +123,7 @@ type Server struct {
 	hookCross  rtree.RTree      // hook spatial tree for "cross" geofences
 	hookTree   rtree.RTree      // hook spatial tree for all
 	hooksOut   map[string]*Hook // hooks with "outside" detection
-	aofconnM   map[net.Conn]bool
+	aofconnM   map[net.Conn]io.Closer
 	luascripts *lScriptMap
 	luapool    *lStatePool
 
@@ -155,7 +155,7 @@ func Serve(host string, port int, dir string, http bool, metricsAddr string) err
 		lcond:     sync.NewCond(&sync.Mutex{}),
 		hooks:     make(map[string]*Hook),
 		hooksOut:  make(map[string]*Hook),
-		aofconnM:  make(map[net.Conn]bool),
+		aofconnM:  make(map[net.Conn]io.Closer),
 		expires:   rhh.New(0),
 		started:   time.Now(),
 		conns:     make(map[int]*Client),

From ef5a428591890840f9a32af41f613d3a2172564d Mon Sep 17 00:00:00 2001
From: tidwall
Date: Wed, 30 Jun 2021 14:18:44 -0700
Subject: [PATCH 2/2] Fix Memory Leak in Kafka Producer

This commit addresses an issue where the sarama kafka library leaks
memory when a connection closes unless the metrics configuration that
was passed to the new connection is also closed.

Fixes #613
---
 internal/endpoint/kafka.go | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go
index 5df14a67..ffc233bf 100644
--- a/internal/endpoint/kafka.go
+++ b/internal/endpoint/kafka.go
@@ -23,6 +23,7 @@ type KafkaConn struct {
 	mu   sync.Mutex
 	ep   Endpoint
 	conn sarama.SyncProducer
+	cfg  *sarama.Config
 	ex   bool
 	t    time.Time
 }
@@ -46,6 +47,8 @@ func (conn *KafkaConn) close() {
 	if conn.conn != nil {
 		conn.conn.Close()
 		conn.conn = nil
+		conn.cfg.MetricRegistry.UnregisterAll()
+		conn.cfg = nil
 	}
 }
 
@@ -71,6 +74,7 @@ func (conn *KafkaConn) Send(msg string) error {
 		log.Debugf("building kafka tls config")
 		tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile)
 		if err != nil {
+			cfg.MetricRegistry.UnregisterAll()
 			return err
 		}
 		cfg.Net.TLS.Enable = true
@@ -86,10 +90,12 @@ func (conn *KafkaConn) Send(msg string) error {
 
 	c, err := sarama.NewSyncProducer([]string{uri}, cfg)
 	if err != nil {
+		cfg.MetricRegistry.UnregisterAll()
 		return err
 	}
 
 	conn.conn = c
+	conn.cfg = cfg
 }
 
 	// parse json again to get out info for our kafka key
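
For context on the first patch, here is a minimal standalone sketch of the bookkeeping pattern it introduces: each follower connection is stored next to the `io.Closer` for the AOF handle that follower is streaming from, so a shrink can close every handle before renaming the file (Windows refuses the rename while handles are open). This is not Tile38's actual code; the `followerSet` type and its method names are illustrative only.

```go
// Package follower sketches the aofconnM pattern from the patch above.
package follower

import (
	"io"
	"net"
	"os"
	"sync"
)

// followerSet is a hypothetical stand-in for the server's aofconnM map:
// it pairs each follower connection with that follower's open AOF handle.
type followerSet struct {
	mu    sync.Mutex
	conns map[net.Conn]io.Closer
}

func newFollowerSet() *followerSet {
	return &followerSet{conns: make(map[net.Conn]io.Closer)}
}

// register remembers the AOF file handle a follower is reading from.
func (fs *followerSet) register(conn net.Conn, aof *os.File) {
	fs.mu.Lock()
	defer fs.mu.Unlock()
	fs.conns[conn] = aof
}

// closeAll disconnects every follower and closes its file handle, leaving
// only one open AOF so the file can be safely renamed during a shrink.
func (fs *followerSet) closeAll() {
	fs.mu.Lock()
	defer fs.mu.Unlock()
	for conn, file := range fs.conns {
		conn.Close()
		file.Close()
		delete(fs.conns, conn)
	}
}
```

The design choice mirrored here is that the follower opens its own read-only handle up front and registers it, so the shrink path never has to hunt down per-connection state later.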
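
The second patch's reasoning can be shown outside the Tile38 endpoint code as a small hedged sketch: sarama registers go-metrics against the `*sarama.Config` it is given, and those metrics stay registered after the producer is closed or fails to start unless `MetricRegistry.UnregisterAll()` is called. The import path assumes the `github.com/Shopify/sarama` module Tile38 used at the time; the broker address and topic are placeholders, not Tile38 settings.

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true // required by SyncProducer

	// "localhost:9092" and "events" are placeholder values.
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		// Metrics may already be registered against cfg; drop them on the
		// failure path too, mirroring the patch.
		cfg.MetricRegistry.UnregisterAll()
		log.Fatal(err)
	}
	defer func() {
		producer.Close()
		// This is the leak fix: closing the producer alone leaves the
		// metrics registered on cfg alive indefinitely.
		cfg.MetricRegistry.UnregisterAll()
	}()

	if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "events",
		Value: sarama.StringEncoder(`{"hello":"world"}`),
	}); err != nil {
		log.Println("send failed:", err)
	}
}
```

Because Tile38 recreates the producer on reconnect, pairing every `Close()` (and every failed constructor call) with `UnregisterAll()` keeps the registry from growing across reconnects.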