Background fsync and aofbuf flushing

This commit is contained in:
tidwall 2019-02-15 11:11:40 -07:00
parent f736a971f0
commit bdfa11e8dd
5 changed files with 47 additions and 30 deletions

View File

@ -72,11 +72,7 @@ func New(id string, obj geojson.Object) *Item {
item = (*Item)(unsafe.Pointer(oitem)) item = (*Item)(unsafe.Pointer(oitem))
} }
item.idLen = uint32(len(id)) item.idLen = uint32(len(id))
if len(id) > 0 { item.data = unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&id)).Data)
data := make([]byte, len(id))
copy(data, id)
item.data = unsafe.Pointer(&data[0])
}
return item return item
} }

View File

@ -115,12 +115,17 @@ func commandErrIsFatal(err error) bool {
return true return true
} }
func (server *Server) flushAOF() { func (server *Server) flushAOF(sync bool) {
if len(server.aofbuf) > 0 { if len(server.aofbuf) > 0 {
_, err := server.aof.Write(server.aofbuf) _, err := server.aof.Write(server.aofbuf)
if err != nil { if err != nil {
panic(err) panic(err)
} }
if sync {
if err := server.aof.Sync(); err != nil {
panic(err)
}
}
server.aofbuf = server.aofbuf[:0] server.aofbuf = server.aofbuf[:0]
} }
} }

View File

@ -240,7 +240,7 @@ func (server *Server) aofshrink() {
defer server.mu.Unlock() defer server.mu.Unlock()
// flush the aof buffer // flush the aof buffer
server.flushAOF() server.flushAOF(false)
aofbuf = aofbuf[:0] aofbuf = aofbuf[:0]
for _, values := range server.shrinklog { for _, values := range server.shrinklog {

View File

@ -165,7 +165,7 @@ func (c *Server) followHandleCommand(args []string, followc int, w io.Writer) (i
return c.aofsz, err return c.aofsz, err
} }
if len(c.aofbuf) > 10240 { if len(c.aofbuf) > 10240 {
c.flushAOF() c.flushAOF(false)
} }
return c.aofsz, nil return c.aofsz, nil
} }
@ -301,7 +301,7 @@ func (c *Server) followStep(host string, port int, followc int) error {
if aofsz >= int(aofSize) { if aofsz >= int(aofSize) {
caughtUp = true caughtUp = true
c.mu.Lock() c.mu.Lock()
c.flushAOF() c.flushAOF(false)
c.fcup = true c.fcup = true
c.fcuponce = true c.fcuponce = true
c.mu.Unlock() c.mu.Unlock()

View File

@ -252,7 +252,7 @@ func Serve(host string, port int, dir string, http bool) error {
return err return err
} }
defer func() { defer func() {
server.flushAOF() server.flushAOF(false)
server.aof.Sync() server.aof.Sync()
}() }()
} }
@ -268,6 +268,7 @@ func Serve(host string, port int, dir string, http bool) error {
go server.watchLuaStatePool() go server.watchLuaStatePool()
go server.watchAutoGC() go server.watchAutoGC()
go server.backgroundExpiring() go server.backgroundExpiring()
go server.backgroundSyncAOF()
defer func() { defer func() {
// Stop background routines // Stop background routines
server.followc.add(1) // this will force any follow communication to die server.followc.add(1) // this will force any follow communication to die
@ -489,7 +490,7 @@ func (server *Server) evioServe() error {
if atomic.LoadInt32(&server.aofdirty) != 0 { if atomic.LoadInt32(&server.aofdirty) != 0 {
server.mu.Lock() server.mu.Lock()
defer server.mu.Unlock() defer server.mu.Unlock()
server.flushAOF() server.flushAOF(false)
atomic.StoreInt32(&server.aofdirty, 1) atomic.StoreInt32(&server.aofdirty, 1)
} }
} }
@ -667,7 +668,7 @@ func (server *Server) netServe() error {
// prewrite // prewrite
server.mu.Lock() server.mu.Lock()
defer server.mu.Unlock() defer server.mu.Unlock()
server.flushAOF() server.flushAOF(false)
}() }()
atomic.StoreInt32(&server.aofdirty, 0) atomic.StoreInt32(&server.aofdirty, 0)
} }
@ -755,23 +756,21 @@ func (server *Server) watchOutOfMemory() {
defer t.Stop() defer t.Stop()
var mem runtime.MemStats var mem runtime.MemStats
for range t.C { for range t.C {
func() { if server.stopServer.on() {
if server.stopServer.on() { return
return }
} oom := server.outOfMemory.on()
oom := server.outOfMemory.on() if server.config.maxMemory() == 0 {
if server.config.maxMemory() == 0 {
if oom {
server.outOfMemory.set(false)
}
return
}
if oom { if oom {
runtime.GC() server.outOfMemory.set(false)
} }
runtime.ReadMemStats(&mem) return
server.outOfMemory.set(int(mem.HeapAlloc) > server.config.maxMemory()) }
}() if oom {
runtime.GC()
}
runtime.ReadMemStats(&mem)
server.outOfMemory.set(int(mem.HeapAlloc) > server.config.maxMemory())
} }
} }
@ -779,9 +778,26 @@ func (server *Server) watchLuaStatePool() {
t := time.NewTicker(time.Second * 10) t := time.NewTicker(time.Second * 10)
defer t.Stop() defer t.Stop()
for range t.C { for range t.C {
func() { if server.stopServer.on() {
server.luapool.Prune() return
}() }
server.luapool.Prune()
}
}
func (server *Server) backgroundSyncAOF() {
t := time.NewTicker(time.Second)
defer t.Stop()
for range t.C {
if server.stopServer.on() {
return
}
server.mu.Lock()
if len(server.aofbuf) > 0 {
server.flushAOF(true)
}
server.aofbuf = nil
server.mu.Unlock()
} }
} }