Merge pull request #379 from prometheus/beorn7/http

promhttp: Introduce limit for connections in flight and timeout
This commit is contained in:
Björn Rabenstein 2018-02-10 15:02:05 +01:00 committed by GitHub
commit a40133b69f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 125 additions and 2 deletions

View File

@ -39,6 +39,7 @@ import (
"net/http" "net/http"
"strings" "strings"
"sync" "sync"
"time"
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
@ -94,7 +95,24 @@ func Handler() http.Handler {
// instrumentation. Use the InstrumentMetricHandler function to apply the same // instrumentation. Use the InstrumentMetricHandler function to apply the same
// kind of instrumentation as it is used by the Handler function. // kind of instrumentation as it is used by the Handler function.
func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { var inFlightSem chan struct{}
if opts.MaxRequestsInFlight > 0 {
inFlightSem = make(chan struct{}, opts.MaxRequestsInFlight)
}
h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if inFlightSem != nil {
select {
case inFlightSem <- struct{}{}: // All good, carry on.
defer func() { <-inFlightSem }()
default:
http.Error(w, fmt.Sprintf(
"Limit of concurrent requests reached (%d), try again later.", opts.MaxRequestsInFlight,
), http.StatusServiceUnavailable)
return
}
}
mfs, err := reg.Gather() mfs, err := reg.Gather()
if err != nil { if err != nil {
if opts.ErrorLog != nil { if opts.ErrorLog != nil {
@ -150,9 +168,19 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
if encoding != "" { if encoding != "" {
header.Set(contentEncodingHeader, encoding) header.Set(contentEncodingHeader, encoding)
} }
w.Write(buf.Bytes()) if _, err := w.Write(buf.Bytes()); err != nil && opts.ErrorLog != nil {
opts.ErrorLog.Println("error while sending encoded metrics:", err)
}
// TODO(beorn7): Consider streaming serving of metrics. // TODO(beorn7): Consider streaming serving of metrics.
}) })
if opts.Timeout <= 0 {
return h
}
return http.TimeoutHandler(h, opts.Timeout, fmt.Sprintf(
"Exceeded configured timeout of %v.\n",
opts.Timeout,
))
} }
// InstrumentMetricHandler is usually used with an http.Handler returned by the // InstrumentMetricHandler is usually used with an http.Handler returned by the
@ -182,6 +210,7 @@ func InstrumentMetricHandler(reg prometheus.Registerer, handler http.Handler) ht
// Initialize the most likely HTTP status codes. // Initialize the most likely HTTP status codes.
cnt.WithLabelValues("200") cnt.WithLabelValues("200")
cnt.WithLabelValues("500") cnt.WithLabelValues("500")
cnt.WithLabelValues("503")
if err := reg.Register(cnt); err != nil { if err := reg.Register(cnt); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok { if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
cnt = are.ExistingCollector.(*prometheus.CounterVec) cnt = are.ExistingCollector.(*prometheus.CounterVec)
@ -246,6 +275,21 @@ type HandlerOpts struct {
// If DisableCompression is true, the handler will never compress the // If DisableCompression is true, the handler will never compress the
// response, even if requested by the client. // response, even if requested by the client.
DisableCompression bool DisableCompression bool
// The number of concurrent HTTP requests is limited to
// MaxRequestsInFlight. Additional requests are responded to with 503
// Service Unavailable and a suitable message in the body. If
// MaxRequestsInFlight is 0 or negative, no limit is applied.
MaxRequestsInFlight int
// If handling a request takes longer than Timeout, it is repsonded to
// with 503 ServiceUnavailable and a suitable Message. No timeout is
// applied if Timeout is 0 or negative. Note that with the current
// implementation, reaching the timeout simply ends the HTTP requests as
// described above (and even that only if sending of the body hasn't
// started yet), while the bulk work of gathering all the metrics keeps
// running in the background (with the eventual result to be thrown
// away). Until the implementation is improved, it is recommended to
// implement a separate timeout in potentially slow Collectors.
Timeout time.Duration
} }
// decorateWriter wraps a writer to handle gzip compression if requested. It // decorateWriter wraps a writer to handle gzip compression if requested. It

View File

@ -21,6 +21,7 @@ import (
"net/http/httptest" "net/http/httptest"
"strings" "strings"
"testing" "testing"
"time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@ -38,6 +39,23 @@ func (e errorCollector) Collect(ch chan<- prometheus.Metric) {
) )
} }
type blockingCollector struct {
CollectStarted, Block chan struct{}
}
func (b blockingCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- prometheus.NewDesc("dummy_desc", "not helpful", nil, nil)
}
func (b blockingCollector) Collect(ch chan<- prometheus.Metric) {
select {
case b.CollectStarted <- struct{}{}:
default:
}
// Collects nothing, just waits for a channel receive.
<-b.Block
}
func TestHandlerErrorHandling(t *testing.T) { func TestHandlerErrorHandling(t *testing.T) {
// Create a registry that collects a MetricFamily with two elements, // Create a registry that collects a MetricFamily with two elements,
@ -169,3 +187,64 @@ func TestInstrumentMetricHandler(t *testing.T) {
t.Errorf("got body %q, does not contain %q", got, want) t.Errorf("got body %q, does not contain %q", got, want)
} }
} }
func TestHandlerMaxRequestsInFlight(t *testing.T) {
reg := prometheus.NewRegistry()
handler := HandlerFor(reg, HandlerOpts{MaxRequestsInFlight: 1})
w1 := httptest.NewRecorder()
w2 := httptest.NewRecorder()
w3 := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/", nil)
request.Header.Add("Accept", "test/plain")
c := blockingCollector{Block: make(chan struct{}), CollectStarted: make(chan struct{}, 1)}
reg.MustRegister(c)
rq1Done := make(chan struct{})
go func() {
handler.ServeHTTP(w1, request)
close(rq1Done)
}()
<-c.CollectStarted
handler.ServeHTTP(w2, request)
if got, want := w2.Code, http.StatusServiceUnavailable; got != want {
t.Errorf("got HTTP status code %d, want %d", got, want)
}
if got, want := w2.Body.String(), "Limit of concurrent requests reached (1), try again later.\n"; got != want {
t.Errorf("got body %q, want %q", got, want)
}
close(c.Block)
<-rq1Done
handler.ServeHTTP(w3, request)
if got, want := w3.Code, http.StatusOK; got != want {
t.Errorf("got HTTP status code %d, want %d", got, want)
}
}
func TestHandlerTimeout(t *testing.T) {
reg := prometheus.NewRegistry()
handler := HandlerFor(reg, HandlerOpts{Timeout: time.Millisecond})
w := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/", nil)
request.Header.Add("Accept", "test/plain")
c := blockingCollector{Block: make(chan struct{}), CollectStarted: make(chan struct{}, 1)}
reg.MustRegister(c)
handler.ServeHTTP(w, request)
if got, want := w.Code, http.StatusServiceUnavailable; got != want {
t.Errorf("got HTTP status code %d, want %d", got, want)
}
if got, want := w.Body.String(), "Exceeded configured timeout of 1ms.\n"; got != want {
t.Errorf("got body %q, want %q", got, want)
}
close(c.Block) // To not leak a goroutine.
}