diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index d0a04f6a..d84e0f72 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -24,6 +24,7 @@ type KafkaConn struct { mu sync.Mutex ep Endpoint conn sarama.SyncProducer + cfg *sarama.Config ex bool t time.Time } @@ -47,6 +48,8 @@ func (conn *KafkaConn) close() { if conn.conn != nil { conn.conn.Close() conn.conn = nil + conn.cfg.MetricRegistry.UnregisterAll() + conn.cfg = nil } } @@ -72,6 +75,7 @@ func (conn *KafkaConn) Send(msg string) error { log.Debugf("building kafka tls config") tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile) if err != nil { + cfg.MetricRegistry.UnregisterAll() return err } cfg.Net.TLS.Enable = true @@ -103,10 +107,12 @@ func (conn *KafkaConn) Send(msg string) error { c, err := sarama.NewSyncProducer([]string{uri}, cfg) if err != nil { + cfg.MetricRegistry.UnregisterAll() return err } conn.conn = c + conn.cfg = cfg } // parse json again to get out info for our kafka key diff --git a/internal/server/aof.go b/internal/server/aof.go index c8ed7b6c..73cdfcf3 100644 --- a/internal/server/aof.go +++ b/internal/server/aof.go @@ -483,8 +483,16 @@ func (s *Server) cmdAOF(msg *Message) (res resp.Value, err error) { } func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Message) error { + s.mu.RLock() + f, err := os.Open(s.aof.Name()) + s.mu.RUnlock() + if err != nil { + return err + } + defer f.Close() + s.mu.Lock() - s.aofconnM[conn] = true + s.aofconnM[conn] = f s.mu.Unlock() defer func() { s.mu.Lock() @@ -497,13 +505,6 @@ func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Mess return err } - s.mu.RLock() - f, err := os.Open(s.aof.Name()) - s.mu.RUnlock() - if err != nil { - return err - } - defer f.Close() if _, err := f.Seek(pos, 0); err != nil { return err } @@ -552,11 +553,13 @@ func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Mess // The reader needs to be OK with the eof not for { n, err := f.Read(b) - if err != io.EOF && n > 0 { - if err != nil { + if n > 0 { + if _, err := conn.Write(b[:n]); err != nil { return err } - if _, err := conn.Write(b[:n]); err != nil { + } + if err != io.EOF { + if err != nil { return err } continue diff --git a/internal/server/aofshrink.go b/internal/server/aofshrink.go index be01af89..7aa7b331 100644 --- a/internal/server/aofshrink.go +++ b/internal/server/aofshrink.go @@ -237,6 +237,18 @@ func (server *Server) aofshrink() { server.mu.Lock() defer server.mu.Unlock() + // kill all followers connections and close their files. This + // ensures that there is only one opened AOF at a time which is + // what Windows requires in order to perform the Rename function + // below. + for conn, f := range server.aofconnM { + conn.Close() + f.Close() + } + + // send a broadcast to all sleeping followers + server.fcond.Broadcast() + // flush the aof buffer server.flushAOF(false) @@ -291,10 +303,6 @@ func (server *Server) aofshrink() { os.Remove(core.AppendFileName + "-bak") // ignore error - // kill all followers connections - for conn := range server.aofconnM { - conn.Close() - } return nil }() }() diff --git a/internal/server/server.go b/internal/server/server.go index a9b72498..2f531d78 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -123,7 +123,7 @@ type Server struct { hookCross rtree.RTree // hook spatial tree for "cross" geofences hookTree rtree.RTree // hook spatial tree for all hooksOut map[string]*Hook // hooks with "outside" detection - aofconnM map[net.Conn]bool + aofconnM map[net.Conn]io.Closer luascripts *lScriptMap luapool *lStatePool @@ -155,7 +155,7 @@ func Serve(host string, port int, dir string, http bool, metricsAddr string) err lcond: sync.NewCond(&sync.Mutex{}), hooks: make(map[string]*Hook), hooksOut: make(map[string]*Hook), - aofconnM: make(map[net.Conn]bool), + aofconnM: make(map[net.Conn]io.Closer), expires: rhh.New(0), started: time.Now(), conns: make(map[int]*Client),