From 62361fc0fbae54da9db049062dcdd1e5c64337ee Mon Sep 17 00:00:00 2001
From: beorn7
Date: Sat, 20 Oct 2018 01:28:16 +0200
Subject: [PATCH] Use streaming encoding of metrics

Signed-off-by: beorn7
---
 prometheus/promhttp/http.go      | 118 +++++++++++++------------------
 prometheus/promhttp/http_test.go |   2 +-
 prometheus/registry_test.go      |  10 +--
 3 files changed, 55 insertions(+), 75 deletions(-)

diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go
index 73b3ea1..668eb6b 100644
--- a/prometheus/promhttp/http.go
+++ b/prometheus/promhttp/http.go
@@ -32,7 +32,6 @@
 package promhttp
 
 import (
-	"bytes"
 	"compress/gzip"
 	"fmt"
 	"io"
@@ -59,12 +58,6 @@ var gzipPool = sync.Pool{
 	},
 }
 
-var bufPool = sync.Pool{
-	New: func() interface{} {
-		return &bytes.Buffer{}
-	},
-}
-
 // Handler returns an http.Handler for the prometheus.DefaultGatherer, using
 // default HandlerOpts, i.e. it reports the first error as an HTTP error, it has
 // no error logging, and it applies compression if requested by the client.
@@ -97,13 +90,13 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
 		inFlightSem = make(chan struct{}, opts.MaxRequestsInFlight)
 	}
 
-	h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+	h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
 		if inFlightSem != nil {
 			select {
 			case inFlightSem <- struct{}{}: // All good, carry on.
 				defer func() { <-inFlightSem }()
 			default:
-				http.Error(w, fmt.Sprintf(
+				http.Error(rsp, fmt.Sprintf(
 					"Limit of concurrent requests reached (%d), try again later.", opts.MaxRequestsInFlight,
 				), http.StatusServiceUnavailable)
 				return
@@ -119,49 +112,21 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
 				panic(err)
 			case ContinueOnError:
 				if len(mfs) == 0 {
-					http.Error(w, "No metrics gathered, last error:\n\n"+err.Error(), http.StatusInternalServerError)
+					// Still report the error if no metrics have been gathered.
+					httpError(rsp, err)
 					return
 				}
 			case HTTPErrorOnError:
-				http.Error(w, "An error has occurred during metrics gathering:\n\n"+err.Error(), http.StatusInternalServerError)
+				httpError(rsp, err)
 				return
 			}
 		}
 
 		contentType := expfmt.Negotiate(req.Header)
-		buf := bufPool.Get().(*bytes.Buffer)
-		buf.Reset()
-		enc := expfmt.NewEncoder(buf, contentType)
-
-		defer bufPool.Put(buf)
-
-		var lastErr error
-		for _, mf := range mfs {
-			if err := enc.Encode(mf); err != nil {
-				lastErr = err
-				if opts.ErrorLog != nil {
-					opts.ErrorLog.Println("error encoding metric family:", err)
-				}
-				switch opts.ErrorHandling {
-				case PanicOnError:
-					panic(err)
-				case ContinueOnError:
-					// Handled later.
-				case HTTPErrorOnError:
-					http.Error(w, "An error has occurred during metrics encoding:\n\n"+err.Error(), http.StatusInternalServerError)
-					return
-				}
-			}
-		}
-
-		if lastErr != nil && buf.Len() == 0 {
-			http.Error(w, "No metrics encoded, last error:\n\n"+lastErr.Error(), http.StatusInternalServerError)
-			return
-		}
-
-		header := w.Header()
+		header := rsp.Header()
 		header.Set(contentTypeHeader, string(contentType))
-		header.Set(contentLengthHeader, fmt.Sprint(buf.Len()))
+
+		w := io.Writer(rsp)
 		if !opts.DisableCompression && gzipAccepted(req.Header) {
 			header.Set(contentEncodingHeader, "gzip")
 			gz := gzipPool.Get().(*gzip.Writer)
@@ -170,12 +135,33 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
 			gz.Reset(w)
 			defer gz.Close()
 
-			zipWriter := gzipResponseWriter{gz, w}
-			writeResult(zipWriter, buf, opts)
-			return
+			w = gz
+		}
+
+		enc := expfmt.NewEncoder(w, contentType)
+
+		var lastErr error
+		for _, mf := range mfs {
+			if err := enc.Encode(mf); err != nil {
+				lastErr = err
+				if opts.ErrorLog != nil {
+					opts.ErrorLog.Println("error encoding and sending metric family:", err)
+				}
+				switch opts.ErrorHandling {
+				case PanicOnError:
+					panic(err)
+				case ContinueOnError:
+					// Handled later.
+				case HTTPErrorOnError:
+					httpError(rsp, err)
+					return
+				}
+			}
+		}
+
+		if lastErr != nil {
+			httpError(rsp, lastErr)
 		}
-		writeResult(w, buf, opts)
-		// TODO(beorn7): Consider streaming serving of metrics.
 	})
 
 	if opts.Timeout <= 0 {
@@ -296,29 +282,8 @@ type HandlerOpts struct {
 	Timeout time.Duration
 }
 
-// gzipResponseWriter in charge of wrapping io.Writer and http.ReponseWriter
-// together, allowing to get a single struct which implements both interface.
-type gzipResponseWriter struct {
-	io.Writer
-	http.ResponseWriter
-}
-
-func (w gzipResponseWriter) Write(b []byte) (int, error) {
-	return w.Writer.Write(b)
-}
-
-// writeResult to buf using http.ResponseWriter.
-// If ErrorLog is enabled, err is logged in.
-func writeResult(w http.ResponseWriter, buf *bytes.Buffer, opts HandlerOpts) {
-	if _, err := w.Write(buf.Bytes()); err != nil && opts.ErrorLog != nil {
-		opts.ErrorLog.Println("error while sending encoded metrics:", err)
-	}
-}
-
-// gzipHandler return a http.HandlerFunc in charge of compressing the content
-// of the given http.HandlerFunc
+// gzipAccepted returns whether the client will accept gzip-encoded content.
 func gzipAccepted(header http.Header) bool {
-
 	a := header.Get(acceptEncodingHeader)
 	parts := strings.Split(a, ",")
 	for _, part := range parts {
@@ -329,3 +294,18 @@ func gzipAccepted(header http.Header) bool {
 	}
 	return false
 }
+
+// httpError removes any content-encoding header and then calls http.Error with
+// the provided error and http.StatusInternalServerError. The error contents
+// are supposed to be uncompressed plain text. However, same as with a plain
+// http.Error, any header settings will be void if the header has already been
+// sent. The error message will still be written to the writer, but it will
+// probably be of limited use.
+func httpError(rsp http.ResponseWriter, err error) {
+	rsp.Header().Del(contentEncodingHeader)
+	http.Error(
+		rsp,
+		"An error has occurred while serving metrics:\n\n"+err.Error(),
+		http.StatusInternalServerError,
+	)
+}
diff --git a/prometheus/promhttp/http_test.go b/prometheus/promhttp/http_test.go
index 24d2f8c..6e23e6c 100644
--- a/prometheus/promhttp/http_test.go
+++ b/prometheus/promhttp/http_test.go
@@ -103,7 +103,7 @@ func TestHandlerErrorHandling(t *testing.T) {
 	})
 	wantMsg := `error gathering metrics: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
 `
-	wantErrorBody := `An error has occurred during metrics gathering:
+	wantErrorBody := `An error has occurred while serving metrics:
 
 error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
 `
diff --git a/prometheus/registry_test.go b/prometheus/registry_test.go
index d05ac9a..3172960 100644
--- a/prometheus/registry_test.go
+++ b/prometheus/registry_test.go
@@ -250,7 +250,7 @@ metric: <
 		},
 	}
 
-	expectedMetricFamilyInvalidLabelValueAsText := []byte(`An error has occurred during metrics gathering:
+	expectedMetricFamilyInvalidLabelValueAsText := []byte(`An error has occurred while serving metrics:
 
 collected metric "name" { label: label: counter: } has a label named "constname" whose value is not utf8: "\xff"
 `)
@@ -299,15 +299,15 @@ complex_bucket 1
 			},
 		},
 	}
-	bucketCollisionMsg := []byte(`An error has occurred during metrics gathering:
+	bucketCollisionMsg := []byte(`An error has occurred while serving metrics:
 
 collected metric named "complex_bucket" collides with previously collected histogram named "complex"
 `)
-	summaryCountCollisionMsg := []byte(`An error has occurred during metrics gathering:
+	summaryCountCollisionMsg := []byte(`An error has occurred while serving metrics:
 
 collected metric named "complex_count" collides with previously collected summary named "complex"
 `)
-	histogramCountCollisionMsg := []byte(`An error has occurred during metrics gathering:
+	histogramCountCollisionMsg := []byte(`An error has occurred while serving metrics:
 
 collected metric named "complex_count" collides with previously collected histogram named "complex"
 `)
@@ -333,7 +333,7 @@ collected metric named "complex_count" collides with previously collected histog
 			},
 		},
 	}
-	duplicateLabelMsg := []byte(`An error has occurred during metrics gathering:
+	duplicateLabelMsg := []byte(`An error has occurred while serving metrics:
 
 collected metric "broken_metric" { label: label: counter: } has two or more labels with the same name: foo
 `)
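
For context, a minimal usage sketch of the streaming handler this patch produces. The registry, address, and error-handling choice below are illustrative and not part of the change itself; with ContinueOnError the handler now encodes whatever metric families it could gather straight into the (possibly gzipped) response, and reports the last encoding error via httpError only if one occurred.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// HandlerFor encodes each gathered metric family directly into the
	// response writer (wrapped in gzip if the client accepts it) instead of
	// assembling the whole payload in a pooled buffer first.
	h := promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{
		ErrorHandling: promhttp.ContinueOnError, // keep serving whatever could be encoded
	})
	http.Handle("/metrics", h)
	log.Fatal(http.ListenAndServe(":8080", nil)) // illustrative listen address
}

Note that, because the response is streamed, the patch also drops the Content-Length header: the encoded size is no longer known before writing begins.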