From 5333fab870aac54e547801ac6ad080bc2d522b80 Mon Sep 17 00:00:00 2001 From: tidwall Date: Sun, 10 Mar 2019 10:48:14 -0700 Subject: [PATCH] Recycle aof buffer --- internal/server/aof.go | 7 ++++++- internal/server/aofshrink.go | 2 +- internal/server/follow.go | 4 ++-- internal/server/server.go | 26 +++++++++++++++++++++++--- 4 files changed, 32 insertions(+), 7 deletions(-) diff --git a/internal/server/aof.go b/internal/server/aof.go index c190d128..42a42946 100644 --- a/internal/server/aof.go +++ b/internal/server/aof.go @@ -115,12 +115,17 @@ func commandErrIsFatal(err error) bool { return true } -func (server *Server) flushAOF() { +func (server *Server) flushAOF(sync bool) { if len(server.aofbuf) > 0 { _, err := server.aof.Write(server.aofbuf) if err != nil { panic(err) } + if sync { + if err := server.aof.Sync(); err != nil { + panic(err) + } + } server.aofbuf = server.aofbuf[:0] } } diff --git a/internal/server/aofshrink.go b/internal/server/aofshrink.go index efc1c1c1..095cad79 100644 --- a/internal/server/aofshrink.go +++ b/internal/server/aofshrink.go @@ -236,7 +236,7 @@ func (server *Server) aofshrink() { defer server.mu.Unlock() // flush the aof buffer - server.flushAOF() + server.flushAOF(false) aofbuf = aofbuf[:0] for _, values := range server.shrinklog { diff --git a/internal/server/follow.go b/internal/server/follow.go index 42d27dc5..6d12dd26 100644 --- a/internal/server/follow.go +++ b/internal/server/follow.go @@ -165,7 +165,7 @@ func (c *Server) followHandleCommand(args []string, followc int, w io.Writer) (i return c.aofsz, err } if len(c.aofbuf) > 10240 { - c.flushAOF() + c.flushAOF(false) } return c.aofsz, nil } @@ -291,7 +291,7 @@ func (c *Server) followStep(host string, port int, followc int) error { if aofsz >= int(aofSize) { caughtUp = true c.mu.Lock() - c.flushAOF() + c.flushAOF(false) c.fcup = true c.fcuponce = true c.mu.Unlock() diff --git a/internal/server/server.go b/internal/server/server.go index 94f38922..59f80c4f 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -250,7 +250,7 @@ func Serve(host string, port int, dir string, http bool) error { return err } defer func() { - server.flushAOF() + server.flushAOF(false) server.aof.Sync() }() } @@ -266,6 +266,7 @@ func Serve(host string, port int, dir string, http bool) error { go server.watchLuaStatePool() go server.watchAutoGC() go server.backgroundExpiring() + go server.backgroundSyncAOF() defer func() { // Stop background routines server.followc.add(1) // this will force any follow communication to die @@ -487,7 +488,7 @@ func (server *Server) evioServe() error { if atomic.LoadInt32(&server.aofdirty) != 0 { server.mu.Lock() defer server.mu.Unlock() - server.flushAOF() + server.flushAOF(false) atomic.StoreInt32(&server.aofdirty, 1) } } @@ -665,7 +666,7 @@ func (server *Server) netServe() error { // prewrite server.mu.Lock() defer server.mu.Unlock() - server.flushAOF() + server.flushAOF(false) }() atomic.StoreInt32(&server.aofdirty, 0) } @@ -783,6 +784,25 @@ func (server *Server) watchLuaStatePool() { } } +// backgroundSyncAOF ensures that the aof buffer is does not grow too big. +func (server *Server) backgroundSyncAOF() { + t := time.NewTicker(time.Second) + defer t.Stop() + for range t.C { + if server.stopServer.on() { + return + } + func() { + server.mu.Lock() + defer server.mu.Unlock() + if len(server.aofbuf) > 0 { + server.flushAOF(true) + } + server.aofbuf = nil + }() + } +} + func (server *Server) setCol(key string, col *collection.Collection) { server.cols.Set(key, col) }