Merge pull request #482 from prometheus/beorn7/http

Use gzip from a pool and stream while encoding metrics
This commit is contained in:
Björn Rabenstein 2018-10-22 13:47:59 +02:00 committed by GitHub
commit 16f375c74d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 110 additions and 111 deletions

View File

@ -15,9 +15,7 @@ package prometheus
import ( import (
"bufio" "bufio"
"bytes"
"compress/gzip" "compress/gzip"
"fmt"
"io" "io"
"net" "net"
"net/http" "net/http"
@ -41,19 +39,10 @@ const (
acceptEncodingHeader = "Accept-Encoding" acceptEncodingHeader = "Accept-Encoding"
) )
var bufPool sync.Pool var gzipPool = sync.Pool{
New: func() interface{} {
func getBuf() *bytes.Buffer { return gzip.NewWriter(nil)
buf := bufPool.Get() },
if buf == nil {
return &bytes.Buffer{}
}
return buf.(*bytes.Buffer)
}
func giveBuf(buf *bytes.Buffer) {
buf.Reset()
bufPool.Put(buf)
} }
// Handler returns an HTTP handler for the DefaultGatherer. It is // Handler returns an HTTP handler for the DefaultGatherer. It is
@ -71,56 +60,38 @@ func Handler() http.Handler {
// Deprecated: Use promhttp.HandlerFor(DefaultGatherer, promhttp.HandlerOpts{}) // Deprecated: Use promhttp.HandlerFor(DefaultGatherer, promhttp.HandlerOpts{})
// instead. See there for further documentation. // instead. See there for further documentation.
func UninstrumentedHandler() http.Handler { func UninstrumentedHandler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { return http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
mfs, err := DefaultGatherer.Gather() mfs, err := DefaultGatherer.Gather()
if err != nil { if err != nil {
http.Error(w, "An error has occurred during metrics collection:\n\n"+err.Error(), http.StatusInternalServerError) httpError(rsp, err)
return return
} }
contentType := expfmt.Negotiate(req.Header) contentType := expfmt.Negotiate(req.Header)
buf := getBuf() header := rsp.Header()
defer giveBuf(buf)
writer, encoding := decorateWriter(req, buf)
enc := expfmt.NewEncoder(writer, contentType)
var lastErr error
for _, mf := range mfs {
if err := enc.Encode(mf); err != nil {
lastErr = err
http.Error(w, "An error has occurred during metrics encoding:\n\n"+err.Error(), http.StatusInternalServerError)
return
}
}
if closer, ok := writer.(io.Closer); ok {
closer.Close()
}
if lastErr != nil && buf.Len() == 0 {
http.Error(w, "No metrics encoded, last error:\n\n"+lastErr.Error(), http.StatusInternalServerError)
return
}
header := w.Header()
header.Set(contentTypeHeader, string(contentType)) header.Set(contentTypeHeader, string(contentType))
header.Set(contentLengthHeader, fmt.Sprint(buf.Len()))
if encoding != "" { w := io.Writer(rsp)
header.Set(contentEncodingHeader, encoding) if gzipAccepted(req.Header) {
} header.Set(contentEncodingHeader, "gzip")
w.Write(buf.Bytes()) gz := gzipPool.Get().(*gzip.Writer)
}) defer gzipPool.Put(gz)
gz.Reset(w)
defer gz.Close()
w = gz
} }
// decorateWriter wraps a writer to handle gzip compression if requested. It enc := expfmt.NewEncoder(w, contentType)
// returns the decorated writer and the appropriate "Content-Encoding" header
// (which is empty if no compression is enabled). for _, mf := range mfs {
func decorateWriter(request *http.Request, writer io.Writer) (io.Writer, string) { if err := enc.Encode(mf); err != nil {
header := request.Header.Get(acceptEncodingHeader) httpError(rsp, err)
parts := strings.Split(header, ",") return
for _, part := range parts {
part = strings.TrimSpace(part)
if part == "gzip" || strings.HasPrefix(part, "gzip;") {
return gzip.NewWriter(writer), "gzip"
} }
} }
return writer, "" })
} }
var instLabels = []string{"method", "code"} var instLabels = []string{"method", "code"}
@ -503,3 +474,31 @@ func sanitizeCode(s int) string {
return strconv.Itoa(s) return strconv.Itoa(s)
} }
} }
// gzipAccepted returns whether the client will accept gzip-encoded content.
func gzipAccepted(header http.Header) bool {
a := header.Get(acceptEncodingHeader)
parts := strings.Split(a, ",")
for _, part := range parts {
part = strings.TrimSpace(part)
if part == "gzip" || strings.HasPrefix(part, "gzip;") {
return true
}
}
return false
}
// httpError removes any content-encoding header and then calls http.Error with
// the provided error and http.StatusInternalServerErrer. Error contents is
// supposed to be uncompressed plain text. However, same as with a plain
// http.Error, any header settings will be void if the header has already been
// sent. The error message will still be written to the writer, but it will
// probably be of limited use.
func httpError(rsp http.ResponseWriter, err error) {
rsp.Header().Del(contentEncodingHeader)
http.Error(
rsp,
"An error has occurred while serving metrics:\n\n"+err.Error(),
http.StatusInternalServerError,
)
}

View File

@ -32,7 +32,6 @@
package promhttp package promhttp
import ( import (
"bytes"
"compress/gzip" "compress/gzip"
"fmt" "fmt"
"io" "io"
@ -53,19 +52,10 @@ const (
acceptEncodingHeader = "Accept-Encoding" acceptEncodingHeader = "Accept-Encoding"
) )
var bufPool sync.Pool var gzipPool = sync.Pool{
New: func() interface{} {
func getBuf() *bytes.Buffer { return gzip.NewWriter(nil)
buf := bufPool.Get() },
if buf == nil {
return &bytes.Buffer{}
}
return buf.(*bytes.Buffer)
}
func giveBuf(buf *bytes.Buffer) {
buf.Reset()
bufPool.Put(buf)
} }
// Handler returns an http.Handler for the prometheus.DefaultGatherer, using // Handler returns an http.Handler for the prometheus.DefaultGatherer, using
@ -100,19 +90,18 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
inFlightSem = make(chan struct{}, opts.MaxRequestsInFlight) inFlightSem = make(chan struct{}, opts.MaxRequestsInFlight)
} }
h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
if inFlightSem != nil { if inFlightSem != nil {
select { select {
case inFlightSem <- struct{}{}: // All good, carry on. case inFlightSem <- struct{}{}: // All good, carry on.
defer func() { <-inFlightSem }() defer func() { <-inFlightSem }()
default: default:
http.Error(w, fmt.Sprintf( http.Error(rsp, fmt.Sprintf(
"Limit of concurrent requests reached (%d), try again later.", opts.MaxRequestsInFlight, "Limit of concurrent requests reached (%d), try again later.", opts.MaxRequestsInFlight,
), http.StatusServiceUnavailable) ), http.StatusServiceUnavailable)
return return
} }
} }
mfs, err := reg.Gather() mfs, err := reg.Gather()
if err != nil { if err != nil {
if opts.ErrorLog != nil { if opts.ErrorLog != nil {
@ -123,26 +112,40 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
panic(err) panic(err)
case ContinueOnError: case ContinueOnError:
if len(mfs) == 0 { if len(mfs) == 0 {
http.Error(w, "No metrics gathered, last error:\n\n"+err.Error(), http.StatusInternalServerError) // Still report the error if no metrics have been gathered.
httpError(rsp, err)
return return
} }
case HTTPErrorOnError: case HTTPErrorOnError:
http.Error(w, "An error has occurred during metrics gathering:\n\n"+err.Error(), http.StatusInternalServerError) httpError(rsp, err)
return return
} }
} }
contentType := expfmt.Negotiate(req.Header) contentType := expfmt.Negotiate(req.Header)
buf := getBuf() header := rsp.Header()
defer giveBuf(buf) header.Set(contentTypeHeader, string(contentType))
writer, encoding := decorateWriter(req, buf, opts.DisableCompression)
enc := expfmt.NewEncoder(writer, contentType) w := io.Writer(rsp)
if !opts.DisableCompression && gzipAccepted(req.Header) {
header.Set(contentEncodingHeader, "gzip")
gz := gzipPool.Get().(*gzip.Writer)
defer gzipPool.Put(gz)
gz.Reset(w)
defer gz.Close()
w = gz
}
enc := expfmt.NewEncoder(w, contentType)
var lastErr error var lastErr error
for _, mf := range mfs { for _, mf := range mfs {
if err := enc.Encode(mf); err != nil { if err := enc.Encode(mf); err != nil {
lastErr = err lastErr = err
if opts.ErrorLog != nil { if opts.ErrorLog != nil {
opts.ErrorLog.Println("error encoding metric family:", err) opts.ErrorLog.Println("error encoding and sending metric family:", err)
} }
switch opts.ErrorHandling { switch opts.ErrorHandling {
case PanicOnError: case PanicOnError:
@ -150,28 +153,15 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
case ContinueOnError: case ContinueOnError:
// Handled later. // Handled later.
case HTTPErrorOnError: case HTTPErrorOnError:
http.Error(w, "An error has occurred during metrics encoding:\n\n"+err.Error(), http.StatusInternalServerError) httpError(rsp, err)
return return
} }
} }
} }
if closer, ok := writer.(io.Closer); ok {
closer.Close() if lastErr != nil {
httpError(rsp, lastErr)
} }
if lastErr != nil && buf.Len() == 0 {
http.Error(w, "No metrics encoded, last error:\n\n"+lastErr.Error(), http.StatusInternalServerError)
return
}
header := w.Header()
header.Set(contentTypeHeader, string(contentType))
header.Set(contentLengthHeader, fmt.Sprint(buf.Len()))
if encoding != "" {
header.Set(contentEncodingHeader, encoding)
}
if _, err := w.Write(buf.Bytes()); err != nil && opts.ErrorLog != nil {
opts.ErrorLog.Println("error while sending encoded metrics:", err)
}
// TODO(beorn7): Consider streaming serving of metrics.
}) })
if opts.Timeout <= 0 { if opts.Timeout <= 0 {
@ -292,20 +282,30 @@ type HandlerOpts struct {
Timeout time.Duration Timeout time.Duration
} }
// decorateWriter wraps a writer to handle gzip compression if requested. It // gzipAccepted returns whether the client will accept gzip-encoded content.
// returns the decorated writer and the appropriate "Content-Encoding" header func gzipAccepted(header http.Header) bool {
// (which is empty if no compression is enabled). a := header.Get(acceptEncodingHeader)
func decorateWriter(request *http.Request, writer io.Writer, compressionDisabled bool) (io.Writer, string) { parts := strings.Split(a, ",")
if compressionDisabled {
return writer, ""
}
header := request.Header.Get(acceptEncodingHeader)
parts := strings.Split(header, ",")
for _, part := range parts { for _, part := range parts {
part = strings.TrimSpace(part) part = strings.TrimSpace(part)
if part == "gzip" || strings.HasPrefix(part, "gzip;") { if part == "gzip" || strings.HasPrefix(part, "gzip;") {
return gzip.NewWriter(writer), "gzip" return true
} }
} }
return writer, "" return false
}
// httpError removes any content-encoding header and then calls http.Error with
// the provided error and http.StatusInternalServerErrer. Error contents is
// supposed to be uncompressed plain text. However, same as with a plain
// http.Error, any header settings will be void if the header has already been
// sent. The error message will still be written to the writer, but it will
// probably be of limited use.
func httpError(rsp http.ResponseWriter, err error) {
rsp.Header().Del(contentEncodingHeader)
http.Error(
rsp,
"An error has occurred while serving metrics:\n\n"+err.Error(),
http.StatusInternalServerError,
)
} }

View File

@ -103,7 +103,7 @@ func TestHandlerErrorHandling(t *testing.T) {
}) })
wantMsg := `error gathering metrics: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error wantMsg := `error gathering metrics: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
` `
wantErrorBody := `An error has occurred during metrics gathering: wantErrorBody := `An error has occurred while serving metrics:
error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
` `

View File

@ -250,7 +250,7 @@ metric: <
}, },
} }
expectedMetricFamilyInvalidLabelValueAsText := []byte(`An error has occurred during metrics gathering: expectedMetricFamilyInvalidLabelValueAsText := []byte(`An error has occurred while serving metrics:
collected metric "name" { label:<name:"constname" value:"\377" > label:<name:"labelname" value:"different_val" > counter:<value:42 > } has a label named "constname" whose value is not utf8: "\xff" collected metric "name" { label:<name:"constname" value:"\377" > label:<name:"labelname" value:"different_val" > counter:<value:42 > } has a label named "constname" whose value is not utf8: "\xff"
`) `)
@ -299,15 +299,15 @@ complex_bucket 1
}, },
}, },
} }
bucketCollisionMsg := []byte(`An error has occurred during metrics gathering: bucketCollisionMsg := []byte(`An error has occurred while serving metrics:
collected metric named "complex_bucket" collides with previously collected histogram named "complex" collected metric named "complex_bucket" collides with previously collected histogram named "complex"
`) `)
summaryCountCollisionMsg := []byte(`An error has occurred during metrics gathering: summaryCountCollisionMsg := []byte(`An error has occurred while serving metrics:
collected metric named "complex_count" collides with previously collected summary named "complex" collected metric named "complex_count" collides with previously collected summary named "complex"
`) `)
histogramCountCollisionMsg := []byte(`An error has occurred during metrics gathering: histogramCountCollisionMsg := []byte(`An error has occurred while serving metrics:
collected metric named "complex_count" collides with previously collected histogram named "complex" collected metric named "complex_count" collides with previously collected histogram named "complex"
`) `)
@ -333,7 +333,7 @@ collected metric named "complex_count" collides with previously collected histog
}, },
}, },
} }
duplicateLabelMsg := []byte(`An error has occurred during metrics gathering: duplicateLabelMsg := []byte(`An error has occurred while serving metrics:
collected metric "broken_metric" { label:<name:"foo" value:"bar" > label:<name:"foo" value:"baz" > counter:<value:2.7 > } has two or more labels with the same name: foo collected metric "broken_metric" { label:<name:"foo" value:"bar" > label:<name:"foo" value:"baz" > counter:<value:2.7 > } has two or more labels with the same name: foo
`) `)