From bdfa11e8dd897f8af3ac6e083d30bb06f38ed874 Mon Sep 17 00:00:00 2001 From: tidwall Date: Fri, 15 Feb 2019 11:11:40 -0700 Subject: [PATCH] Background fsync and aofbuf flushing --- internal/collection/item/item.go | 6 +--- internal/server/aof.go | 7 +++- internal/server/aofshrink.go | 2 +- internal/server/follow.go | 4 +-- internal/server/server.go | 58 ++++++++++++++++++++------------ 5 files changed, 47 insertions(+), 30 deletions(-) diff --git a/internal/collection/item/item.go b/internal/collection/item/item.go index 4f3c78dc..ea612c8b 100644 --- a/internal/collection/item/item.go +++ b/internal/collection/item/item.go @@ -72,11 +72,7 @@ func New(id string, obj geojson.Object) *Item { item = (*Item)(unsafe.Pointer(oitem)) } item.idLen = uint32(len(id)) - if len(id) > 0 { - data := make([]byte, len(id)) - copy(data, id) - item.data = unsafe.Pointer(&data[0]) - } + item.data = unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&id)).Data) return item } diff --git a/internal/server/aof.go b/internal/server/aof.go index c190d128..42a42946 100644 --- a/internal/server/aof.go +++ b/internal/server/aof.go @@ -115,12 +115,17 @@ func commandErrIsFatal(err error) bool { return true } -func (server *Server) flushAOF() { +func (server *Server) flushAOF(sync bool) { if len(server.aofbuf) > 0 { _, err := server.aof.Write(server.aofbuf) if err != nil { panic(err) } + if sync { + if err := server.aof.Sync(); err != nil { + panic(err) + } + } server.aofbuf = server.aofbuf[:0] } } diff --git a/internal/server/aofshrink.go b/internal/server/aofshrink.go index 03ca574f..d3c8c95b 100644 --- a/internal/server/aofshrink.go +++ b/internal/server/aofshrink.go @@ -240,7 +240,7 @@ func (server *Server) aofshrink() { defer server.mu.Unlock() // flush the aof buffer - server.flushAOF() + server.flushAOF(false) aofbuf = aofbuf[:0] for _, values := range server.shrinklog { diff --git a/internal/server/follow.go b/internal/server/follow.go index 1531fb1c..bd014cd2 100644 --- a/internal/server/follow.go +++ b/internal/server/follow.go @@ -165,7 +165,7 @@ func (c *Server) followHandleCommand(args []string, followc int, w io.Writer) (i return c.aofsz, err } if len(c.aofbuf) > 10240 { - c.flushAOF() + c.flushAOF(false) } return c.aofsz, nil } @@ -301,7 +301,7 @@ func (c *Server) followStep(host string, port int, followc int) error { if aofsz >= int(aofSize) { caughtUp = true c.mu.Lock() - c.flushAOF() + c.flushAOF(false) c.fcup = true c.fcuponce = true c.mu.Unlock() diff --git a/internal/server/server.go b/internal/server/server.go index 08fa1468..d04fbc1c 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -252,7 +252,7 @@ func Serve(host string, port int, dir string, http bool) error { return err } defer func() { - server.flushAOF() + server.flushAOF(false) server.aof.Sync() }() } @@ -268,6 +268,7 @@ func Serve(host string, port int, dir string, http bool) error { go server.watchLuaStatePool() go server.watchAutoGC() go server.backgroundExpiring() + go server.backgroundSyncAOF() defer func() { // Stop background routines server.followc.add(1) // this will force any follow communication to die @@ -489,7 +490,7 @@ func (server *Server) evioServe() error { if atomic.LoadInt32(&server.aofdirty) != 0 { server.mu.Lock() defer server.mu.Unlock() - server.flushAOF() + server.flushAOF(false) atomic.StoreInt32(&server.aofdirty, 1) } } @@ -667,7 +668,7 @@ func (server *Server) netServe() error { // prewrite server.mu.Lock() defer server.mu.Unlock() - server.flushAOF() + server.flushAOF(false) }() atomic.StoreInt32(&server.aofdirty, 0) } @@ -755,23 +756,21 @@ func (server *Server) watchOutOfMemory() { defer t.Stop() var mem runtime.MemStats for range t.C { - func() { - if server.stopServer.on() { - return - } - oom := server.outOfMemory.on() - if server.config.maxMemory() == 0 { - if oom { - server.outOfMemory.set(false) - } - return - } + if server.stopServer.on() { + return + } + oom := server.outOfMemory.on() + if server.config.maxMemory() == 0 { if oom { - runtime.GC() + server.outOfMemory.set(false) } - runtime.ReadMemStats(&mem) - server.outOfMemory.set(int(mem.HeapAlloc) > server.config.maxMemory()) - }() + return + } + if oom { + runtime.GC() + } + runtime.ReadMemStats(&mem) + server.outOfMemory.set(int(mem.HeapAlloc) > server.config.maxMemory()) } } @@ -779,9 +778,26 @@ func (server *Server) watchLuaStatePool() { t := time.NewTicker(time.Second * 10) defer t.Stop() for range t.C { - func() { - server.luapool.Prune() - }() + if server.stopServer.on() { + return + } + server.luapool.Prune() + } +} + +func (server *Server) backgroundSyncAOF() { + t := time.NewTicker(time.Second) + defer t.Stop() + for range t.C { + if server.stopServer.on() { + return + } + server.mu.Lock() + if len(server.aofbuf) > 0 { + server.flushAOF(true) + } + server.aofbuf = nil + server.mu.Unlock() } }