From fdf4cbc87b0bcedc0889a3295b8fb48c54d3fe93 Mon Sep 17 00:00:00 2001 From: glefloch Date: Wed, 17 Oct 2018 13:17:33 +0200 Subject: [PATCH 1/2] Use sync.Pool for gzipWriter Signed-off-by: glefloch --- prometheus/promhttp/http.go | 92 +++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 44 deletions(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index 0135737..b5dc912 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -53,19 +53,10 @@ const ( acceptEncodingHeader = "Accept-Encoding" ) -var bufPool sync.Pool - -func getBuf() *bytes.Buffer { - buf := bufPool.Get() - if buf == nil { - return &bytes.Buffer{} - } - return buf.(*bytes.Buffer) -} - -func giveBuf(buf *bytes.Buffer) { - buf.Reset() - bufPool.Put(buf) +var gzipPool = sync.Pool{ + New: func() interface{} { + return gzip.NewWriter(nil) + }, } // Handler returns an http.Handler for the prometheus.DefaultGatherer, using @@ -133,10 +124,9 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { } contentType := expfmt.Negotiate(req.Header) - buf := getBuf() - defer giveBuf(buf) - writer, encoding := decorateWriter(req, buf, opts.DisableCompression) - enc := expfmt.NewEncoder(writer, contentType) + buf := &bytes.Buffer{} + enc := expfmt.NewEncoder(buf, contentType) + var lastErr error for _, mf := range mfs { if err := enc.Encode(mf); err != nil { @@ -155,29 +145,50 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { } } } - if closer, ok := writer.(io.Closer); ok { - closer.Close() - } + if lastErr != nil && buf.Len() == 0 { http.Error(w, "No metrics encoded, last error:\n\n"+lastErr.Error(), http.StatusInternalServerError) return } - header := w.Header() - header.Set(contentTypeHeader, string(contentType)) - header.Set(contentLengthHeader, fmt.Sprint(buf.Len())) - if encoding != "" { - header.Set(contentEncodingHeader, encoding) - } + + w.Header().Set(contentTypeHeader, string(contentType)) + if _, err := w.Write(buf.Bytes()); err != nil && opts.ErrorLog != nil { opts.ErrorLog.Println("error while sending encoded metrics:", err) } // TODO(beorn7): Consider streaming serving of metrics. }) + gzipHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if opts.DisableCompression { + h.ServeHTTP(w, req) + return + } + header := req.Header.Get(acceptEncodingHeader) + parts := strings.Split(header, ",") + for _, part := range parts { + part = strings.TrimSpace(part) + if part == "gzip" || strings.HasPrefix(part, "gzip;") { + + w.Header().Set(contentEncodingHeader, "gzip") + gz := gzipPool.Get().(*gzip.Writer) + defer gzipPool.Put(gz) + + gz.Reset(w) + defer gz.Close() + + h.ServeHTTP(gzipResponseWriter{gz, w}, req) + return + } + } + h.ServeHTTP(w, req) + return + }) + if opts.Timeout <= 0 { - return h + return gzipHandler } - return http.TimeoutHandler(h, opts.Timeout, fmt.Sprintf( + return http.TimeoutHandler(gzipHandler, opts.Timeout, fmt.Sprintf( "Exceeded configured timeout of %v.\n", opts.Timeout, )) @@ -292,20 +303,13 @@ type HandlerOpts struct { Timeout time.Duration } -// decorateWriter wraps a writer to handle gzip compression if requested. It -// returns the decorated writer and the appropriate "Content-Encoding" header -// (which is empty if no compression is enabled). -func decorateWriter(request *http.Request, writer io.Writer, compressionDisabled bool) (io.Writer, string) { - if compressionDisabled { - return writer, "" - } - header := request.Header.Get(acceptEncodingHeader) - parts := strings.Split(header, ",") - for _, part := range parts { - part = strings.TrimSpace(part) - if part == "gzip" || strings.HasPrefix(part, "gzip;") { - return gzip.NewWriter(writer), "gzip" - } - } - return writer, "" +// gzipResponseWriter in charge of wrapping io.Writer and http.ReponseWriter +// together, allowing to get a single struct which implements both interface. +type gzipResponseWriter struct { + io.Writer + http.ResponseWriter +} + +func (w gzipResponseWriter) Write(b []byte) (int, error) { + return w.Writer.Write(b) } From c2c6fd2ab46aed7688371380acb3415157492ddc Mon Sep 17 00:00:00 2001 From: glefloch Date: Fri, 19 Oct 2018 13:51:45 +0200 Subject: [PATCH 2/2] Fix PR comments Signed-off-by: glefloch --- prometheus/promhttp/http.go | 82 ++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 33 deletions(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index b5dc912..73b3ea1 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -59,6 +59,12 @@ var gzipPool = sync.Pool{ }, } +var bufPool = sync.Pool{ + New: func() interface{} { + return &bytes.Buffer{} + }, +} + // Handler returns an http.Handler for the prometheus.DefaultGatherer, using // default HandlerOpts, i.e. it reports the first error as an HTTP error, it has // no error logging, and it applies compression if requested by the client. @@ -103,7 +109,6 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { return } } - mfs, err := reg.Gather() if err != nil { if opts.ErrorLog != nil { @@ -124,9 +129,12 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { } contentType := expfmt.Negotiate(req.Header) - buf := &bytes.Buffer{} + buf := bufPool.Get().(*bytes.Buffer) + buf.Reset() enc := expfmt.NewEncoder(buf, contentType) + defer bufPool.Put(buf) + var lastErr error for _, mf := range mfs { if err := enc.Encode(mf); err != nil { @@ -150,45 +158,30 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { http.Error(w, "No metrics encoded, last error:\n\n"+lastErr.Error(), http.StatusInternalServerError) return } + header := w.Header() + header.Set(contentTypeHeader, string(contentType)) + header.Set(contentLengthHeader, fmt.Sprint(buf.Len())) - w.Header().Set(contentTypeHeader, string(contentType)) + if !opts.DisableCompression && gzipAccepted(req.Header) { + header.Set(contentEncodingHeader, "gzip") + gz := gzipPool.Get().(*gzip.Writer) + defer gzipPool.Put(gz) - if _, err := w.Write(buf.Bytes()); err != nil && opts.ErrorLog != nil { - opts.ErrorLog.Println("error while sending encoded metrics:", err) + gz.Reset(w) + defer gz.Close() + + zipWriter := gzipResponseWriter{gz, w} + writeResult(zipWriter, buf, opts) + return } + writeResult(w, buf, opts) // TODO(beorn7): Consider streaming serving of metrics. }) - gzipHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - if opts.DisableCompression { - h.ServeHTTP(w, req) - return - } - header := req.Header.Get(acceptEncodingHeader) - parts := strings.Split(header, ",") - for _, part := range parts { - part = strings.TrimSpace(part) - if part == "gzip" || strings.HasPrefix(part, "gzip;") { - - w.Header().Set(contentEncodingHeader, "gzip") - gz := gzipPool.Get().(*gzip.Writer) - defer gzipPool.Put(gz) - - gz.Reset(w) - defer gz.Close() - - h.ServeHTTP(gzipResponseWriter{gz, w}, req) - return - } - } - h.ServeHTTP(w, req) - return - }) - if opts.Timeout <= 0 { - return gzipHandler + return h } - return http.TimeoutHandler(gzipHandler, opts.Timeout, fmt.Sprintf( + return http.TimeoutHandler(h, opts.Timeout, fmt.Sprintf( "Exceeded configured timeout of %v.\n", opts.Timeout, )) @@ -313,3 +306,26 @@ type gzipResponseWriter struct { func (w gzipResponseWriter) Write(b []byte) (int, error) { return w.Writer.Write(b) } + +// writeResult to buf using http.ResponseWriter. +// If ErrorLog is enabled, err is logged in. +func writeResult(w http.ResponseWriter, buf *bytes.Buffer, opts HandlerOpts) { + if _, err := w.Write(buf.Bytes()); err != nil && opts.ErrorLog != nil { + opts.ErrorLog.Println("error while sending encoded metrics:", err) + } +} + +// gzipHandler return a http.HandlerFunc in charge of compressing the content +// of the given http.HandlerFunc +func gzipAccepted(header http.Header) bool { + + a := header.Get(acceptEncodingHeader) + parts := strings.Split(a, ",") + for _, part := range parts { + part = strings.TrimSpace(part) + if part == "gzip" || strings.HasPrefix(part, "gzip;") { + return true + } + } + return false +}