diff --git a/internal/server/aof.go b/internal/server/aof.go index 42a42946..afcb1f08 100644 --- a/internal/server/aof.go +++ b/internal/server/aof.go @@ -115,6 +115,8 @@ func commandErrIsFatal(err error) bool { return true } +// flushAOF flushes all aof buffer data to disk. Set sync to true to sync the +// fsync the file. func (server *Server) flushAOF(sync bool) { if len(server.aofbuf) > 0 { _, err := server.aof.Write(server.aofbuf) @@ -126,12 +128,15 @@ func (server *Server) flushAOF(sync bool) { panic(err) } } - server.aofbuf = server.aofbuf[:0] + if cap(server.aofbuf) > 1024*1024*32 { + server.aofbuf = make([]byte, 0, 1024*1024*32) + } else { + server.aofbuf = server.aofbuf[:0] + } } } func (server *Server) writeAOF(args []string, d *commandDetails) error { - if d != nil && !d.updated { // just ignore writes if the command did not update return nil diff --git a/internal/server/dev.go b/internal/server/dev.go index c1a2562d..85aa939a 100644 --- a/internal/server/dev.go +++ b/internal/server/dev.go @@ -74,17 +74,18 @@ func (c *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) { if err != nil { return NOMessage, errInvalidArgument(snumPoints) } + docmd := func(args []string) error { + c.mu.Lock() + defer c.mu.Unlock() var nmsg Message nmsg = *msg nmsg._command = "" nmsg.Args = args - var d commandDetails - _, d, err = c.command(&nmsg, nil) + _, d, err := c.command(&nmsg, nil) if err != nil { return err } - return c.writeAOF(nmsg.Args, &d) } @@ -128,13 +129,15 @@ func (c *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) { strconv.FormatFloat(lon, 'f', -1, 64), ) } - if err := docmd(values); err != nil { + err := docmd(values) + if err != nil { log.Fatal(err) return } atomic.AddUint64(&k, 1) if j%1000 == 1000-1 { - log.Infof("massinsert: %s %d/%d", key, atomic.LoadUint64(&k), cols*objs) + log.Debugf("massinsert: %s %d/%d", + key, atomic.LoadUint64(&k), cols*objs) } } }(key) diff --git a/internal/server/server.go b/internal/server/server.go index bf6c8287..9e163766 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -600,10 +600,7 @@ func (server *Server) backgroundSyncAOF() { func() { server.mu.Lock() defer server.mu.Unlock() - if len(server.aofbuf) > 0 { - server.flushAOF(true) - } - server.aofbuf = nil + server.flushAOF(true) }() } } @@ -831,8 +828,6 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { case "echo": case "massinsert": // dev operation - server.mu.Lock() - defer server.mu.Unlock() case "sleep": // dev operation server.mu.RLock()