Merge pull request #429 from tidwall/mem-optz

Release aof buffer periodically
This commit is contained in:
Josh Baker 2019-03-10 10:52:28 -07:00 committed by GitHub
commit e05b3dc25c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 32 additions and 7 deletions

View File

@ -115,12 +115,17 @@ func commandErrIsFatal(err error) bool {
return true return true
} }
func (server *Server) flushAOF() { func (server *Server) flushAOF(sync bool) {
if len(server.aofbuf) > 0 { if len(server.aofbuf) > 0 {
_, err := server.aof.Write(server.aofbuf) _, err := server.aof.Write(server.aofbuf)
if err != nil { if err != nil {
panic(err) panic(err)
} }
if sync {
if err := server.aof.Sync(); err != nil {
panic(err)
}
}
server.aofbuf = server.aofbuf[:0] server.aofbuf = server.aofbuf[:0]
} }
} }

View File

@ -236,7 +236,7 @@ func (server *Server) aofshrink() {
defer server.mu.Unlock() defer server.mu.Unlock()
// flush the aof buffer // flush the aof buffer
server.flushAOF() server.flushAOF(false)
aofbuf = aofbuf[:0] aofbuf = aofbuf[:0]
for _, values := range server.shrinklog { for _, values := range server.shrinklog {

View File

@ -165,7 +165,7 @@ func (c *Server) followHandleCommand(args []string, followc int, w io.Writer) (i
return c.aofsz, err return c.aofsz, err
} }
if len(c.aofbuf) > 10240 { if len(c.aofbuf) > 10240 {
c.flushAOF() c.flushAOF(false)
} }
return c.aofsz, nil return c.aofsz, nil
} }
@ -291,7 +291,7 @@ func (c *Server) followStep(host string, port int, followc int) error {
if aofsz >= int(aofSize) { if aofsz >= int(aofSize) {
caughtUp = true caughtUp = true
c.mu.Lock() c.mu.Lock()
c.flushAOF() c.flushAOF(false)
c.fcup = true c.fcup = true
c.fcuponce = true c.fcuponce = true
c.mu.Unlock() c.mu.Unlock()

View File

@ -250,7 +250,7 @@ func Serve(host string, port int, dir string, http bool) error {
return err return err
} }
defer func() { defer func() {
server.flushAOF() server.flushAOF(false)
server.aof.Sync() server.aof.Sync()
}() }()
} }
@ -266,6 +266,7 @@ func Serve(host string, port int, dir string, http bool) error {
go server.watchLuaStatePool() go server.watchLuaStatePool()
go server.watchAutoGC() go server.watchAutoGC()
go server.backgroundExpiring() go server.backgroundExpiring()
go server.backgroundSyncAOF()
defer func() { defer func() {
// Stop background routines // Stop background routines
server.followc.add(1) // this will force any follow communication to die server.followc.add(1) // this will force any follow communication to die
@ -487,7 +488,7 @@ func (server *Server) evioServe() error {
if atomic.LoadInt32(&server.aofdirty) != 0 { if atomic.LoadInt32(&server.aofdirty) != 0 {
server.mu.Lock() server.mu.Lock()
defer server.mu.Unlock() defer server.mu.Unlock()
server.flushAOF() server.flushAOF(false)
atomic.StoreInt32(&server.aofdirty, 1) atomic.StoreInt32(&server.aofdirty, 1)
} }
} }
@ -665,7 +666,7 @@ func (server *Server) netServe() error {
// prewrite // prewrite
server.mu.Lock() server.mu.Lock()
defer server.mu.Unlock() defer server.mu.Unlock()
server.flushAOF() server.flushAOF(false)
}() }()
atomic.StoreInt32(&server.aofdirty, 0) atomic.StoreInt32(&server.aofdirty, 0)
} }
@ -783,6 +784,25 @@ func (server *Server) watchLuaStatePool() {
} }
} }
// backgroundSyncAOF ensures that the aof buffer is does not grow too big.
func (server *Server) backgroundSyncAOF() {
t := time.NewTicker(time.Second)
defer t.Stop()
for range t.C {
if server.stopServer.on() {
return
}
func() {
server.mu.Lock()
defer server.mu.Unlock()
if len(server.aofbuf) > 0 {
server.flushAOF(true)
}
server.aofbuf = nil
}()
}
}
func (server *Server) setCol(key string, col *collection.Collection) { func (server *Server) setCol(key string, col *collection.Collection) {
server.cols.Set(key, col) server.cols.Set(key, col)
} }