diff --git a/internal/server/aof.go b/internal/server/aof.go index a43eb491..bca23c9d 100644 --- a/internal/server/aof.go +++ b/internal/server/aof.go @@ -130,6 +130,8 @@ func (s *Server) flushAOF(sync bool) { if err != nil { panic(err) } + // send a broadcast to all sleeping followers + s.fcond.Broadcast() if sync { if err := s.aof.Sync(); err != nil { panic(err) @@ -165,11 +167,6 @@ func (s *Server) writeAOF(args []string, d *commandDetails) error { s.aofsz += len(s.aofbuf) - n } - // notify aof live connections that we have new data - s.fcond.L.Lock() - s.fcond.Broadcast() - s.fcond.L.Unlock() - // process geofences if d != nil { // webhook geofences @@ -516,6 +513,7 @@ func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Mess if err != nil { return err } + b := make([]byte, 4096*2) for { n, err := f.Read(b) @@ -525,7 +523,9 @@ func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Mess } } if err == io.EOF { - time.Sleep(time.Second / 4) + s.fcond.L.Lock() + s.fcond.Wait() + s.fcond.L.Unlock() } else if err != nil { return err } diff --git a/internal/server/server.go b/internal/server/server.go index b5c5d3f3..95b37112 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -135,9 +135,9 @@ type Server struct { fcond *sync.Cond lstack []*commandDetails lives map[*liveBuffer]bool - lcond *sync.Cond - fcup bool // follow caught up - fcuponce bool // follow caught up once + lcond *sync.Cond // live geofence signal + fcup bool // follow caught up + fcuponce bool // follow caught up once aofconnM map[net.Conn]io.Closer // lua scripts @@ -305,10 +305,19 @@ func Serve(opts Options) error { nerr <- s.netServe() }() + var fstop atomic.Bool + go func() { + for !fstop.Load() { + s.fcond.Broadcast() + time.Sleep(time.Second / 4) + } + }() + go func() { <-opts.Shutdown s.stopServer.Store(true) log.Warnf("Shutting down...") + fstop.Store(true) s.lnmu.Lock() ln := s.ln s.ln = nil