From c551c3c661432c89180e0dfed6579cf215de8dea Mon Sep 17 00:00:00 2001 From: beorn7 Date: Fri, 9 Feb 2018 17:05:10 +0100 Subject: [PATCH] promhttp: Introduce limit for connections in flight and timeout --- prometheus/promhttp/http.go | 48 ++++++++++++++++++- prometheus/promhttp/http_test.go | 79 ++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+), 2 deletions(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index cdca3e3..714a7f9 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -39,6 +39,7 @@ import ( "net/http" "strings" "sync" + "time" "github.com/prometheus/common/expfmt" @@ -94,7 +95,24 @@ func Handler() http.Handler { // instrumentation. Use the InstrumentMetricHandler function to apply the same // kind of instrumentation as it is used by the Handler function. func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + var inFlightSem chan struct{} + if opts.MaxRequestsInFlight > 0 { + inFlightSem = make(chan struct{}, opts.MaxRequestsInFlight) + } + + h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if inFlightSem != nil { + select { + case inFlightSem <- struct{}{}: // All good, carry on. + defer func() { <-inFlightSem }() + default: + http.Error(w, fmt.Sprintf( + "Limit of concurrent requests reached (%d), try again later.", opts.MaxRequestsInFlight, + ), http.StatusServiceUnavailable) + return + } + } + mfs, err := reg.Gather() if err != nil { if opts.ErrorLog != nil { @@ -150,9 +168,19 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { if encoding != "" { header.Set(contentEncodingHeader, encoding) } - w.Write(buf.Bytes()) + if _, err := w.Write(buf.Bytes()); err != nil && opts.ErrorLog != nil { + opts.ErrorLog.Println("error while sending encoded metrics:", err) + } // TODO(beorn7): Consider streaming serving of metrics. }) + + if opts.Timeout <= 0 { + return h + } + return http.TimeoutHandler(h, opts.Timeout, fmt.Sprintf( + "Exceeded configured timeout of %v.\n", + opts.Timeout, + )) } // InstrumentMetricHandler is usually used with an http.Handler returned by the @@ -182,6 +210,7 @@ func InstrumentMetricHandler(reg prometheus.Registerer, handler http.Handler) ht // Initialize the most likely HTTP status codes. cnt.WithLabelValues("200") cnt.WithLabelValues("500") + cnt.WithLabelValues("503") if err := reg.Register(cnt); err != nil { if are, ok := err.(prometheus.AlreadyRegisteredError); ok { cnt = are.ExistingCollector.(*prometheus.CounterVec) @@ -246,6 +275,21 @@ type HandlerOpts struct { // If DisableCompression is true, the handler will never compress the // response, even if requested by the client. DisableCompression bool + // The number of concurrent HTTP requests is limited to + // MaxRequestsInFlight. Additional requests are responded to with 503 + // Service Unavailable and a suitable message in the body. If + // MaxRequestsInFlight is 0 or negative, no limit is applied. + MaxRequestsInFlight int + // If handling a request takes longer than Timeout, it is repsonded to + // with 503 ServiceUnavailable and a suitable Message. No timeout is + // applied if Timeout is 0 or negative. Note that with the current + // implementation, reaching the timeout simply ends the HTTP requests as + // described above (and even that only if sending of the body hasn't + // started yet), while the bulk work of gathering all the metrics keeps + // running in the background (with the eventual result to be thrown + // away). Until the implementation is improved, it is recommended to + // implement a separate timeout in potentially slow Collectors. + Timeout time.Duration } // decorateWriter wraps a writer to handle gzip compression if requested. It diff --git a/prometheus/promhttp/http_test.go b/prometheus/promhttp/http_test.go index a47ca7b..aeaa0b4 100644 --- a/prometheus/promhttp/http_test.go +++ b/prometheus/promhttp/http_test.go @@ -21,6 +21,7 @@ import ( "net/http/httptest" "strings" "testing" + "time" "github.com/prometheus/client_golang/prometheus" ) @@ -38,6 +39,23 @@ func (e errorCollector) Collect(ch chan<- prometheus.Metric) { ) } +type blockingCollector struct { + CollectStarted, Block chan struct{} +} + +func (b blockingCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- prometheus.NewDesc("dummy_desc", "not helpful", nil, nil) +} + +func (b blockingCollector) Collect(ch chan<- prometheus.Metric) { + select { + case b.CollectStarted <- struct{}{}: + default: + } + // Collects nothing, just waits for a channel receive. + <-b.Block +} + func TestHandlerErrorHandling(t *testing.T) { // Create a registry that collects a MetricFamily with two elements, @@ -169,3 +187,64 @@ func TestInstrumentMetricHandler(t *testing.T) { t.Errorf("got body %q, does not contain %q", got, want) } } + +func TestHandlerMaxRequestsInFlight(t *testing.T) { + reg := prometheus.NewRegistry() + handler := HandlerFor(reg, HandlerOpts{MaxRequestsInFlight: 1}) + w1 := httptest.NewRecorder() + w2 := httptest.NewRecorder() + w3 := httptest.NewRecorder() + request, _ := http.NewRequest("GET", "/", nil) + request.Header.Add("Accept", "test/plain") + + c := blockingCollector{Block: make(chan struct{}), CollectStarted: make(chan struct{}, 1)} + reg.MustRegister(c) + + rq1Done := make(chan struct{}) + go func() { + handler.ServeHTTP(w1, request) + close(rq1Done) + }() + <-c.CollectStarted + + handler.ServeHTTP(w2, request) + + if got, want := w2.Code, http.StatusServiceUnavailable; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } + if got, want := w2.Body.String(), "Limit of concurrent requests reached (1), try again later.\n"; got != want { + t.Errorf("got body %q, want %q", got, want) + } + + close(c.Block) + <-rq1Done + + handler.ServeHTTP(w3, request) + + if got, want := w3.Code, http.StatusOK; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } +} + +func TestHandlerTimeout(t *testing.T) { + reg := prometheus.NewRegistry() + handler := HandlerFor(reg, HandlerOpts{Timeout: time.Millisecond}) + w := httptest.NewRecorder() + + request, _ := http.NewRequest("GET", "/", nil) + request.Header.Add("Accept", "test/plain") + + c := blockingCollector{Block: make(chan struct{}), CollectStarted: make(chan struct{}, 1)} + reg.MustRegister(c) + + handler.ServeHTTP(w, request) + + if got, want := w.Code, http.StatusServiceUnavailable; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } + if got, want := w.Body.String(), "Exceeded configured timeout of 1ms.\n"; got != want { + t.Errorf("got body %q, want %q", got, want) + } + + close(c.Block) // To not leak a goroutine. +}