From b6884fce63b6c18a5716bf31c6fc288de525cfc2 Mon Sep 17 00:00:00 2001 From: tidwall Date: Tue, 3 Sep 2019 16:35:42 -0700 Subject: [PATCH 1/2] diagnostics --- internal/server/aof.go | 35 ++++++++++++++++++++++++++----- internal/server/dev.go | 43 +++++++++++++++++++++++++++++++++------ internal/server/server.go | 7 +------ internal/server/stats.go | 34 +++++++++++++++++++++++++------ 4 files changed, 96 insertions(+), 23 deletions(-) diff --git a/internal/server/aof.go b/internal/server/aof.go index 42a42946..5b7bf7bf 100644 --- a/internal/server/aof.go +++ b/internal/server/aof.go @@ -115,6 +115,8 @@ func commandErrIsFatal(err error) bool { return true } +// flushAOF flushes all aof buffer data to disk. Set sync to true to sync the +// fsync the file. func (server *Server) flushAOF(sync bool) { if len(server.aofbuf) > 0 { _, err := server.aof.Write(server.aofbuf) @@ -126,15 +128,30 @@ func (server *Server) flushAOF(sync bool) { panic(err) } } - server.aofbuf = server.aofbuf[:0] + if cap(server.aofbuf) > 1024*1024*32 { + server.aofbuf = make([]byte, 0, 1024*1024*32) + } else { + server.aofbuf = server.aofbuf[:0] + } } } +type writeAOFDetails struct { + appendBufferElapsed time.Duration + notifyLiveElapsed time.Duration + geofencesElapsed time.Duration +} + func (server *Server) writeAOF(args []string, d *commandDetails) error { + _, err := server.writeAOFDetails(args, d) + return err +} + +func (server *Server) writeAOFDetails(args []string, d *commandDetails) (details writeAOFDetails, err error) { if d != nil && !d.updated { // just ignore writes if the command did not update - return nil + return details, nil } if server.shrinking { @@ -144,6 +161,7 @@ func (server *Server) writeAOF(args []string, d *commandDetails) error { } if server.aof != nil { + start := time.Now() atomic.StoreInt32(&server.aofdirty, 1) // prewrite optimization flag n := len(server.aofbuf) server.aofbuf = redcon.AppendArray(server.aofbuf, len(args)) @@ -151,14 +169,21 @@ func (server *Server) writeAOF(args []string, d *commandDetails) error { server.aofbuf = redcon.AppendBulkString(server.aofbuf, arg) } server.aofsz += len(server.aofbuf) - n + details.appendBufferElapsed = time.Since(start) } // notify aof live connections that we have new data + start := time.Now() server.fcond.L.Lock() server.fcond.Broadcast() server.fcond.L.Unlock() + details.notifyLiveElapsed = time.Since(start) // process geofences + start = time.Now() + defer func() { + details.geofencesElapsed = time.Since(start) + }() if d != nil { // webhook geofences if server.config.followHost() == "" { @@ -167,13 +192,13 @@ func (server *Server) writeAOF(args []string, d *commandDetails) error { // queue children for _, d := range d.children { if err := server.queueHooks(d); err != nil { - return err + return details, err } } } else { // queue parent if err := server.queueHooks(d); err != nil { - return err + return details, err } } } @@ -194,7 +219,7 @@ func (server *Server) writeAOF(args []string, d *commandDetails) error { } server.lcond.L.Unlock() } - return nil + return details, nil } func (server *Server) getQueueCandidates(d *commandDetails) []*Hook { diff --git a/internal/server/dev.go b/internal/server/dev.go index c1a2562d..06a57317 100644 --- a/internal/server/dev.go +++ b/internal/server/dev.go @@ -74,18 +74,31 @@ func (c *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) { if err != nil { return NOMessage, errInvalidArgument(snumPoints) } - docmd := func(args []string) error { + + type docmdDetails struct { + writeAOFDetails writeAOFDetails + cmdElapsed time.Duration + aofElapsed time.Duration + } + + docmd := func(args []string) (docmdDetails docmdDetails, err error) { + c.mu.Lock() + defer c.mu.Unlock() var nmsg Message nmsg = *msg nmsg._command = "" nmsg.Args = args var d commandDetails + start := time.Now() _, d, err = c.command(&nmsg, nil) + docmdDetails.cmdElapsed = time.Since(start) if err != nil { - return err + return docmdDetails, err } - - return c.writeAOF(nmsg.Args, &d) + start = time.Now() + docmdDetails.writeAOFDetails, err = c.writeAOFDetails(nmsg.Args, &d) + docmdDetails.aofElapsed = time.Since(start) + return docmdDetails, err } rand.Seed(time.Now().UnixNano()) @@ -128,13 +141,31 @@ func (c *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) { strconv.FormatFloat(lon, 'f', -1, 64), ) } - if err := docmd(values); err != nil { + start := time.Now() + docmdDetails, err := docmd(values) + if err != nil { log.Fatal(err) return } + elapsed := time.Since(start) + if elapsed > time.Millisecond*5 { + log.Infof("%d"+ + ", %6.1f cmd, %6.1f aof"+ + ", %6.1f buf, %6.1f not, %6.1f fence"+ + ", %6.1f tot", + len(values), + docmdDetails.cmdElapsed.Seconds()*1000, + docmdDetails.aofElapsed.Seconds()*1000, + docmdDetails.writeAOFDetails.appendBufferElapsed.Seconds()*1000, + docmdDetails.writeAOFDetails.notifyLiveElapsed.Seconds()*1000, + docmdDetails.writeAOFDetails.geofencesElapsed.Seconds()*1000, + elapsed.Seconds()*1000, + ) + } atomic.AddUint64(&k, 1) if j%1000 == 1000-1 { - log.Infof("massinsert: %s %d/%d", key, atomic.LoadUint64(&k), cols*objs) + log.Debugf("mass: %s %d/%d", + key, atomic.LoadUint64(&k), cols*objs) } } }(key) diff --git a/internal/server/server.go b/internal/server/server.go index bf6c8287..9e163766 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -600,10 +600,7 @@ func (server *Server) backgroundSyncAOF() { func() { server.mu.Lock() defer server.mu.Unlock() - if len(server.aofbuf) > 0 { - server.flushAOF(true) - } - server.aofbuf = nil + server.flushAOF(true) }() } } @@ -831,8 +828,6 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { case "echo": case "massinsert": // dev operation - server.mu.Lock() - defer server.mu.Unlock() case "sleep": // dev operation server.mu.RLock() diff --git a/internal/server/stats.go b/internal/server/stats.go index 6b66b1f2..ac4f582b 100644 --- a/internal/server/stats.go +++ b/internal/server/stats.go @@ -9,6 +9,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/tidwall/resp" @@ -16,6 +17,30 @@ import ( "github.com/tidwall/tile38/internal/collection" ) +var memStats runtime.MemStats +var memStatsMu sync.Mutex +var memStatsBG bool + +// ReadMemStats returns the latest memstats. It provides an instant response. +func readMemStats() runtime.MemStats { + memStatsMu.Lock() + if !memStatsBG { + runtime.ReadMemStats(&memStats) + go func() { + for { + memStatsMu.Lock() + runtime.ReadMemStats(&memStats) + memStatsMu.Unlock() + time.Sleep(time.Second / 5) + } + }() + memStatsBG = true + } + ms := memStats + memStatsMu.Unlock() + return ms +} + func (c *Server) cmdStats(msg *Message) (res resp.Value, err error) { start := time.Now() vs := msg.Args[1:] @@ -133,8 +158,7 @@ func (c *Server) basicStats(m map[string]interface{}) { m["num_points"] = points m["num_objects"] = objects m["num_strings"] = strings - var mem runtime.MemStats - runtime.ReadMemStats(&mem) + mem := readMemStats() avgsz := 0 if points != 0 { avgsz = int(mem.HeapAlloc) / points @@ -154,9 +178,8 @@ func (c *Server) basicStats(m map[string]interface{}) { // extStats populates the passed map with extended system/go/tile38 statistics func (c *Server) extStats(m map[string]interface{}) { - var mem runtime.MemStats n, _ := runtime.ThreadCreateProfile(nil) - runtime.ReadMemStats(&mem) + mem := readMemStats() // Go/Memory Stats @@ -326,8 +349,7 @@ func (c *Server) writeInfoClients(w *bytes.Buffer) { c.connsmu.RUnlock() } func (c *Server) writeInfoMemory(w *bytes.Buffer) { - var mem runtime.MemStats - runtime.ReadMemStats(&mem) + mem := readMemStats() fmt.Fprintf(w, "used_memory:%d\r\n", mem.Alloc) // total number of bytes allocated by Redis using its allocator (either standard libc, jemalloc, or an alternative allocator such as tcmalloc } func boolInt(t bool) int { From e167e88e8fdb6e19717d8dba6e215bafca963723 Mon Sep 17 00:00:00 2001 From: tidwall Date: Tue, 3 Sep 2019 16:39:51 -0700 Subject: [PATCH 2/2] removed diag --- internal/server/aof.go | 28 ++++------------------------ internal/server/dev.go | 38 +++++--------------------------------- 2 files changed, 9 insertions(+), 57 deletions(-) diff --git a/internal/server/aof.go b/internal/server/aof.go index 5b7bf7bf..afcb1f08 100644 --- a/internal/server/aof.go +++ b/internal/server/aof.go @@ -136,22 +136,10 @@ func (server *Server) flushAOF(sync bool) { } } -type writeAOFDetails struct { - appendBufferElapsed time.Duration - notifyLiveElapsed time.Duration - geofencesElapsed time.Duration -} - func (server *Server) writeAOF(args []string, d *commandDetails) error { - _, err := server.writeAOFDetails(args, d) - return err -} - -func (server *Server) writeAOFDetails(args []string, d *commandDetails) (details writeAOFDetails, err error) { - if d != nil && !d.updated { // just ignore writes if the command did not update - return details, nil + return nil } if server.shrinking { @@ -161,7 +149,6 @@ func (server *Server) writeAOFDetails(args []string, d *commandDetails) (details } if server.aof != nil { - start := time.Now() atomic.StoreInt32(&server.aofdirty, 1) // prewrite optimization flag n := len(server.aofbuf) server.aofbuf = redcon.AppendArray(server.aofbuf, len(args)) @@ -169,21 +156,14 @@ func (server *Server) writeAOFDetails(args []string, d *commandDetails) (details server.aofbuf = redcon.AppendBulkString(server.aofbuf, arg) } server.aofsz += len(server.aofbuf) - n - details.appendBufferElapsed = time.Since(start) } // notify aof live connections that we have new data - start := time.Now() server.fcond.L.Lock() server.fcond.Broadcast() server.fcond.L.Unlock() - details.notifyLiveElapsed = time.Since(start) // process geofences - start = time.Now() - defer func() { - details.geofencesElapsed = time.Since(start) - }() if d != nil { // webhook geofences if server.config.followHost() == "" { @@ -192,13 +172,13 @@ func (server *Server) writeAOFDetails(args []string, d *commandDetails) (details // queue children for _, d := range d.children { if err := server.queueHooks(d); err != nil { - return details, err + return err } } } else { // queue parent if err := server.queueHooks(d); err != nil { - return details, err + return err } } } @@ -219,7 +199,7 @@ func (server *Server) writeAOFDetails(args []string, d *commandDetails) (details } server.lcond.L.Unlock() } - return details, nil + return nil } func (server *Server) getQueueCandidates(d *commandDetails) []*Hook { diff --git a/internal/server/dev.go b/internal/server/dev.go index 06a57317..6c1c5982 100644 --- a/internal/server/dev.go +++ b/internal/server/dev.go @@ -75,30 +75,18 @@ func (c *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) { return NOMessage, errInvalidArgument(snumPoints) } - type docmdDetails struct { - writeAOFDetails writeAOFDetails - cmdElapsed time.Duration - aofElapsed time.Duration - } - - docmd := func(args []string) (docmdDetails docmdDetails, err error) { + docmd := func(args []string) error { c.mu.Lock() defer c.mu.Unlock() var nmsg Message nmsg = *msg nmsg._command = "" nmsg.Args = args - var d commandDetails - start := time.Now() - _, d, err = c.command(&nmsg, nil) - docmdDetails.cmdElapsed = time.Since(start) + _, _, err := c.command(&nmsg, nil) if err != nil { - return docmdDetails, err + return err } - start = time.Now() - docmdDetails.writeAOFDetails, err = c.writeAOFDetails(nmsg.Args, &d) - docmdDetails.aofElapsed = time.Since(start) - return docmdDetails, err + return err } rand.Seed(time.Now().UnixNano()) @@ -141,27 +129,11 @@ func (c *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) { strconv.FormatFloat(lon, 'f', -1, 64), ) } - start := time.Now() - docmdDetails, err := docmd(values) + err := docmd(values) if err != nil { log.Fatal(err) return } - elapsed := time.Since(start) - if elapsed > time.Millisecond*5 { - log.Infof("%d"+ - ", %6.1f cmd, %6.1f aof"+ - ", %6.1f buf, %6.1f not, %6.1f fence"+ - ", %6.1f tot", - len(values), - docmdDetails.cmdElapsed.Seconds()*1000, - docmdDetails.aofElapsed.Seconds()*1000, - docmdDetails.writeAOFDetails.appendBufferElapsed.Seconds()*1000, - docmdDetails.writeAOFDetails.notifyLiveElapsed.Seconds()*1000, - docmdDetails.writeAOFDetails.geofencesElapsed.Seconds()*1000, - elapsed.Seconds()*1000, - ) - } atomic.AddUint64(&k, 1) if j%1000 == 1000-1 { log.Debugf("mass: %s %d/%d",