Merge pull request #614 from tidwall/kafka-memory-leak

Kafka memory leak
This commit is contained in:
Josh 2021-07-08 06:15:16 -07:00 committed by GitHub
commit e5f3224e08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 34 additions and 17 deletions

View File

@ -24,6 +24,7 @@ type KafkaConn struct {
mu sync.Mutex
ep Endpoint
conn sarama.SyncProducer
cfg *sarama.Config
ex bool
t time.Time
}
@ -47,6 +48,8 @@ func (conn *KafkaConn) close() {
if conn.conn != nil {
conn.conn.Close()
conn.conn = nil
conn.cfg.MetricRegistry.UnregisterAll()
conn.cfg = nil
}
}
@ -72,6 +75,7 @@ func (conn *KafkaConn) Send(msg string) error {
log.Debugf("building kafka tls config")
tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile)
if err != nil {
cfg.MetricRegistry.UnregisterAll()
return err
}
cfg.Net.TLS.Enable = true
@ -103,10 +107,12 @@ func (conn *KafkaConn) Send(msg string) error {
c, err := sarama.NewSyncProducer([]string{uri}, cfg)
if err != nil {
cfg.MetricRegistry.UnregisterAll()
return err
}
conn.conn = c
conn.cfg = cfg
}
// parse json again to get out info for our kafka key

View File

@ -483,8 +483,16 @@ func (s *Server) cmdAOF(msg *Message) (res resp.Value, err error) {
}
func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Message) error {
s.mu.RLock()
f, err := os.Open(s.aof.Name())
s.mu.RUnlock()
if err != nil {
return err
}
defer f.Close()
s.mu.Lock()
s.aofconnM[conn] = true
s.aofconnM[conn] = f
s.mu.Unlock()
defer func() {
s.mu.Lock()
@ -497,13 +505,6 @@ func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Mess
return err
}
s.mu.RLock()
f, err := os.Open(s.aof.Name())
s.mu.RUnlock()
if err != nil {
return err
}
defer f.Close()
if _, err := f.Seek(pos, 0); err != nil {
return err
}
@ -552,11 +553,13 @@ func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Mess
// The reader needs to be OK with the eof not
for {
n, err := f.Read(b)
if err != io.EOF && n > 0 {
if err != nil {
if n > 0 {
if _, err := conn.Write(b[:n]); err != nil {
return err
}
if _, err := conn.Write(b[:n]); err != nil {
}
if err != io.EOF {
if err != nil {
return err
}
continue

View File

@ -237,6 +237,18 @@ func (server *Server) aofshrink() {
server.mu.Lock()
defer server.mu.Unlock()
// kill all followers connections and close their files. This
// ensures that there is only one opened AOF at a time which is
// what Windows requires in order to perform the Rename function
// below.
for conn, f := range server.aofconnM {
conn.Close()
f.Close()
}
// send a broadcast to all sleeping followers
server.fcond.Broadcast()
// flush the aof buffer
server.flushAOF(false)
@ -291,10 +303,6 @@ func (server *Server) aofshrink() {
os.Remove(core.AppendFileName + "-bak") // ignore error
// kill all followers connections
for conn := range server.aofconnM {
conn.Close()
}
return nil
}()
}()

View File

@ -123,7 +123,7 @@ type Server struct {
hookCross rtree.RTree // hook spatial tree for "cross" geofences
hookTree rtree.RTree // hook spatial tree for all
hooksOut map[string]*Hook // hooks with "outside" detection
aofconnM map[net.Conn]bool
aofconnM map[net.Conn]io.Closer
luascripts *lScriptMap
luapool *lStatePool
@ -155,7 +155,7 @@ func Serve(host string, port int, dir string, http bool, metricsAddr string) err
lcond: sync.NewCond(&sync.Mutex{}),
hooks: make(map[string]*Hook),
hooksOut: make(map[string]*Hook),
aofconnM: make(map[net.Conn]bool),
aofconnM: make(map[net.Conn]io.Closer),
expires: rhh.New(0),
started: time.Now(),
conns: make(map[int]*Client),