Speed up leader/follower replication

This commit lowers the response time for a leader to send
updates to a follower. Should now be nearly instant.
This commit is contained in:
tidwall 2022-12-14 04:46:28 -07:00
parent e60ea706aa
commit a8c92a07c1
2 changed files with 18 additions and 9 deletions

View File

@ -130,6 +130,8 @@ func (s *Server) flushAOF(sync bool) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
// send a broadcast to all sleeping followers
s.fcond.Broadcast()
if sync { if sync {
if err := s.aof.Sync(); err != nil { if err := s.aof.Sync(); err != nil {
panic(err) panic(err)
@ -165,11 +167,6 @@ func (s *Server) writeAOF(args []string, d *commandDetails) error {
s.aofsz += len(s.aofbuf) - n s.aofsz += len(s.aofbuf) - n
} }
// notify aof live connections that we have new data
s.fcond.L.Lock()
s.fcond.Broadcast()
s.fcond.L.Unlock()
// process geofences // process geofences
if d != nil { if d != nil {
// webhook geofences // webhook geofences
@ -516,6 +513,7 @@ func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Mess
if err != nil { if err != nil {
return err return err
} }
b := make([]byte, 4096*2) b := make([]byte, 4096*2)
for { for {
n, err := f.Read(b) n, err := f.Read(b)
@ -525,7 +523,9 @@ func (s *Server) liveAOF(pos int64, conn net.Conn, rd *PipelineReader, msg *Mess
} }
} }
if err == io.EOF { if err == io.EOF {
time.Sleep(time.Second / 4) s.fcond.L.Lock()
s.fcond.Wait()
s.fcond.L.Unlock()
} else if err != nil { } else if err != nil {
return err return err
} }

View File

@ -135,9 +135,9 @@ type Server struct {
fcond *sync.Cond fcond *sync.Cond
lstack []*commandDetails lstack []*commandDetails
lives map[*liveBuffer]bool lives map[*liveBuffer]bool
lcond *sync.Cond lcond *sync.Cond // live geofence signal
fcup bool // follow caught up fcup bool // follow caught up
fcuponce bool // follow caught up once fcuponce bool // follow caught up once
aofconnM map[net.Conn]io.Closer aofconnM map[net.Conn]io.Closer
// lua scripts // lua scripts
@ -305,10 +305,19 @@ func Serve(opts Options) error {
nerr <- s.netServe() nerr <- s.netServe()
}() }()
var fstop atomic.Bool
go func() {
for !fstop.Load() {
s.fcond.Broadcast()
time.Sleep(time.Second / 4)
}
}()
go func() { go func() {
<-opts.Shutdown <-opts.Shutdown
s.stopServer.Store(true) s.stopServer.Store(true)
log.Warnf("Shutting down...") log.Warnf("Shutting down...")
fstop.Store(true)
s.lnmu.Lock() s.lnmu.Lock()
ln := s.ln ln := s.ln
s.ln = nil s.ln = nil