Use streaming encoding of metrics

Signed-off-by: beorn7 <beorn@soundcloud.com>
This commit is contained in:
beorn7 2018-10-20 01:28:16 +02:00
parent 752f50d366
commit 62361fc0fb
3 changed files with 55 additions and 75 deletions

View File

@ -32,7 +32,6 @@
package promhttp
import (
"bytes"
"compress/gzip"
"fmt"
"io"
@ -59,12 +58,6 @@ var gzipPool = sync.Pool{
},
}
var bufPool = sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
}
// Handler returns an http.Handler for the prometheus.DefaultGatherer, using
// default HandlerOpts, i.e. it reports the first error as an HTTP error, it has
// no error logging, and it applies compression if requested by the client.
@ -97,13 +90,13 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
inFlightSem = make(chan struct{}, opts.MaxRequestsInFlight)
}
h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
if inFlightSem != nil {
select {
case inFlightSem <- struct{}{}: // All good, carry on.
defer func() { <-inFlightSem }()
default:
http.Error(w, fmt.Sprintf(
http.Error(rsp, fmt.Sprintf(
"Limit of concurrent requests reached (%d), try again later.", opts.MaxRequestsInFlight,
), http.StatusServiceUnavailable)
return
@ -119,49 +112,21 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
panic(err)
case ContinueOnError:
if len(mfs) == 0 {
http.Error(w, "No metrics gathered, last error:\n\n"+err.Error(), http.StatusInternalServerError)
// Still report the error if no metrics have been gathered.
httpError(rsp, err)
return
}
case HTTPErrorOnError:
http.Error(w, "An error has occurred during metrics gathering:\n\n"+err.Error(), http.StatusInternalServerError)
httpError(rsp, err)
return
}
}
contentType := expfmt.Negotiate(req.Header)
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
enc := expfmt.NewEncoder(buf, contentType)
defer bufPool.Put(buf)
var lastErr error
for _, mf := range mfs {
if err := enc.Encode(mf); err != nil {
lastErr = err
if opts.ErrorLog != nil {
opts.ErrorLog.Println("error encoding metric family:", err)
}
switch opts.ErrorHandling {
case PanicOnError:
panic(err)
case ContinueOnError:
// Handled later.
case HTTPErrorOnError:
http.Error(w, "An error has occurred during metrics encoding:\n\n"+err.Error(), http.StatusInternalServerError)
return
}
}
}
if lastErr != nil && buf.Len() == 0 {
http.Error(w, "No metrics encoded, last error:\n\n"+lastErr.Error(), http.StatusInternalServerError)
return
}
header := w.Header()
header := rsp.Header()
header.Set(contentTypeHeader, string(contentType))
header.Set(contentLengthHeader, fmt.Sprint(buf.Len()))
w := io.Writer(rsp)
if !opts.DisableCompression && gzipAccepted(req.Header) {
header.Set(contentEncodingHeader, "gzip")
gz := gzipPool.Get().(*gzip.Writer)
@ -170,12 +135,33 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
gz.Reset(w)
defer gz.Close()
zipWriter := gzipResponseWriter{gz, w}
writeResult(zipWriter, buf, opts)
w = gz
}
enc := expfmt.NewEncoder(w, contentType)
var lastErr error
for _, mf := range mfs {
if err := enc.Encode(mf); err != nil {
lastErr = err
if opts.ErrorLog != nil {
opts.ErrorLog.Println("error encoding and sending metric family:", err)
}
switch opts.ErrorHandling {
case PanicOnError:
panic(err)
case ContinueOnError:
// Handled later.
case HTTPErrorOnError:
httpError(rsp, err)
return
}
writeResult(w, buf, opts)
// TODO(beorn7): Consider streaming serving of metrics.
}
}
if lastErr != nil {
httpError(rsp, lastErr)
}
})
if opts.Timeout <= 0 {
@ -296,29 +282,8 @@ type HandlerOpts struct {
Timeout time.Duration
}
// gzipResponseWriter in charge of wrapping io.Writer and http.ReponseWriter
// together, allowing to get a single struct which implements both interface.
type gzipResponseWriter struct {
io.Writer
http.ResponseWriter
}
func (w gzipResponseWriter) Write(b []byte) (int, error) {
return w.Writer.Write(b)
}
// writeResult to buf using http.ResponseWriter.
// If ErrorLog is enabled, err is logged in.
func writeResult(w http.ResponseWriter, buf *bytes.Buffer, opts HandlerOpts) {
if _, err := w.Write(buf.Bytes()); err != nil && opts.ErrorLog != nil {
opts.ErrorLog.Println("error while sending encoded metrics:", err)
}
}
// gzipHandler return a http.HandlerFunc in charge of compressing the content
// of the given http.HandlerFunc
// gzipAccepted returns whether the client will accept gzip-encoded content.
func gzipAccepted(header http.Header) bool {
a := header.Get(acceptEncodingHeader)
parts := strings.Split(a, ",")
for _, part := range parts {
@ -329,3 +294,18 @@ func gzipAccepted(header http.Header) bool {
}
return false
}
// httpError removes any content-encoding header and then calls http.Error with
// the provided error and http.StatusInternalServerErrer. Error contents is
// supposed to be uncompressed plain text. However, same as with a plain
// http.Error, any header settings will be void if the header has already been
// sent. The error message will still be written to the writer, but it will
// probably be of limited use.
func httpError(rsp http.ResponseWriter, err error) {
rsp.Header().Del(contentEncodingHeader)
http.Error(
rsp,
"An error has occurred while serving metrics:\n\n"+err.Error(),
http.StatusInternalServerError,
)
}

View File

@ -103,7 +103,7 @@ func TestHandlerErrorHandling(t *testing.T) {
})
wantMsg := `error gathering metrics: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
`
wantErrorBody := `An error has occurred during metrics gathering:
wantErrorBody := `An error has occurred while serving metrics:
error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
`

View File

@ -250,7 +250,7 @@ metric: <
},
}
expectedMetricFamilyInvalidLabelValueAsText := []byte(`An error has occurred during metrics gathering:
expectedMetricFamilyInvalidLabelValueAsText := []byte(`An error has occurred while serving metrics:
collected metric "name" { label:<name:"constname" value:"\377" > label:<name:"labelname" value:"different_val" > counter:<value:42 > } has a label named "constname" whose value is not utf8: "\xff"
`)
@ -299,15 +299,15 @@ complex_bucket 1
},
},
}
bucketCollisionMsg := []byte(`An error has occurred during metrics gathering:
bucketCollisionMsg := []byte(`An error has occurred while serving metrics:
collected metric named "complex_bucket" collides with previously collected histogram named "complex"
`)
summaryCountCollisionMsg := []byte(`An error has occurred during metrics gathering:
summaryCountCollisionMsg := []byte(`An error has occurred while serving metrics:
collected metric named "complex_count" collides with previously collected summary named "complex"
`)
histogramCountCollisionMsg := []byte(`An error has occurred during metrics gathering:
histogramCountCollisionMsg := []byte(`An error has occurred while serving metrics:
collected metric named "complex_count" collides with previously collected histogram named "complex"
`)
@ -333,7 +333,7 @@ collected metric named "complex_count" collides with previously collected histog
},
},
}
duplicateLabelMsg := []byte(`An error has occurred during metrics gathering:
duplicateLabelMsg := []byte(`An error has occurred while serving metrics:
collected metric "broken_metric" { label:<name:"foo" value:"bar" > label:<name:"foo" value:"baz" > counter:<value:2.7 > } has two or more labels with the same name: foo
`)