// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright (c) 2013, The Prometheus Authors
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be found
// in the LICENSE file.

package prometheus
2012-05-20 01:59:25 +04:00
import (
2014-05-07 22:08:33 +04:00
"bytes"
2014-12-11 04:05:35 +03:00
"compress/gzip"
2014-05-07 22:08:33 +04:00
"errors"
2013-01-19 17:48:30 +04:00
"fmt"
"io"
2012-05-20 01:59:25 +04:00
"net/http"
2014-07-01 22:20:42 +04:00
"net/url"
2014-12-11 04:05:35 +03:00
"os"
2013-01-19 17:48:30 +04:00
"sort"
2014-07-04 19:08:57 +04:00
"strings"
2012-05-20 01:59:25 +04:00
"sync"
2013-06-27 20:46:16 +04:00
2015-02-27 18:12:59 +03:00
"github.com/golang/protobuf/proto"
2015-09-17 14:06:43 +03:00
"github.com/prometheus/common/expfmt"
2015-02-27 18:12:59 +03:00
dto "github.com/prometheus/client_model/go"
2012-05-20 01:59:25 +04:00
)
2014-05-07 22:08:33 +04:00
var (
2014-12-11 04:05:35 +03:00
defRegistry = newDefaultRegistry ( )
2014-05-07 22:08:33 +04:00
errAlreadyReg = errors . New ( "duplicate metrics collector registration attempted" )
)
// Constants relevant to the HTTP interface.
const (
	// APIVersion is the version of the format of the exported data. This
	// will match this library's version, which subscribes to the Semantic
	// Versioning scheme.
	APIVersion = "0.0.4"

	// DelimitedTelemetryContentType is the content type set on telemetry
	// data responses in delimited protobuf format.
	//
	// None of the content-type values below may carry leading or trailing
	// whitespace: they are written verbatim into HTTP headers.
	DelimitedTelemetryContentType = `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`
	// TextTelemetryContentType is the content type set on telemetry data
	// responses in text format. The version parameter must directly follow
	// the "=" (no space), otherwise the parameter value is malformed.
	TextTelemetryContentType = `text/plain; version=` + APIVersion
	// ProtoTextTelemetryContentType is the content type set on telemetry
	// data responses in protobuf text format. (Only used for debugging.)
	ProtoTextTelemetryContentType = `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=text`
	// ProtoCompactTextTelemetryContentType is the content type set on
	// telemetry data responses in protobuf compact text format. (Only used
	// for debugging.)
	ProtoCompactTextTelemetryContentType = `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=compact-text`

	// Constants for object pools.
	numBufs           = 4
	numMetricFamilies = 1000
	numMetrics        = 10000

	// Capacity for the channel to collect metrics and descriptors.
	capMetricChan = 1000
	capDescChan   = 10

	// Names of the HTTP headers read or written by this package.
	contentTypeHeader     = "Content-Type"
	contentLengthHeader   = "Content-Length"
	contentEncodingHeader = "Content-Encoding"
	acceptEncodingHeader  = "Accept-Encoding"
	acceptHeader          = "Accept"
)
2012-05-20 01:59:25 +04:00
2014-05-07 22:08:33 +04:00
// Handler returns the HTTP handler for the global Prometheus registry. It is
// already instrumented with InstrumentHandler (using "prometheus" as handler
// name). Usually the handler is used to handle the "/metrics" endpoint.
2016-05-15 18:59:51 +03:00
//
// Please note the issues described in the doc comment of InstrumentHandler. You
// might want to consider using UninstrumentedHandler instead.
2014-05-07 22:08:33 +04:00
func Handler ( ) http . Handler {
return InstrumentHandler ( "prometheus" , defRegistry )
}
2014-04-23 15:40:37 +04:00
2014-05-07 22:08:33 +04:00
// UninstrumentedHandler works in the same way as Handler, but the returned HTTP
// handler is not instrumented. This is useful if no instrumentation is desired
// (for whatever reason) or if the instrumentation has to happen with a
// different handler name (or with a different instrumentation approach
// altogether). See the InstrumentHandler example.
func UninstrumentedHandler ( ) http . Handler {
return defRegistry
2012-05-20 01:59:25 +04:00
}
2014-05-07 22:08:33 +04:00
// Register registers a new Collector to be included in metrics collection. It
// returns an error if the descriptors provided by the Collector are invalid or
// if they - in combination with descriptors of already registered Collectors -
// do not fulfill the consistency and uniqueness criteria described in the Desc
Allow error reporting during metrics collection and simplify Register().
Both are interface changes I want to get in before public
announcement. They only break rare usage cases, and are always easy to
fix, but still we want to avoid breaking changes after a wider
announcement of the project.
The change of Register() simply removes the return of the Collector,
which nobody was using in practice. It was just bloating the call
syntax. Note that this is different from RegisterOrGet(), which is
used at various occasions where you want to register something that
might or might not be registered already, but if it is, you want the
previously registered Collector back (because that's the relevant
one).
WRT error reporting: I first tried the obvious way of letting the
Collector methods Describe() and Collect() return error. However, I
had to conclude that that bloated _many_ calls and their handling in
very obnoxious ways. On the other hand, the case where you actually
want to report errors during registration or collection is very
rare. Hence, this approach has the wrong trade-off. The approach taken
here might at first appear clunky but is in practice quite handy,
mostly because there is almost no change for the "normal" case of "no
special error handling", but also because it plays well with the way
descriptors and metrics are handled (via channels).
Explaining the approach in more detail:
- During registration / describe: Error handling was actually already
in place (for invalid descriptors, which carry an error anyway). I
only added a convenience function to create an invalid descriptor
with a given error on purpose.
- Metrics are now treated in a similar way. The Write method returns
an error now (the only change in interface). An "invalid metric" is
provided that can be sent via the channel to signal that that metric
could not be collected. It alse transports an error.
NON-GOALS OF THIS COMMIT:
This is NOT yet the major improvement of the whole registry part,
where we want a public Registry interface and plenty of modular
configurations (for error handling, various auto-metrics, http
instrumentation, testing, ...). However, we can do that whole thing
without breaking existing interfaces. For now (which is a significant
issue) any error during collection will either cause a 500 HTTP
response or a panic (depending on registry config). Later, we
definitely want to have a possibility to skip (and only report
somehow) non-collectible metrics instead of aborting the whole scrape.
2015-01-12 21:16:09 +03:00
// documentation.
2014-05-07 22:08:33 +04:00
//
// Do not register the same Collector multiple times concurrently. (Registering
// the same Collector twice would result in an error anyway, but on top of that,
// it is not safe to do so concurrently.)
Allow error reporting during metrics collection and simplify Register().
Both are interface changes I want to get in before public
announcement. They only break rare usage cases, and are always easy to
fix, but still we want to avoid breaking changes after a wider
announcement of the project.
The change of Register() simply removes the return of the Collector,
which nobody was using in practice. It was just bloating the call
syntax. Note that this is different from RegisterOrGet(), which is
used at various occasions where you want to register something that
might or might not be registered already, but if it is, you want the
previously registered Collector back (because that's the relevant
one).
WRT error reporting: I first tried the obvious way of letting the
Collector methods Describe() and Collect() return error. However, I
had to conclude that that bloated _many_ calls and their handling in
very obnoxious ways. On the other hand, the case where you actually
want to report errors during registration or collection is very
rare. Hence, this approach has the wrong trade-off. The approach taken
here might at first appear clunky but is in practice quite handy,
mostly because there is almost no change for the "normal" case of "no
special error handling", but also because it plays well with the way
descriptors and metrics are handled (via channels).
Explaining the approach in more detail:
- During registration / describe: Error handling was actually already
in place (for invalid descriptors, which carry an error anyway). I
only added a convenience function to create an invalid descriptor
with a given error on purpose.
- Metrics are now treated in a similar way. The Write method returns
an error now (the only change in interface). An "invalid metric" is
provided that can be sent via the channel to signal that that metric
could not be collected. It alse transports an error.
NON-GOALS OF THIS COMMIT:
This is NOT yet the major improvement of the whole registry part,
where we want a public Registry interface and plenty of modular
configurations (for error handling, various auto-metrics, http
instrumentation, testing, ...). However, we can do that whole thing
without breaking existing interfaces. For now (which is a significant
issue) any error during collection will either cause a 500 HTTP
response or a panic (depending on registry config). Later, we
definitely want to have a possibility to skip (and only report
somehow) non-collectible metrics instead of aborting the whole scrape.
2015-01-12 21:16:09 +03:00
func Register ( m Collector ) error {
_ , err := defRegistry . Register ( m )
return err
2014-05-07 22:08:33 +04:00
}
2013-06-27 20:46:16 +04:00
2014-05-07 22:08:33 +04:00
// MustRegister works like Register but panics where Register would have
// returned an error.
Allow error reporting during metrics collection and simplify Register().
Both are interface changes I want to get in before public
announcement. They only break rare usage cases, and are always easy to
fix, but still we want to avoid breaking changes after a wider
announcement of the project.
The change of Register() simply removes the return of the Collector,
which nobody was using in practice. It was just bloating the call
syntax. Note that this is different from RegisterOrGet(), which is
used at various occasions where you want to register something that
might or might not be registered already, but if it is, you want the
previously registered Collector back (because that's the relevant
one).
WRT error reporting: I first tried the obvious way of letting the
Collector methods Describe() and Collect() return error. However, I
had to conclude that that bloated _many_ calls and their handling in
very obnoxious ways. On the other hand, the case where you actually
want to report errors during registration or collection is very
rare. Hence, this approach has the wrong trade-off. The approach taken
here might at first appear clunky but is in practice quite handy,
mostly because there is almost no change for the "normal" case of "no
special error handling", but also because it plays well with the way
descriptors and metrics are handled (via channels).
Explaining the approach in more detail:
- During registration / describe: Error handling was actually already
in place (for invalid descriptors, which carry an error anyway). I
only added a convenience function to create an invalid descriptor
with a given error on purpose.
- Metrics are now treated in a similar way. The Write method returns
an error now (the only change in interface). An "invalid metric" is
provided that can be sent via the channel to signal that that metric
could not be collected. It alse transports an error.
NON-GOALS OF THIS COMMIT:
This is NOT yet the major improvement of the whole registry part,
where we want a public Registry interface and plenty of modular
configurations (for error handling, various auto-metrics, http
instrumentation, testing, ...). However, we can do that whole thing
without breaking existing interfaces. For now (which is a significant
issue) any error during collection will either cause a 500 HTTP
response or a panic (depending on registry config). Later, we
definitely want to have a possibility to skip (and only report
somehow) non-collectible metrics instead of aborting the whole scrape.
2015-01-12 21:16:09 +03:00
func MustRegister ( m Collector ) {
err := Register ( m )
2014-05-07 22:08:33 +04:00
if err != nil {
panic ( err )
}
2013-06-27 20:46:16 +04:00
}
2014-05-07 22:08:33 +04:00
// RegisterOrGet works like Register but does not return an error if a Collector
// is registered that equals a previously registered Collector. (Two Collectors
// are considered equal if their Describe method yields the same set of
// descriptors.) Instead, the previously registered Collector is returned (which
// is helpful if the new and previously registered Collectors are equal but not
// identical, i.e. not pointers to the same object).
//
// As for Register, it is still not safe to call RegisterOrGet with the same
// Collector multiple times concurrently.
func RegisterOrGet ( m Collector ) ( Collector , error ) {
return defRegistry . RegisterOrGet ( m )
2013-06-27 20:46:16 +04:00
}
2014-05-07 22:08:33 +04:00
// MustRegisterOrGet works like Register but panics where RegisterOrGet would
// have returned an error.
func MustRegisterOrGet ( m Collector ) Collector {
existing , err := RegisterOrGet ( m )
if err != nil {
panic ( err )
}
return existing
2013-06-27 20:46:16 +04:00
}
2014-05-07 22:08:33 +04:00
// Unregister unregisters the Collector that equals the Collector passed in as
// an argument. (Two Collectors are considered equal if their Describe method
// yields the same set of descriptors.) The function returns whether a Collector
// was unregistered.
func Unregister ( c Collector ) bool {
return defRegistry . Unregister ( c )
2013-02-18 08:09:21 +04:00
}
2014-05-07 22:08:33 +04:00
// SetMetricFamilyInjectionHook sets a function that is called whenever metrics
// are collected. The hook function must be set before metrics collection begins
// (i.e. call SetMetricFamilyInjectionHook before setting the HTTP handler.) The
2015-03-15 17:47:56 +03:00
// MetricFamily protobufs returned by the hook function are merged with the
// metrics collected in the usual way.
2013-02-12 05:36:06 +04:00
//
2014-05-07 22:08:33 +04:00
// This is a way to directly inject MetricFamily protobufs managed and owned by
2015-03-15 17:47:56 +03:00
// the caller. The caller has full responsibility. As no registration of the
// injected metrics has happened, there is no descriptor to check against, and
// there are no registration-time checks. If collect-time checks are disabled
// (see function EnableCollectChecks), no sanity checks are performed on the
// returned protobufs at all. If collect-checks are enabled, type and uniqueness
// checks are performed, but no further consistency checks (which would require
// knowledge of a metric descriptor).
//
2015-06-08 23:12:07 +03:00
// Sorting concerns: The caller is responsible for sorting the label pairs in
// each metric. However, the order of metrics will be sorted by the registry as
// it is required anyway after merging with the metric families collected
// conventionally.
//
2015-03-15 17:47:56 +03:00
// The function must be callable at any time and concurrently.
2014-05-07 22:08:33 +04:00
func SetMetricFamilyInjectionHook ( hook func ( ) [ ] * dto . MetricFamily ) {
defRegistry . metricFamilyInjectionHook = hook
2012-05-20 01:59:25 +04:00
}
2014-05-07 22:08:33 +04:00
// PanicOnCollectError sets the behavior whether a panic is caused upon an error
2015-02-19 17:34:04 +03:00
// while metrics are collected and served to the HTTP endpoint. By default, an
2014-05-07 22:08:33 +04:00
// internal server error (status code 500) is served with an error message.
func PanicOnCollectError ( b bool ) {
defRegistry . panicOnCollectError = b
2012-05-20 01:59:25 +04:00
}
2014-05-07 22:08:33 +04:00
// EnableCollectChecks enables (or disables) additional consistency checks
2014-06-23 13:45:49 +04:00
// during metrics collection. These additional checks are not enabled by default
// because they inflict a performance penalty and the errors they check for can
// only happen if the used Metric and Collector types have internal programming
// errors. It can be helpful to enable these checks while working with custom
// Collectors or Metrics whose correctness is not well established yet.
2014-05-07 22:08:33 +04:00
func EnableCollectChecks ( b bool ) {
defRegistry . collectChecksEnabled = b
2013-01-19 17:48:30 +04:00
}
2014-05-07 22:08:33 +04:00
// encoder is a function that writes a dto.MetricFamily to an io.Writer in a
// certain encoding. It returns the number of bytes written and any error
2015-04-07 16:19:47 +03:00
// encountered. Note that pbutil.WriteDelimited and pbutil.MetricFamilyToText
// are encoders.
2014-05-07 22:08:33 +04:00
type encoder func ( io . Writer , * dto . MetricFamily ) ( int , error )
type registry struct {
mtx sync . RWMutex
collectorsByID map [ uint64 ] Collector // ID is a hash of the descIDs.
descIDs map [ uint64 ] struct { }
dimHashesByName map [ string ] uint64
bufPool chan * bytes . Buffer
metricFamilyPool chan * dto . MetricFamily
metricPool chan * dto . Metric
metricFamilyInjectionHook func ( ) [ ] * dto . MetricFamily
panicOnCollectError , collectChecksEnabled bool
2014-04-02 15:31:22 +04:00
}
2014-05-07 22:08:33 +04:00
func ( r * registry ) Register ( c Collector ) ( Collector , error ) {
descChan := make ( chan * Desc , capDescChan )
go func ( ) {
c . Describe ( descChan )
close ( descChan )
} ( )
newDescIDs := map [ uint64 ] struct { } { }
newDimHashesByName := map [ string ] uint64 { }
2014-06-26 15:58:15 +04:00
var collectorID uint64 // Just a sum of all desc IDs.
2014-05-07 22:08:33 +04:00
var duplicateDescErr error
r . mtx . Lock ( )
defer r . mtx . Unlock ( )
// Coduct various tests...
for desc := range descChan {
// Is the descriptor valid at all?
if desc . err != nil {
return c , fmt . Errorf ( "descriptor %s is invalid: %s" , desc , desc . err )
}
2013-04-19 16:11:01 +04:00
2014-05-07 22:08:33 +04:00
// Is the descID unique?
// (In other words: Is the fqName + constLabel combination unique?)
if _ , exists := r . descIDs [ desc . id ] ; exists {
duplicateDescErr = fmt . Errorf ( "descriptor %s already exists with the same fully-qualified name and const label values" , desc )
}
// If it is not a duplicate desc in this collector, add it to
2014-06-26 15:58:15 +04:00
// the collectorID. (We allow duplicate descs within the same
2014-05-07 22:08:33 +04:00
// collector, but their existence must be a no-op.)
if _ , exists := newDescIDs [ desc . id ] ; ! exists {
newDescIDs [ desc . id ] = struct { } { }
2014-06-26 15:58:15 +04:00
collectorID += desc . id
2014-05-07 22:08:33 +04:00
}
2013-04-19 16:11:01 +04:00
2014-05-07 22:08:33 +04:00
// Are all the label names and the help string consistent with
// previous descriptors of the same name?
// First check existing descriptors...
if dimHash , exists := r . dimHashesByName [ desc . fqName ] ; exists {
if dimHash != desc . dimHash {
return nil , fmt . Errorf ( "a previously registered descriptor with the same fully-qualified name as %s has different label names or a different help string" , desc )
}
} else {
// ...then check the new descriptors already seen.
if dimHash , exists := newDimHashesByName [ desc . fqName ] ; exists {
if dimHash != desc . dimHash {
return nil , fmt . Errorf ( "descriptors reported by collector have inconsistent label names or help strings for the same fully-qualified name, offender is %s" , desc )
}
} else {
newDimHashesByName [ desc . fqName ] = desc . dimHash
}
}
}
// Did anything happen at all?
if len ( newDescIDs ) == 0 {
return nil , errors . New ( "collector has no descriptors" )
}
if existing , exists := r . collectorsByID [ collectorID ] ; exists {
return existing , errAlreadyReg
}
// If the collectorID is new, but at least one of the descs existed
// before, we are in trouble.
if duplicateDescErr != nil {
return nil , duplicateDescErr
}
2013-04-19 16:11:01 +04:00
2014-05-07 22:08:33 +04:00
// Only after all tests have passed, actually register.
r . collectorsByID [ collectorID ] = c
for hash := range newDescIDs {
r . descIDs [ hash ] = struct { } { }
}
for name , dimHash := range newDimHashesByName {
r . dimHashesByName [ name ] = dimHash
}
return c , nil
2013-04-19 16:11:01 +04:00
}
2014-05-07 22:08:33 +04:00
// RegisterOrGet registers m via Register but treats errAlreadyReg as success:
// in that case the already registered Collector is returned with a nil error.
// Any other registration error is returned with a nil Collector.
func (r *registry) RegisterOrGet(m Collector) (Collector, error) {
	registered, err := r.Register(m)
	switch err {
	case nil, errAlreadyReg:
		return registered, nil
	default:
		return nil, err
	}
}
2013-01-19 17:48:30 +04:00
2014-05-07 22:08:33 +04:00
func ( r * registry ) Unregister ( c Collector ) bool {
descChan := make ( chan * Desc , capDescChan )
go func ( ) {
c . Describe ( descChan )
close ( descChan )
} ( )
descIDs := map [ uint64 ] struct { } { }
2014-06-26 15:58:15 +04:00
var collectorID uint64 // Just a sum of the desc IDs.
2014-05-07 22:08:33 +04:00
for desc := range descChan {
if _ , exists := descIDs [ desc . id ] ; ! exists {
2014-06-26 15:58:15 +04:00
collectorID += desc . id
2014-05-07 22:08:33 +04:00
descIDs [ desc . id ] = struct { } { }
2013-01-19 17:48:30 +04:00
}
}
2014-05-07 22:08:33 +04:00
r . mtx . RLock ( )
if _ , exists := r . collectorsByID [ collectorID ] ; ! exists {
r . mtx . RUnlock ( )
return false
}
r . mtx . RUnlock ( )
2013-01-19 17:48:30 +04:00
2014-05-07 22:08:33 +04:00
r . mtx . Lock ( )
defer r . mtx . Unlock ( )
2013-01-19 17:48:30 +04:00
2014-05-07 22:08:33 +04:00
delete ( r . collectorsByID , collectorID )
for id := range descIDs {
delete ( r . descIDs , id )
2013-01-19 17:48:30 +04:00
}
2014-05-07 22:08:33 +04:00
// dimHashesByName is left untouched as those must be consistent
// throughout the lifetime of a program.
return true
}
2013-01-19 17:48:30 +04:00
2015-03-15 21:29:57 +03:00
func ( r * registry ) Push ( job , instance , pushURL , method string ) error {
if ! strings . Contains ( pushURL , "://" ) {
pushURL = "http://" + pushURL
}
2015-11-26 21:16:53 +03:00
if strings . HasSuffix ( pushURL , "/" ) {
pushURL = pushURL [ : len ( pushURL ) - 1 ]
}
2015-03-15 21:29:57 +03:00
pushURL = fmt . Sprintf ( "%s/metrics/jobs/%s" , pushURL , url . QueryEscape ( job ) )
2014-07-01 22:20:42 +04:00
if instance != "" {
2015-03-15 21:29:57 +03:00
pushURL += "/instances/" + url . QueryEscape ( instance )
2014-07-01 22:20:42 +04:00
}
buf := r . getBuf ( )
defer r . giveBuf ( buf )
2015-09-17 14:06:43 +03:00
if err := r . writePB ( expfmt . NewEncoder ( buf , expfmt . FmtProtoDelim ) ) ; err != nil {
2014-07-01 22:20:42 +04:00
if r . panicOnCollectError {
panic ( err )
}
return err
}
2015-03-15 21:29:57 +03:00
req , err := http . NewRequest ( method , pushURL , buf )
2014-07-30 21:02:39 +04:00
if err != nil {
return err
}
2014-07-02 20:57:34 +04:00
req . Header . Set ( contentTypeHeader , DelimitedTelemetryContentType )
2014-07-01 22:20:42 +04:00
resp , err := http . DefaultClient . Do ( req )
if err != nil {
return err
}
defer resp . Body . Close ( )
if resp . StatusCode != 202 {
2015-03-15 21:29:57 +03:00
return fmt . Errorf ( "unexpected status code %d while pushing to %s" , resp . StatusCode , pushURL )
2014-07-01 22:20:42 +04:00
}
return nil
}
2014-05-07 22:08:33 +04:00
func ( r * registry ) ServeHTTP ( w http . ResponseWriter , req * http . Request ) {
2015-09-17 14:06:43 +03:00
contentType := expfmt . Negotiate ( req . Header )
2014-05-07 22:08:33 +04:00
buf := r . getBuf ( )
defer r . giveBuf ( buf )
2014-07-04 19:08:57 +04:00
writer , encoding := decorateWriter ( req , buf )
2015-09-17 14:06:43 +03:00
if err := r . writePB ( expfmt . NewEncoder ( writer , contentType ) ) ; err != nil {
2014-05-07 22:08:33 +04:00
if r . panicOnCollectError {
2013-01-19 17:48:30 +04:00
panic ( err )
}
2014-06-20 22:40:48 +04:00
http . Error ( w , "An error has occurred:\n\n" + err . Error ( ) , http . StatusInternalServerError )
2014-05-07 22:08:33 +04:00
return
2013-01-19 17:48:30 +04:00
}
2014-07-04 19:08:57 +04:00
if closer , ok := writer . ( io . Closer ) ; ok {
closer . Close ( )
}
2014-07-02 20:57:34 +04:00
header := w . Header ( )
2015-09-17 14:06:43 +03:00
header . Set ( contentTypeHeader , string ( contentType ) )
2014-07-02 20:57:34 +04:00
header . Set ( contentLengthHeader , fmt . Sprint ( buf . Len ( ) ) )
2014-07-04 19:08:57 +04:00
if encoding != "" {
header . Set ( contentEncodingHeader , encoding )
}
2014-05-07 22:08:33 +04:00
w . Write ( buf . Bytes ( ) )
}
2013-01-19 17:48:30 +04:00
2015-09-17 14:06:43 +03:00
func ( r * registry ) writePB ( encoder expfmt . Encoder ) error {
2014-05-07 22:08:33 +04:00
var metricHashes map [ uint64 ] struct { }
if r . collectChecksEnabled {
metricHashes = make ( map [ uint64 ] struct { } )
}
metricChan := make ( chan Metric , capMetricChan )
wg := sync . WaitGroup { }
2015-02-01 00:08:48 +03:00
r . mtx . RLock ( )
metricFamiliesByName := make ( map [ string ] * dto . MetricFamily , len ( r . dimHashesByName ) )
2014-05-07 22:08:33 +04:00
// Scatter.
// (Collectors could be complex and slow, so we call them all at once.)
wg . Add ( len ( r . collectorsByID ) )
go func ( ) {
wg . Wait ( )
close ( metricChan )
} ( )
for _ , collector := range r . collectorsByID {
go func ( collector Collector ) {
defer wg . Done ( )
collector . Collect ( metricChan )
} ( collector )
}
r . mtx . RUnlock ( )
2014-12-19 15:54:04 +03:00
// Drain metricChan in case of premature return.
defer func ( ) {
2015-01-09 16:49:17 +03:00
for _ = range metricChan {
2014-12-19 15:54:04 +03:00
}
} ( )
2014-05-07 22:08:33 +04:00
// Gather.
for metric := range metricChan {
// This could be done concurrently, too, but it required locking
// of metricFamiliesByName (and of metricHashes if checks are
// enabled). Most likely not worth it.
desc := metric . Desc ( )
metricFamily , ok := metricFamiliesByName [ desc . fqName ]
if ! ok {
metricFamily = r . getMetricFamily ( )
defer r . giveMetricFamily ( metricFamily )
metricFamily . Name = proto . String ( desc . fqName )
metricFamily . Help = proto . String ( desc . help )
metricFamiliesByName [ desc . fqName ] = metricFamily
}
dtoMetric := r . getMetric ( )
defer r . giveMetric ( dtoMetric )
Allow error reporting during metrics collection and simplify Register().
Both are interface changes I want to get in before public
announcement. They only break rare usage cases, and are always easy to
fix, but still we want to avoid breaking changes after a wider
announcement of the project.
The change of Register() simply removes the return of the Collector,
which nobody was using in practice. It was just bloating the call
syntax. Note that this is different from RegisterOrGet(), which is
used at various occasions where you want to register something that
might or might not be registered already, but if it is, you want the
previously registered Collector back (because that's the relevant
one).
WRT error reporting: I first tried the obvious way of letting the
Collector methods Describe() and Collect() return error. However, I
had to conclude that that bloated _many_ calls and their handling in
very obnoxious ways. On the other hand, the case where you actually
want to report errors during registration or collection is very
rare. Hence, this approach has the wrong trade-off. The approach taken
here might at first appear clunky but is in practice quite handy,
mostly because there is almost no change for the "normal" case of "no
special error handling", but also because it plays well with the way
descriptors and metrics are handled (via channels).
Explaining the approach in more detail:
- During registration / describe: Error handling was actually already
in place (for invalid descriptors, which carry an error anyway). I
only added a convenience function to create an invalid descriptor
with a given error on purpose.
- Metrics are now treated in a similar way. The Write method returns
an error now (the only change in interface). An "invalid metric" is
provided that can be sent via the channel to signal that that metric
could not be collected. It alse transports an error.
NON-GOALS OF THIS COMMIT:
This is NOT yet the major improvement of the whole registry part,
where we want a public Registry interface and plenty of modular
configurations (for error handling, various auto-metrics, http
instrumentation, testing, ...). However, we can do that whole thing
without breaking existing interfaces. For now (which is a significant
issue) any error during collection will either cause a 500 HTTP
response or a panic (depending on registry config). Later, we
definitely want to have a possibility to skip (and only report
somehow) non-collectible metrics instead of aborting the whole scrape.
2015-01-12 21:16:09 +03:00
if err := metric . Write ( dtoMetric ) ; err != nil {
// TODO: Consider different means of error reporting so
// that a single erroneous metric could be skipped
// instead of blowing up the whole collection.
2015-09-17 14:06:43 +03:00
return fmt . Errorf ( "error collecting metric %v: %s" , desc , err )
Allow error reporting during metrics collection and simplify Register().
Both are interface changes I want to get in before public
announcement. They only break rare usage cases, and are always easy to
fix, but still we want to avoid breaking changes after a wider
announcement of the project.
The change of Register() simply removes the return of the Collector,
which nobody was using in practice. It was just bloating the call
syntax. Note that this is different from RegisterOrGet(), which is
used at various occasions where you want to register something that
might or might not be registered already, but if it is, you want the
previously registered Collector back (because that's the relevant
one).
WRT error reporting: I first tried the obvious way of letting the
Collector methods Describe() and Collect() return error. However, I
had to conclude that that bloated _many_ calls and their handling in
very obnoxious ways. On the other hand, the case where you actually
want to report errors during registration or collection is very
rare. Hence, this approach has the wrong trade-off. The approach taken
here might at first appear clunky but is in practice quite handy,
mostly because there is almost no change for the "normal" case of "no
special error handling", but also because it plays well with the way
descriptors and metrics are handled (via channels).
Explaining the approach in more detail:
- During registration / describe: Error handling was actually already
in place (for invalid descriptors, which carry an error anyway). I
only added a convenience function to create an invalid descriptor
with a given error on purpose.
- Metrics are now treated in a similar way. The Write method returns
an error now (the only change in interface). An "invalid metric" is
provided that can be sent via the channel to signal that that metric
could not be collected. It alse transports an error.
NON-GOALS OF THIS COMMIT:
This is NOT yet the major improvement of the whole registry part,
where we want a public Registry interface and plenty of modular
configurations (for error handling, various auto-metrics, http
instrumentation, testing, ...). However, we can do that whole thing
without breaking existing interfaces. For now (which is a significant
issue) any error during collection will either cause a 500 HTTP
response or a panic (depending on registry config). Later, we
definitely want to have a possibility to skip (and only report
somehow) non-collectible metrics instead of aborting the whole scrape.
2015-01-12 21:16:09 +03:00
}
2014-05-07 22:08:33 +04:00
switch {
case metricFamily . Type != nil :
// Type already set. We are good.
case dtoMetric . Gauge != nil :
metricFamily . Type = dto . MetricType_GAUGE . Enum ( )
case dtoMetric . Counter != nil :
metricFamily . Type = dto . MetricType_COUNTER . Enum ( )
case dtoMetric . Summary != nil :
metricFamily . Type = dto . MetricType_SUMMARY . Enum ( )
case dtoMetric . Untyped != nil :
metricFamily . Type = dto . MetricType_UNTYPED . Enum ( )
2015-02-20 21:08:36 +03:00
case dtoMetric . Histogram != nil :
metricFamily . Type = dto . MetricType_HISTOGRAM . Enum ( )
2014-05-07 22:08:33 +04:00
default :
2015-09-17 14:06:43 +03:00
return fmt . Errorf ( "empty metric collected: %s" , dtoMetric )
2014-05-07 22:08:33 +04:00
}
if r . collectChecksEnabled {
if err := r . checkConsistency ( metricFamily , dtoMetric , desc , metricHashes ) ; err != nil {
2015-09-17 14:06:43 +03:00
return err
2014-05-07 22:08:33 +04:00
}
}
metricFamily . Metric = append ( metricFamily . Metric , dtoMetric )
}
2013-01-19 17:48:30 +04:00
2014-05-07 22:08:33 +04:00
if r . metricFamilyInjectionHook != nil {
for _ , mf := range r . metricFamilyInjectionHook ( ) {
2015-03-15 17:47:56 +03:00
existingMF , exists := metricFamiliesByName [ mf . GetName ( ) ]
if ! exists {
metricFamiliesByName [ mf . GetName ( ) ] = mf
if r . collectChecksEnabled {
for _ , m := range mf . Metric {
if err := r . checkConsistency ( mf , m , nil , metricHashes ) ; err != nil {
2015-09-17 14:06:43 +03:00
return err
2015-03-15 17:47:56 +03:00
}
}
}
continue
}
for _ , m := range mf . Metric {
if r . collectChecksEnabled {
if err := r . checkConsistency ( existingMF , m , nil , metricHashes ) ; err != nil {
2015-09-17 14:06:43 +03:00
return err
2015-03-15 17:47:56 +03:00
}
}
existingMF . Metric = append ( existingMF . Metric , m )
2013-01-19 17:48:30 +04:00
}
}
}
2014-05-07 22:08:33 +04:00
// Now that MetricFamilies are all set, sort their Metrics
// lexicographically by their label values.
for _ , mf := range metricFamiliesByName {
sort . Sort ( metricSorter ( mf . Metric ) )
}
// Write out MetricFamilies sorted by their name.
names := make ( [ ] string , 0 , len ( metricFamiliesByName ) )
for name := range metricFamiliesByName {
names = append ( names , name )
}
sort . Strings ( names )
for _ , name := range names {
2015-09-17 14:06:43 +03:00
if err := encoder . Encode ( metricFamiliesByName [ name ] ) ; err != nil {
return err
2014-05-07 22:08:33 +04:00
}
}
2015-09-17 14:06:43 +03:00
return nil
2012-05-20 01:59:25 +04:00
}
2014-05-07 22:08:33 +04:00
func ( r * registry ) checkConsistency ( metricFamily * dto . MetricFamily , dtoMetric * dto . Metric , desc * Desc , metricHashes map [ uint64 ] struct { } ) error {
// Type consistency with metric family.
if metricFamily . GetType ( ) == dto . MetricType_GAUGE && dtoMetric . Gauge == nil ||
metricFamily . GetType ( ) == dto . MetricType_COUNTER && dtoMetric . Counter == nil ||
metricFamily . GetType ( ) == dto . MetricType_SUMMARY && dtoMetric . Summary == nil ||
2015-06-08 23:12:07 +03:00
metricFamily . GetType ( ) == dto . MetricType_HISTOGRAM && dtoMetric . Histogram == nil ||
2014-05-07 22:08:33 +04:00
metricFamily . GetType ( ) == dto . MetricType_UNTYPED && dtoMetric . Untyped == nil {
return fmt . Errorf (
2015-06-08 23:12:07 +03:00
"collected metric %s %s is not a %s" ,
metricFamily . GetName ( ) , dtoMetric , metricFamily . GetType ( ) ,
2014-05-07 22:08:33 +04:00
)
}
2012-05-20 01:59:25 +04:00
2015-03-15 17:47:56 +03:00
// Is the metric unique (i.e. no other metric with the same name and the same label values)?
2015-11-12 15:55:12 +03:00
h := hashNew ( )
h = hashAdd ( h , metricFamily . GetName ( ) )
h = hashAddByte ( h , separatorByte )
2015-06-08 23:12:07 +03:00
// Make sure label pairs are sorted. We depend on it for the consistency
// check. Label pairs must be sorted by contract. But the point of this
// method is to check for contract violations. So we better do the sort
// now.
sort . Sort ( LabelPairSorter ( dtoMetric . Label ) )
2015-03-15 17:47:56 +03:00
for _ , lp := range dtoMetric . Label {
2015-11-12 15:55:12 +03:00
h = hashAdd ( h , lp . GetValue ( ) )
h = hashAddByte ( h , separatorByte )
2015-03-15 17:47:56 +03:00
}
2015-11-12 15:55:12 +03:00
if _ , exists := metricHashes [ h ] ; exists {
2015-03-15 17:47:56 +03:00
return fmt . Errorf (
2015-06-08 23:12:07 +03:00
"collected metric %s %s was collected before with the same name and label values" ,
metricFamily . GetName ( ) , dtoMetric ,
2015-03-15 17:47:56 +03:00
)
}
2015-11-12 15:55:12 +03:00
metricHashes [ h ] = struct { } { }
2015-03-15 17:47:56 +03:00
if desc == nil {
return nil // Nothing left to check if we have no desc.
}
2014-05-07 22:08:33 +04:00
// Desc consistency with metric family.
2015-03-15 17:47:56 +03:00
if metricFamily . GetName ( ) != desc . fqName {
return fmt . Errorf (
2015-06-08 23:12:07 +03:00
"collected metric %s %s has name %q but should have %q" ,
metricFamily . GetName ( ) , dtoMetric , metricFamily . GetName ( ) , desc . fqName ,
2015-03-15 17:47:56 +03:00
)
}
2014-05-07 22:08:33 +04:00
if metricFamily . GetHelp ( ) != desc . help {
return fmt . Errorf (
2015-06-08 23:12:07 +03:00
"collected metric %s %s has help %q but should have %q" ,
metricFamily . GetName ( ) , dtoMetric , metricFamily . GetHelp ( ) , desc . help ,
2014-05-07 22:08:33 +04:00
)
}
2013-09-11 19:22:00 +04:00
2014-05-07 22:08:33 +04:00
// Is the desc consistent with the content of the metric?
lpsFromDesc := make ( [ ] * dto . LabelPair , 0 , len ( dtoMetric . Label ) )
lpsFromDesc = append ( lpsFromDesc , desc . constLabelPairs ... )
for _ , l := range desc . variableLabels {
lpsFromDesc = append ( lpsFromDesc , & dto . LabelPair {
Name : proto . String ( l ) ,
} )
}
if len ( lpsFromDesc ) != len ( dtoMetric . Label ) {
return fmt . Errorf (
2015-06-08 23:12:07 +03:00
"labels in collected metric %s %s are inconsistent with descriptor %s" ,
metricFamily . GetName ( ) , dtoMetric , desc ,
2014-05-07 22:08:33 +04:00
)
}
sort . Sort ( LabelPairSorter ( lpsFromDesc ) )
for i , lpFromDesc := range lpsFromDesc {
lpFromMetric := dtoMetric . Label [ i ]
if lpFromDesc . GetName ( ) != lpFromMetric . GetName ( ) ||
lpFromDesc . Value != nil && lpFromDesc . GetValue ( ) != lpFromMetric . GetValue ( ) {
return fmt . Errorf (
2015-06-08 23:12:07 +03:00
"labels in collected metric %s %s are inconsistent with descriptor %s" ,
metricFamily . GetName ( ) , dtoMetric , desc ,
2014-05-07 22:08:33 +04:00
)
2013-09-11 19:22:00 +04:00
}
2013-01-19 17:48:30 +04:00
}
2014-05-07 22:08:33 +04:00
r . mtx . RLock ( ) // Remaining checks need the read lock.
defer r . mtx . RUnlock ( )
// Is the desc registered?
if _ , exist := r . descIDs [ desc . id ] ; ! exist {
2015-06-08 23:12:07 +03:00
return fmt . Errorf (
"collected metric %s %s with unregistered descriptor %s" ,
metricFamily . GetName ( ) , dtoMetric , desc ,
)
2012-05-20 01:59:25 +04:00
}
2013-01-19 17:48:30 +04:00
2013-06-27 20:46:16 +04:00
return nil
2012-05-20 01:59:25 +04:00
}
2014-05-07 22:08:33 +04:00
func ( r * registry ) getBuf ( ) * bytes . Buffer {
select {
case buf := <- r . bufPool :
return buf
default :
return & bytes . Buffer { }
}
2013-01-08 19:56:19 +04:00
}
2014-05-07 22:08:33 +04:00
func ( r * registry ) giveBuf ( buf * bytes . Buffer ) {
buf . Reset ( )
select {
case r . bufPool <- buf :
default :
2013-01-19 17:48:30 +04:00
}
}
2014-05-07 22:08:33 +04:00
func ( r * registry ) getMetricFamily ( ) * dto . MetricFamily {
select {
case mf := <- r . metricFamilyPool :
return mf
default :
return & dto . MetricFamily { }
}
2013-02-12 05:36:06 +04:00
}
2014-05-07 22:08:33 +04:00
// giveMetricFamily resets mf and offers it back to r.metricFamilyPool. The
// send is non-blocking: if the pool cannot take it right now, mf is dropped.
func (r *registry) giveMetricFamily(mf *dto.MetricFamily) {
	mf.Reset()
	select {
	case r.metricFamilyPool <- mf:
		// Back in the pool.
	default:
		// Pool is full; let it go.
	}
}
2013-06-27 20:46:16 +04:00
2014-05-07 22:08:33 +04:00
// getMetric hands out a Metric. If one is immediately available on
// r.metricPool it is reused; otherwise a new zero-valued one is allocated. It
// never blocks.
func (r *registry) getMetric() *dto.Metric {
	select {
	case pooled := <-r.metricPool:
		return pooled
	default:
		// Pool is empty; fall through and allocate.
	}
	return &dto.Metric{}
}
2013-06-27 20:46:16 +04:00
2014-05-07 22:08:33 +04:00
// giveMetric resets m and offers it back to r.metricPool. The send is
// non-blocking: if the pool cannot take it right now, m is dropped.
func (r *registry) giveMetric(m *dto.Metric) {
	m.Reset()
	select {
	case r.metricPool <- m:
		// Back in the pool.
	default:
		// Pool is full; let it go.
	}
}
2013-06-27 20:46:16 +04:00
2014-05-07 22:08:33 +04:00
// newRegistry returns a registry with empty lookup maps and with its three
// object pools created as buffered channels of capacity numBufs,
// numMetricFamilies and numMetrics respectively. No collectors are
// registered.
func newRegistry() *registry {
	reg := new(registry)

	reg.collectorsByID = make(map[uint64]Collector)
	reg.descIDs = make(map[uint64]struct{})
	reg.dimHashesByName = make(map[string]uint64)

	reg.bufPool = make(chan *bytes.Buffer, numBufs)
	reg.metricFamilyPool = make(chan *dto.MetricFamily, numMetricFamilies)
	reg.metricPool = make(chan *dto.Metric, numMetrics)

	return reg
}
2013-06-27 20:46:16 +04:00
2014-12-11 04:05:35 +03:00
// newDefaultRegistry returns a new registry on which a process collector for
// the current process (os.Getpid, empty namespace argument) and a Go
// collector have been registered, in that order. The results of the two
// Register calls are not inspected.
func newDefaultRegistry() *registry {
	reg := newRegistry()

	pid := os.Getpid()
	reg.Register(NewProcessCollector(pid, ""))

	reg.Register(NewGoCollector())

	return reg
}
2014-07-04 19:08:57 +04:00
// decorateWriter wraps a writer to handle gzip compression if requested. It
// returns the decorated writer and the appropriate "Content-Encoding" header
// (which is empty if no compression is enabled).
//
// The request header named by acceptEncodingHeader is split on commas and
// each element is examined in order. An element selects gzip when its
// content-coding (the text before any ';', surrounding whitespace ignored)
// equals "gzip" case-insensitively and it does not carry a q-value of zero,
// which a client uses to state that the coding is not acceptable.
//
// When gzip is selected the returned writer is a *gzip.Writer wrapping
// writer.
// NOTE(review): a gzip.Writer must be closed to flush its output; presumably
// the caller does that — confirm at the call site.
func decorateWriter(request *http.Request, writer io.Writer) (io.Writer, string) {
	header := request.Header.Get(acceptEncodingHeader)
	for _, element := range strings.Split(header, ",") {
		coding, params := element, ""
		if i := strings.Index(element, ";"); i >= 0 {
			coding, params = element[:i], element[i+1:]
		}
		if !strings.EqualFold(strings.TrimSpace(coding), "gzip") {
			continue
		}
		if gzipQValueIsZero(params) {
			// "gzip;q=0" explicitly refuses gzip.
			continue
		}
		return gzip.NewWriter(writer), "gzip"
	}
	return writer, ""
}

// gzipQValueIsZero reports whether params, the ';'-separated parameter text
// following a content-coding, contains a "q=" weight whose value is zero
// ("0", "0.", "0.0", "0.00", "0.000"). Only the first q parameter is
// considered; absent or non-zero weights yield false.
func gzipQValueIsZero(params string) bool {
	for _, param := range strings.Split(params, ";") {
		param = strings.TrimSpace(param)
		if len(param) < 2 || !strings.EqualFold(param[:2], "q=") {
			continue
		}
		q := strings.TrimSpace(param[2:])
		// A zero weight starts with '0' and consists solely of zeros and
		// the decimal point.
		return strings.HasPrefix(q, "0") && strings.Trim(q, "0.") == ""
	}
	return false
}
2014-05-07 22:08:33 +04:00
// metricSorter is a slice of metrics that can be passed to sort.Sort via its
// Len, Swap and Less methods.
type metricSorter [ ] * dto . Metric
2013-01-19 17:48:30 +04:00
2014-05-07 22:08:33 +04:00
// Len returns the number of metrics in s.
func (s metricSorter) Len() int {
	count := len(s)
	return count
}
2013-01-19 17:48:30 +04:00
2014-05-07 22:08:33 +04:00
// Swap exchanges the metrics at positions i and j.
func (s metricSorter) Swap(i, j int) {
	mi, mj := s[i], s[j]
	s[i], s[j] = mj, mi
}
2012-05-21 12:23:22 +04:00
2014-05-07 22:08:33 +04:00
func ( s metricSorter ) Less ( i , j int ) bool {
2015-03-15 17:47:56 +03:00
if len ( s [ i ] . Label ) != len ( s [ j ] . Label ) {
// This should not happen. The metrics are
// inconsistent. However, we have to deal with the fact, as
// people might use custom collectors or metric family injection
// to create inconsistent metrics. So let's simply compare the
// number of labels in this case. That will still yield
// reproducible sorting.
return len ( s [ i ] . Label ) < len ( s [ j ] . Label )
}
2014-05-07 22:08:33 +04:00
for n , lp := range s [ i ] . Label {
vi := lp . GetValue ( )
vj := s [ j ] . Label [ n ] . GetValue ( )
if vi != vj {
return vi < vj
2012-05-20 01:59:25 +04:00
}
}
2015-11-10 01:14:07 +03:00
2015-11-10 17:37:58 +03:00
// We should never arrive here. Multiple metrics with the same
// label set in the same scrape will lead to undefined ingestion
// behavior. However, as above, we have to provide stable sorting
// here, even for inconsistent metrics. So sort equal metrics
// by their timestamp, with missing timestamps (implying "now")
// coming last.
if s [ i ] . TimestampMs == nil {
2015-11-10 01:14:07 +03:00
return false
2015-11-10 17:37:58 +03:00
}
if s [ j ] . TimestampMs == nil {
2015-11-10 01:14:07 +03:00
return true
2015-11-09 16:36:26 +03:00
}
2015-11-10 17:37:58 +03:00
return s [ i ] . GetTimestampMs ( ) < s [ j ] . GetTimestampMs ( )
2012-05-20 01:59:25 +04:00
}