Minimize AOF buffer releases

This commit is contained in:
tidwall 2019-09-03 17:01:26 -07:00
parent 4bd6b4b838
commit 2571ce5106
3 changed files with 16 additions and 13 deletions

View File

@ -115,6 +115,8 @@ func commandErrIsFatal(err error) bool {
return true return true
} }
// flushAOF flushes all aof buffer data to disk. Set sync to true to sync the
// fsync the file.
func (server *Server) flushAOF(sync bool) { func (server *Server) flushAOF(sync bool) {
if len(server.aofbuf) > 0 { if len(server.aofbuf) > 0 {
_, err := server.aof.Write(server.aofbuf) _, err := server.aof.Write(server.aofbuf)
@ -126,12 +128,15 @@ func (server *Server) flushAOF(sync bool) {
panic(err) panic(err)
} }
} }
server.aofbuf = server.aofbuf[:0] if cap(server.aofbuf) > 1024*1024*32 {
server.aofbuf = make([]byte, 0, 1024*1024*32)
} else {
server.aofbuf = server.aofbuf[:0]
}
} }
} }
func (server *Server) writeAOF(args []string, d *commandDetails) error { func (server *Server) writeAOF(args []string, d *commandDetails) error {
if d != nil && !d.updated { if d != nil && !d.updated {
// just ignore writes if the command did not update // just ignore writes if the command did not update
return nil return nil

View File

@ -74,17 +74,18 @@ func (c *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) {
if err != nil { if err != nil {
return NOMessage, errInvalidArgument(snumPoints) return NOMessage, errInvalidArgument(snumPoints)
} }
docmd := func(args []string) error { docmd := func(args []string) error {
c.mu.Lock()
defer c.mu.Unlock()
var nmsg Message var nmsg Message
nmsg = *msg nmsg = *msg
nmsg._command = "" nmsg._command = ""
nmsg.Args = args nmsg.Args = args
var d commandDetails _, d, err := c.command(&nmsg, nil)
_, d, err = c.command(&nmsg, nil)
if err != nil { if err != nil {
return err return err
} }
return c.writeAOF(nmsg.Args, &d) return c.writeAOF(nmsg.Args, &d)
} }
@ -128,13 +129,15 @@ func (c *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) {
strconv.FormatFloat(lon, 'f', -1, 64), strconv.FormatFloat(lon, 'f', -1, 64),
) )
} }
if err := docmd(values); err != nil { err := docmd(values)
if err != nil {
log.Fatal(err) log.Fatal(err)
return return
} }
atomic.AddUint64(&k, 1) atomic.AddUint64(&k, 1)
if j%1000 == 1000-1 { if j%1000 == 1000-1 {
log.Infof("massinsert: %s %d/%d", key, atomic.LoadUint64(&k), cols*objs) log.Debugf("massinsert: %s %d/%d",
key, atomic.LoadUint64(&k), cols*objs)
} }
} }
}(key) }(key)

View File

@ -600,10 +600,7 @@ func (server *Server) backgroundSyncAOF() {
func() { func() {
server.mu.Lock() server.mu.Lock()
defer server.mu.Unlock() defer server.mu.Unlock()
if len(server.aofbuf) > 0 { server.flushAOF(true)
server.flushAOF(true)
}
server.aofbuf = nil
}() }()
} }
} }
@ -831,8 +828,6 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error {
case "echo": case "echo":
case "massinsert": case "massinsert":
// dev operation // dev operation
server.mu.Lock()
defer server.mu.Unlock()
case "sleep": case "sleep":
// dev operation // dev operation
server.mu.RLock() server.mu.RLock()