From fdf4cbc87b0bcedc0889a3295b8fb48c54d3fe93 Mon Sep 17 00:00:00 2001 From: glefloch Date: Wed, 17 Oct 2018 13:17:33 +0200 Subject: [PATCH 1/4] Use sync.Pool for gzipWriter Signed-off-by: glefloch --- prometheus/promhttp/http.go | 92 +++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 44 deletions(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index 0135737..b5dc912 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -53,19 +53,10 @@ const ( acceptEncodingHeader = "Accept-Encoding" ) -var bufPool sync.Pool - -func getBuf() *bytes.Buffer { - buf := bufPool.Get() - if buf == nil { - return &bytes.Buffer{} - } - return buf.(*bytes.Buffer) -} - -func giveBuf(buf *bytes.Buffer) { - buf.Reset() - bufPool.Put(buf) +var gzipPool = sync.Pool{ + New: func() interface{} { + return gzip.NewWriter(nil) + }, } // Handler returns an http.Handler for the prometheus.DefaultGatherer, using @@ -133,10 +124,9 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { } contentType := expfmt.Negotiate(req.Header) - buf := getBuf() - defer giveBuf(buf) - writer, encoding := decorateWriter(req, buf, opts.DisableCompression) - enc := expfmt.NewEncoder(writer, contentType) + buf := &bytes.Buffer{} + enc := expfmt.NewEncoder(buf, contentType) + var lastErr error for _, mf := range mfs { if err := enc.Encode(mf); err != nil { @@ -155,29 +145,50 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { } } } - if closer, ok := writer.(io.Closer); ok { - closer.Close() - } + if lastErr != nil && buf.Len() == 0 { http.Error(w, "No metrics encoded, last error:\n\n"+lastErr.Error(), http.StatusInternalServerError) return } - header := w.Header() - header.Set(contentTypeHeader, string(contentType)) - header.Set(contentLengthHeader, fmt.Sprint(buf.Len())) - if encoding != "" { - header.Set(contentEncodingHeader, encoding) - } + + w.Header().Set(contentTypeHeader, string(contentType)) + if _, err := w.Write(buf.Bytes()); err != nil && opts.ErrorLog != nil { opts.ErrorLog.Println("error while sending encoded metrics:", err) } // TODO(beorn7): Consider streaming serving of metrics. }) + gzipHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if opts.DisableCompression { + h.ServeHTTP(w, req) + return + } + header := req.Header.Get(acceptEncodingHeader) + parts := strings.Split(header, ",") + for _, part := range parts { + part = strings.TrimSpace(part) + if part == "gzip" || strings.HasPrefix(part, "gzip;") { + + w.Header().Set(contentEncodingHeader, "gzip") + gz := gzipPool.Get().(*gzip.Writer) + defer gzipPool.Put(gz) + + gz.Reset(w) + defer gz.Close() + + h.ServeHTTP(gzipResponseWriter{gz, w}, req) + return + } + } + h.ServeHTTP(w, req) + return + }) + if opts.Timeout <= 0 { - return h + return gzipHandler } - return http.TimeoutHandler(h, opts.Timeout, fmt.Sprintf( + return http.TimeoutHandler(gzipHandler, opts.Timeout, fmt.Sprintf( "Exceeded configured timeout of %v.\n", opts.Timeout, )) @@ -292,20 +303,13 @@ type HandlerOpts struct { Timeout time.Duration } -// decorateWriter wraps a writer to handle gzip compression if requested. It -// returns the decorated writer and the appropriate "Content-Encoding" header -// (which is empty if no compression is enabled). -func decorateWriter(request *http.Request, writer io.Writer, compressionDisabled bool) (io.Writer, string) { - if compressionDisabled { - return writer, "" - } - header := request.Header.Get(acceptEncodingHeader) - parts := strings.Split(header, ",") - for _, part := range parts { - part = strings.TrimSpace(part) - if part == "gzip" || strings.HasPrefix(part, "gzip;") { - return gzip.NewWriter(writer), "gzip" - } - } - return writer, "" +// gzipResponseWriter in charge of wrapping io.Writer and http.ReponseWriter +// together, allowing to get a single struct which implements both interface. +type gzipResponseWriter struct { + io.Writer + http.ResponseWriter +} + +func (w gzipResponseWriter) Write(b []byte) (int, error) { + return w.Writer.Write(b) } From c2c6fd2ab46aed7688371380acb3415157492ddc Mon Sep 17 00:00:00 2001 From: glefloch Date: Fri, 19 Oct 2018 13:51:45 +0200 Subject: [PATCH 2/4] Fix PR comments Signed-off-by: glefloch --- prometheus/promhttp/http.go | 82 ++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 33 deletions(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index b5dc912..73b3ea1 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -59,6 +59,12 @@ var gzipPool = sync.Pool{ }, } +var bufPool = sync.Pool{ + New: func() interface{} { + return &bytes.Buffer{} + }, +} + // Handler returns an http.Handler for the prometheus.DefaultGatherer, using // default HandlerOpts, i.e. it reports the first error as an HTTP error, it has // no error logging, and it applies compression if requested by the client. @@ -103,7 +109,6 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { return } } - mfs, err := reg.Gather() if err != nil { if opts.ErrorLog != nil { @@ -124,9 +129,12 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { } contentType := expfmt.Negotiate(req.Header) - buf := &bytes.Buffer{} + buf := bufPool.Get().(*bytes.Buffer) + buf.Reset() enc := expfmt.NewEncoder(buf, contentType) + defer bufPool.Put(buf) + var lastErr error for _, mf := range mfs { if err := enc.Encode(mf); err != nil { @@ -150,45 +158,30 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { http.Error(w, "No metrics encoded, last error:\n\n"+lastErr.Error(), http.StatusInternalServerError) return } + header := w.Header() + header.Set(contentTypeHeader, string(contentType)) + header.Set(contentLengthHeader, fmt.Sprint(buf.Len())) - w.Header().Set(contentTypeHeader, string(contentType)) + if !opts.DisableCompression && gzipAccepted(req.Header) { + header.Set(contentEncodingHeader, "gzip") + gz := gzipPool.Get().(*gzip.Writer) + defer gzipPool.Put(gz) - if _, err := w.Write(buf.Bytes()); err != nil && opts.ErrorLog != nil { - opts.ErrorLog.Println("error while sending encoded metrics:", err) + gz.Reset(w) + defer gz.Close() + + zipWriter := gzipResponseWriter{gz, w} + writeResult(zipWriter, buf, opts) + return } + writeResult(w, buf, opts) // TODO(beorn7): Consider streaming serving of metrics. }) - gzipHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - if opts.DisableCompression { - h.ServeHTTP(w, req) - return - } - header := req.Header.Get(acceptEncodingHeader) - parts := strings.Split(header, ",") - for _, part := range parts { - part = strings.TrimSpace(part) - if part == "gzip" || strings.HasPrefix(part, "gzip;") { - - w.Header().Set(contentEncodingHeader, "gzip") - gz := gzipPool.Get().(*gzip.Writer) - defer gzipPool.Put(gz) - - gz.Reset(w) - defer gz.Close() - - h.ServeHTTP(gzipResponseWriter{gz, w}, req) - return - } - } - h.ServeHTTP(w, req) - return - }) - if opts.Timeout <= 0 { - return gzipHandler + return h } - return http.TimeoutHandler(gzipHandler, opts.Timeout, fmt.Sprintf( + return http.TimeoutHandler(h, opts.Timeout, fmt.Sprintf( "Exceeded configured timeout of %v.\n", opts.Timeout, )) @@ -313,3 +306,26 @@ type gzipResponseWriter struct { func (w gzipResponseWriter) Write(b []byte) (int, error) { return w.Writer.Write(b) } + +// writeResult to buf using http.ResponseWriter. +// If ErrorLog is enabled, err is logged in. +func writeResult(w http.ResponseWriter, buf *bytes.Buffer, opts HandlerOpts) { + if _, err := w.Write(buf.Bytes()); err != nil && opts.ErrorLog != nil { + opts.ErrorLog.Println("error while sending encoded metrics:", err) + } +} + +// gzipHandler return a http.HandlerFunc in charge of compressing the content +// of the given http.HandlerFunc +func gzipAccepted(header http.Header) bool { + + a := header.Get(acceptEncodingHeader) + parts := strings.Split(a, ",") + for _, part := range parts { + part = strings.TrimSpace(part) + if part == "gzip" || strings.HasPrefix(part, "gzip;") { + return true + } + } + return false +} From 62361fc0fbae54da9db049062dcdd1e5c64337ee Mon Sep 17 00:00:00 2001 From: beorn7 Date: Sat, 20 Oct 2018 01:28:16 +0200 Subject: [PATCH 3/4] Use streaming encoding of metrics Signed-off-by: beorn7 --- prometheus/promhttp/http.go | 118 +++++++++++++------------------ prometheus/promhttp/http_test.go | 2 +- prometheus/registry_test.go | 10 +-- 3 files changed, 55 insertions(+), 75 deletions(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index 73b3ea1..668eb6b 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -32,7 +32,6 @@ package promhttp import ( - "bytes" "compress/gzip" "fmt" "io" @@ -59,12 +58,6 @@ var gzipPool = sync.Pool{ }, } -var bufPool = sync.Pool{ - New: func() interface{} { - return &bytes.Buffer{} - }, -} - // Handler returns an http.Handler for the prometheus.DefaultGatherer, using // default HandlerOpts, i.e. it reports the first error as an HTTP error, it has // no error logging, and it applies compression if requested by the client. @@ -97,13 +90,13 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { inFlightSem = make(chan struct{}, opts.MaxRequestsInFlight) } - h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) { if inFlightSem != nil { select { case inFlightSem <- struct{}{}: // All good, carry on. defer func() { <-inFlightSem }() default: - http.Error(w, fmt.Sprintf( + http.Error(rsp, fmt.Sprintf( "Limit of concurrent requests reached (%d), try again later.", opts.MaxRequestsInFlight, ), http.StatusServiceUnavailable) return @@ -119,49 +112,21 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { panic(err) case ContinueOnError: if len(mfs) == 0 { - http.Error(w, "No metrics gathered, last error:\n\n"+err.Error(), http.StatusInternalServerError) + // Still report the error if no metrics have been gathered. + httpError(rsp, err) return } case HTTPErrorOnError: - http.Error(w, "An error has occurred during metrics gathering:\n\n"+err.Error(), http.StatusInternalServerError) + httpError(rsp, err) return } } contentType := expfmt.Negotiate(req.Header) - buf := bufPool.Get().(*bytes.Buffer) - buf.Reset() - enc := expfmt.NewEncoder(buf, contentType) - - defer bufPool.Put(buf) - - var lastErr error - for _, mf := range mfs { - if err := enc.Encode(mf); err != nil { - lastErr = err - if opts.ErrorLog != nil { - opts.ErrorLog.Println("error encoding metric family:", err) - } - switch opts.ErrorHandling { - case PanicOnError: - panic(err) - case ContinueOnError: - // Handled later. - case HTTPErrorOnError: - http.Error(w, "An error has occurred during metrics encoding:\n\n"+err.Error(), http.StatusInternalServerError) - return - } - } - } - - if lastErr != nil && buf.Len() == 0 { - http.Error(w, "No metrics encoded, last error:\n\n"+lastErr.Error(), http.StatusInternalServerError) - return - } - header := w.Header() + header := rsp.Header() header.Set(contentTypeHeader, string(contentType)) - header.Set(contentLengthHeader, fmt.Sprint(buf.Len())) + w := io.Writer(rsp) if !opts.DisableCompression && gzipAccepted(req.Header) { header.Set(contentEncodingHeader, "gzip") gz := gzipPool.Get().(*gzip.Writer) @@ -170,12 +135,33 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { gz.Reset(w) defer gz.Close() - zipWriter := gzipResponseWriter{gz, w} - writeResult(zipWriter, buf, opts) - return + w = gz + } + + enc := expfmt.NewEncoder(w, contentType) + + var lastErr error + for _, mf := range mfs { + if err := enc.Encode(mf); err != nil { + lastErr = err + if opts.ErrorLog != nil { + opts.ErrorLog.Println("error encoding and sending metric family:", err) + } + switch opts.ErrorHandling { + case PanicOnError: + panic(err) + case ContinueOnError: + // Handled later. + case HTTPErrorOnError: + httpError(rsp, err) + return + } + } + } + + if lastErr != nil { + httpError(rsp, lastErr) } - writeResult(w, buf, opts) - // TODO(beorn7): Consider streaming serving of metrics. }) if opts.Timeout <= 0 { @@ -296,29 +282,8 @@ type HandlerOpts struct { Timeout time.Duration } -// gzipResponseWriter in charge of wrapping io.Writer and http.ReponseWriter -// together, allowing to get a single struct which implements both interface. -type gzipResponseWriter struct { - io.Writer - http.ResponseWriter -} - -func (w gzipResponseWriter) Write(b []byte) (int, error) { - return w.Writer.Write(b) -} - -// writeResult to buf using http.ResponseWriter. -// If ErrorLog is enabled, err is logged in. -func writeResult(w http.ResponseWriter, buf *bytes.Buffer, opts HandlerOpts) { - if _, err := w.Write(buf.Bytes()); err != nil && opts.ErrorLog != nil { - opts.ErrorLog.Println("error while sending encoded metrics:", err) - } -} - -// gzipHandler return a http.HandlerFunc in charge of compressing the content -// of the given http.HandlerFunc +// gzipAccepted returns whether the client will accept gzip-encoded content. func gzipAccepted(header http.Header) bool { - a := header.Get(acceptEncodingHeader) parts := strings.Split(a, ",") for _, part := range parts { @@ -329,3 +294,18 @@ func gzipAccepted(header http.Header) bool { } return false } + +// httpError removes any content-encoding header and then calls http.Error with +// the provided error and http.StatusInternalServerErrer. Error contents is +// supposed to be uncompressed plain text. However, same as with a plain +// http.Error, any header settings will be void if the header has already been +// sent. The error message will still be written to the writer, but it will +// probably be of limited use. +func httpError(rsp http.ResponseWriter, err error) { + rsp.Header().Del(contentEncodingHeader) + http.Error( + rsp, + "An error has occurred while serving metrics:\n\n"+err.Error(), + http.StatusInternalServerError, + ) +} diff --git a/prometheus/promhttp/http_test.go b/prometheus/promhttp/http_test.go index 24d2f8c..6e23e6c 100644 --- a/prometheus/promhttp/http_test.go +++ b/prometheus/promhttp/http_test.go @@ -103,7 +103,7 @@ func TestHandlerErrorHandling(t *testing.T) { }) wantMsg := `error gathering metrics: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error ` - wantErrorBody := `An error has occurred during metrics gathering: + wantErrorBody := `An error has occurred while serving metrics: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error ` diff --git a/prometheus/registry_test.go b/prometheus/registry_test.go index d05ac9a..3172960 100644 --- a/prometheus/registry_test.go +++ b/prometheus/registry_test.go @@ -250,7 +250,7 @@ metric: < }, } - expectedMetricFamilyInvalidLabelValueAsText := []byte(`An error has occurred during metrics gathering: + expectedMetricFamilyInvalidLabelValueAsText := []byte(`An error has occurred while serving metrics: collected metric "name" { label: label: counter: } has a label named "constname" whose value is not utf8: "\xff" `) @@ -299,15 +299,15 @@ complex_bucket 1 }, }, } - bucketCollisionMsg := []byte(`An error has occurred during metrics gathering: + bucketCollisionMsg := []byte(`An error has occurred while serving metrics: collected metric named "complex_bucket" collides with previously collected histogram named "complex" `) - summaryCountCollisionMsg := []byte(`An error has occurred during metrics gathering: + summaryCountCollisionMsg := []byte(`An error has occurred while serving metrics: collected metric named "complex_count" collides with previously collected summary named "complex" `) - histogramCountCollisionMsg := []byte(`An error has occurred during metrics gathering: + histogramCountCollisionMsg := []byte(`An error has occurred while serving metrics: collected metric named "complex_count" collides with previously collected histogram named "complex" `) @@ -333,7 +333,7 @@ collected metric named "complex_count" collides with previously collected histog }, }, } - duplicateLabelMsg := []byte(`An error has occurred during metrics gathering: + duplicateLabelMsg := []byte(`An error has occurred while serving metrics: collected metric "broken_metric" { label: label: counter: } has two or more labels with the same name: foo `) From fb0f7fe8c286edfa0ddd252024f8994cb00208b3 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 22 Oct 2018 12:36:33 +0200 Subject: [PATCH 4/4] Pull up HTTP changes into deprecated functions So that also users of those can benefit. Obviously, we will end updating deprecated functions one day (at latest once v0.10 is out). Signed-off-by: beorn7 --- prometheus/http.go | 105 ++++++++++++++++++++++----------------------- 1 file changed, 52 insertions(+), 53 deletions(-) diff --git a/prometheus/http.go b/prometheus/http.go index 4b8e602..9f0875b 100644 --- a/prometheus/http.go +++ b/prometheus/http.go @@ -15,9 +15,7 @@ package prometheus import ( "bufio" - "bytes" "compress/gzip" - "fmt" "io" "net" "net/http" @@ -41,19 +39,10 @@ const ( acceptEncodingHeader = "Accept-Encoding" ) -var bufPool sync.Pool - -func getBuf() *bytes.Buffer { - buf := bufPool.Get() - if buf == nil { - return &bytes.Buffer{} - } - return buf.(*bytes.Buffer) -} - -func giveBuf(buf *bytes.Buffer) { - buf.Reset() - bufPool.Put(buf) +var gzipPool = sync.Pool{ + New: func() interface{} { + return gzip.NewWriter(nil) + }, } // Handler returns an HTTP handler for the DefaultGatherer. It is @@ -71,58 +60,40 @@ func Handler() http.Handler { // Deprecated: Use promhttp.HandlerFor(DefaultGatherer, promhttp.HandlerOpts{}) // instead. See there for further documentation. func UninstrumentedHandler() http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + return http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) { mfs, err := DefaultGatherer.Gather() if err != nil { - http.Error(w, "An error has occurred during metrics collection:\n\n"+err.Error(), http.StatusInternalServerError) + httpError(rsp, err) return } contentType := expfmt.Negotiate(req.Header) - buf := getBuf() - defer giveBuf(buf) - writer, encoding := decorateWriter(req, buf) - enc := expfmt.NewEncoder(writer, contentType) - var lastErr error + header := rsp.Header() + header.Set(contentTypeHeader, string(contentType)) + + w := io.Writer(rsp) + if gzipAccepted(req.Header) { + header.Set(contentEncodingHeader, "gzip") + gz := gzipPool.Get().(*gzip.Writer) + defer gzipPool.Put(gz) + + gz.Reset(w) + defer gz.Close() + + w = gz + } + + enc := expfmt.NewEncoder(w, contentType) + for _, mf := range mfs { if err := enc.Encode(mf); err != nil { - lastErr = err - http.Error(w, "An error has occurred during metrics encoding:\n\n"+err.Error(), http.StatusInternalServerError) + httpError(rsp, err) return } } - if closer, ok := writer.(io.Closer); ok { - closer.Close() - } - if lastErr != nil && buf.Len() == 0 { - http.Error(w, "No metrics encoded, last error:\n\n"+lastErr.Error(), http.StatusInternalServerError) - return - } - header := w.Header() - header.Set(contentTypeHeader, string(contentType)) - header.Set(contentLengthHeader, fmt.Sprint(buf.Len())) - if encoding != "" { - header.Set(contentEncodingHeader, encoding) - } - w.Write(buf.Bytes()) }) } -// decorateWriter wraps a writer to handle gzip compression if requested. It -// returns the decorated writer and the appropriate "Content-Encoding" header -// (which is empty if no compression is enabled). -func decorateWriter(request *http.Request, writer io.Writer) (io.Writer, string) { - header := request.Header.Get(acceptEncodingHeader) - parts := strings.Split(header, ",") - for _, part := range parts { - part = strings.TrimSpace(part) - if part == "gzip" || strings.HasPrefix(part, "gzip;") { - return gzip.NewWriter(writer), "gzip" - } - } - return writer, "" -} - var instLabels = []string{"method", "code"} type nower interface { @@ -503,3 +474,31 @@ func sanitizeCode(s int) string { return strconv.Itoa(s) } } + +// gzipAccepted returns whether the client will accept gzip-encoded content. +func gzipAccepted(header http.Header) bool { + a := header.Get(acceptEncodingHeader) + parts := strings.Split(a, ",") + for _, part := range parts { + part = strings.TrimSpace(part) + if part == "gzip" || strings.HasPrefix(part, "gzip;") { + return true + } + } + return false +} + +// httpError removes any content-encoding header and then calls http.Error with +// the provided error and http.StatusInternalServerErrer. Error contents is +// supposed to be uncompressed plain text. However, same as with a plain +// http.Error, any header settings will be void if the header has already been +// sent. The error message will still be written to the writer, but it will +// probably be of limited use. +func httpError(rsp http.ResponseWriter, err error) { + rsp.Header().Del(contentEncodingHeader) + http.Error( + rsp, + "An error has occurred while serving metrics:\n\n"+err.Error(), + http.StatusInternalServerError, + ) +}