2015-02-02 17:14:36 +03:00
// Copyright 2014 The Prometheus Authors
2014-05-07 22:08:33 +04:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2015-02-02 17:14:36 +03:00
// Copyright (c) 2013, The Prometheus Authors
2013-02-12 05:36:06 +04:00
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be found
// in the LICENSE file.
2012-05-20 01:59:25 +04:00
2013-04-03 20:33:32 +04:00
package prometheus
2012-05-20 01:59:25 +04:00
import (
2014-05-07 22:08:33 +04:00
"bytes"
2014-12-11 04:05:35 +03:00
"compress/gzip"
2014-05-07 22:08:33 +04:00
"errors"
2013-01-19 17:48:30 +04:00
"fmt"
2014-05-07 22:08:33 +04:00
"hash/fnv"
2013-01-19 17:48:30 +04:00
"io"
2012-05-20 01:59:25 +04:00
"net/http"
2014-07-01 22:20:42 +04:00
"net/url"
2014-12-11 04:05:35 +03:00
"os"
2013-01-19 17:48:30 +04:00
"sort"
2014-07-04 19:08:57 +04:00
"strings"
2012-05-20 01:59:25 +04:00
"sync"
2013-06-27 20:46:16 +04:00
dto "github.com/prometheus/client_model/go"
2015-02-14 02:00:34 +03:00
"github.com/golang/protobuf/proto"
2013-06-27 20:46:16 +04:00
2014-05-07 22:08:33 +04:00
"github.com/prometheus/client_golang/_vendor/goautoneg"
2014-06-26 15:58:15 +04:00
"github.com/prometheus/client_golang/model"
2014-04-23 15:40:37 +04:00
"github.com/prometheus/client_golang/text"
2012-05-20 01:59:25 +04:00
)
2014-05-07 22:08:33 +04:00
var (
2014-12-11 04:05:35 +03:00
defRegistry = newDefaultRegistry ( )
2014-05-07 22:08:33 +04:00
errAlreadyReg = errors . New ( "duplicate metrics collector registration attempted" )
)
// Constants relevant to the HTTP interface.
const (
	// APIVersion is the version of the format of the exported data. This
	// will match this library's version, which subscribes to the Semantic
	// Versioning scheme.
	APIVersion = "0.0.4"

	// DelimitedTelemetryContentType is the content type set on telemetry
	// data responses in delimited protobuf format.
	DelimitedTelemetryContentType = `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`
	// TextTelemetryContentType is the content type set on telemetry data
	// responses in text format.
	TextTelemetryContentType = `text/plain; version=` + APIVersion
	// ProtoTextTelemetryContentType is the content type set on telemetry
	// data responses in protobuf text format. (Only used for debugging.)
	ProtoTextTelemetryContentType = `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=text`
	// ProtoCompactTextTelemetryContentType is the content type set on
	// telemetry data responses in protobuf compact text format. (Only used
	// for debugging.)
	ProtoCompactTextTelemetryContentType = `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=compact-text`

	// Constants for object pools.
	numBufs           = 4
	numMetricFamilies = 1000
	numMetrics        = 10000

	// Capacity for the channel to collect metrics and descriptors.
	capMetricChan = 1000
	capDescChan   = 10

	// Names of the HTTP headers read and written by this file.
	contentTypeHeader     = "Content-Type"
	contentLengthHeader   = "Content-Length"
	contentEncodingHeader = "Content-Encoding"
	acceptEncodingHeader  = "Accept-Encoding"
	acceptHeader          = "Accept"
)
2012-05-20 01:59:25 +04:00
2014-05-07 22:08:33 +04:00
// Handler provides the HTTP handler that serves the metrics of the global
// Prometheus registry. The handler comes pre-wrapped by InstrumentHandler,
// registered under the handler name "prometheus". It is typically mounted on
// the "/metrics" endpoint.
func Handler() http.Handler {
	const handlerName = "prometheus"
	return InstrumentHandler(handlerName, defRegistry)
}
2014-04-23 15:40:37 +04:00
2014-05-07 22:08:33 +04:00
// UninstrumentedHandler works in the same way as Handler, but the returned HTTP
// handler is not instrumented. This is useful if no instrumentation is desired
// (for whatever reason) or if the instrumentation has to happen with a
// different handler name (or with a different instrumentation approach
// altogether). See the InstrumentHandler example.
func UninstrumentedHandler ( ) http . Handler {
return defRegistry
2012-05-20 01:59:25 +04:00
}
2014-05-07 22:08:33 +04:00
// Register registers a new Collector to be included in metrics collection. It
// returns an error if the descriptors provided by the Collector are invalid or
// if they - in combination with descriptors of already registered Collectors -
// do not fulfill the consistency and uniqueness criteria described in the Desc
Allow error reporting during metrics collection and simplify Register().
Both are interface changes I want to get in before public
announcement. They only break rare usage cases, and are always easy to
fix, but still we want to avoid breaking changes after a wider
announcement of the project.
The change of Register() simply removes the return of the Collector,
which nobody was using in practice. It was just bloating the call
syntax. Note that this is different from RegisterOrGet(), which is
used at various occasions where you want to register something that
might or might not be registered already, but if it is, you want the
previously registered Collector back (because that's the relevant
one).
WRT error reporting: I first tried the obvious way of letting the
Collector methods Describe() and Collect() return error. However, I
had to conclude that that bloated _many_ calls and their handling in
very obnoxious ways. On the other hand, the case where you actually
want to report errors during registration or collection is very
rare. Hence, this approach has the wrong trade-off. The approach taken
here might at first appear clunky but is in practice quite handy,
mostly because there is almost no change for the "normal" case of "no
special error handling", but also because it plays well with the way
descriptors and metrics are handled (via channels).
Explaining the approach in more detail:
- During registration / describe: Error handling was actually already
in place (for invalid descriptors, which carry an error anyway). I
only added a convenience function to create an invalid descriptor
with a given error on purpose.
- Metrics are now treated in a similar way. The Write method returns
an error now (the only change in interface). An "invalid metric" is
provided that can be sent via the channel to signal that that metric
could not be collected. It alse transports an error.
NON-GOALS OF THIS COMMIT:
This is NOT yet the major improvement of the whole registry part,
where we want a public Registry interface and plenty of modular
configurations (for error handling, various auto-metrics, http
instrumentation, testing, ...). However, we can do that whole thing
without breaking existing interfaces. For now (which is a significant
issue) any error during collection will either cause a 500 HTTP
response or a panic (depending on registry config). Later, we
definitely want to have a possibility to skip (and only report
somehow) non-collectible metrics instead of aborting the whole scrape.
2015-01-12 21:16:09 +03:00
// documentation.
2014-05-07 22:08:33 +04:00
//
// Do not register the same Collector multiple times concurrently. (Registering
// the same Collector twice would result in an error anyway, but on top of that,
// it is not safe to do so concurrently.)
Allow error reporting during metrics collection and simplify Register().
Both are interface changes I want to get in before public
announcement. They only break rare usage cases, and are always easy to
fix, but still we want to avoid breaking changes after a wider
announcement of the project.
The change of Register() simply removes the return of the Collector,
which nobody was using in practice. It was just bloating the call
syntax. Note that this is different from RegisterOrGet(), which is
used at various occasions where you want to register something that
might or might not be registered already, but if it is, you want the
previously registered Collector back (because that's the relevant
one).
WRT error reporting: I first tried the obvious way of letting the
Collector methods Describe() and Collect() return error. However, I
had to conclude that that bloated _many_ calls and their handling in
very obnoxious ways. On the other hand, the case where you actually
want to report errors during registration or collection is very
rare. Hence, this approach has the wrong trade-off. The approach taken
here might at first appear clunky but is in practice quite handy,
mostly because there is almost no change for the "normal" case of "no
special error handling", but also because it plays well with the way
descriptors and metrics are handled (via channels).
Explaining the approach in more detail:
- During registration / describe: Error handling was actually already
in place (for invalid descriptors, which carry an error anyway). I
only added a convenience function to create an invalid descriptor
with a given error on purpose.
- Metrics are now treated in a similar way. The Write method returns
an error now (the only change in interface). An "invalid metric" is
provided that can be sent via the channel to signal that that metric
could not be collected. It alse transports an error.
NON-GOALS OF THIS COMMIT:
This is NOT yet the major improvement of the whole registry part,
where we want a public Registry interface and plenty of modular
configurations (for error handling, various auto-metrics, http
instrumentation, testing, ...). However, we can do that whole thing
without breaking existing interfaces. For now (which is a significant
issue) any error during collection will either cause a 500 HTTP
response or a panic (depending on registry config). Later, we
definitely want to have a possibility to skip (and only report
somehow) non-collectible metrics instead of aborting the whole scrape.
2015-01-12 21:16:09 +03:00
func Register ( m Collector ) error {
_ , err := defRegistry . Register ( m )
return err
2014-05-07 22:08:33 +04:00
}
2013-06-27 20:46:16 +04:00
2014-05-07 22:08:33 +04:00
// MustRegister works like Register but panics where Register would have
// returned an error.
Allow error reporting during metrics collection and simplify Register().
Both are interface changes I want to get in before public
announcement. They only break rare usage cases, and are always easy to
fix, but still we want to avoid breaking changes after a wider
announcement of the project.
The change of Register() simply removes the return of the Collector,
which nobody was using in practice. It was just bloating the call
syntax. Note that this is different from RegisterOrGet(), which is
used at various occasions where you want to register something that
might or might not be registered already, but if it is, you want the
previously registered Collector back (because that's the relevant
one).
WRT error reporting: I first tried the obvious way of letting the
Collector methods Describe() and Collect() return error. However, I
had to conclude that that bloated _many_ calls and their handling in
very obnoxious ways. On the other hand, the case where you actually
want to report errors during registration or collection is very
rare. Hence, this approach has the wrong trade-off. The approach taken
here might at first appear clunky but is in practice quite handy,
mostly because there is almost no change for the "normal" case of "no
special error handling", but also because it plays well with the way
descriptors and metrics are handled (via channels).
Explaining the approach in more detail:
- During registration / describe: Error handling was actually already
in place (for invalid descriptors, which carry an error anyway). I
only added a convenience function to create an invalid descriptor
with a given error on purpose.
- Metrics are now treated in a similar way. The Write method returns
an error now (the only change in interface). An "invalid metric" is
provided that can be sent via the channel to signal that that metric
could not be collected. It alse transports an error.
NON-GOALS OF THIS COMMIT:
This is NOT yet the major improvement of the whole registry part,
where we want a public Registry interface and plenty of modular
configurations (for error handling, various auto-metrics, http
instrumentation, testing, ...). However, we can do that whole thing
without breaking existing interfaces. For now (which is a significant
issue) any error during collection will either cause a 500 HTTP
response or a panic (depending on registry config). Later, we
definitely want to have a possibility to skip (and only report
somehow) non-collectible metrics instead of aborting the whole scrape.
2015-01-12 21:16:09 +03:00
func MustRegister ( m Collector ) {
err := Register ( m )
2014-05-07 22:08:33 +04:00
if err != nil {
panic ( err )
}
2013-06-27 20:46:16 +04:00
}
2014-05-07 22:08:33 +04:00
// RegisterOrGet works like Register but does not return an error if a Collector
// is registered that equals a previously registered Collector. (Two Collectors
// are considered equal if their Describe method yields the same set of
// descriptors.) Instead, the previously registered Collector is returned (which
// is helpful if the new and previously registered Collectors are equal but not
// identical, i.e. not pointers to the same object).
//
// As for Register, it is still not safe to call RegisterOrGet with the same
// Collector multiple times concurrently.
func RegisterOrGet ( m Collector ) ( Collector , error ) {
return defRegistry . RegisterOrGet ( m )
2013-06-27 20:46:16 +04:00
}
2014-05-07 22:08:33 +04:00
// MustRegisterOrGet works like Register but panics where RegisterOrGet would
// have returned an error.
func MustRegisterOrGet ( m Collector ) Collector {
existing , err := RegisterOrGet ( m )
if err != nil {
panic ( err )
}
return existing
2013-06-27 20:46:16 +04:00
}
2014-05-07 22:08:33 +04:00
// Unregister unregisters the Collector that equals the Collector passed in as
// an argument. (Two Collectors are considered equal if their Describe method
// yields the same set of descriptors.) The function returns whether a Collector
// was unregistered.
func Unregister ( c Collector ) bool {
return defRegistry . Unregister ( c )
2013-02-18 08:09:21 +04:00
}
2014-05-07 22:08:33 +04:00
// SetMetricFamilyInjectionHook sets a function that is called whenever metrics
// are collected. The hook function must be set before metrics collection begins
// (i.e. call SetMetricFamilyInjectionHook before setting the HTTP handler.) The
// MetricFamily protobufs returned by the hook function are added to the
// delivered metrics. Each returned MetricFamily must have a unique name (also
// taking into account the MetricFamilies created in the regular way).
2013-02-12 05:36:06 +04:00
//
2014-05-07 22:08:33 +04:00
// This is a way to directly inject MetricFamily protobufs managed and owned by
// the caller. The caller has full responsibility. No sanity checks are
// performed on the returned protobufs (besides the name checks described
// above). The function must be callable at any time and concurrently.
func SetMetricFamilyInjectionHook ( hook func ( ) [ ] * dto . MetricFamily ) {
defRegistry . metricFamilyInjectionHook = hook
2012-05-20 01:59:25 +04:00
}
2014-05-07 22:08:33 +04:00
// PanicOnCollectError sets the behavior whether a panic is caused upon an error
2015-02-19 17:34:04 +03:00
// while metrics are collected and served to the HTTP endpoint. By default, an
2014-05-07 22:08:33 +04:00
// internal server error (status code 500) is served with an error message.
func PanicOnCollectError ( b bool ) {
defRegistry . panicOnCollectError = b
2012-05-20 01:59:25 +04:00
}
2014-05-07 22:08:33 +04:00
// EnableCollectChecks enables (or disables) additional consistency checks
2014-06-23 13:45:49 +04:00
// during metrics collection. These additional checks are not enabled by default
// because they inflict a performance penalty and the errors they check for can
// only happen if the used Metric and Collector types have internal programming
// errors. It can be helpful to enable these checks while working with custom
// Collectors or Metrics whose correctness is not well established yet.
2014-05-07 22:08:33 +04:00
func EnableCollectChecks ( b bool ) {
defRegistry . collectChecksEnabled = b
2013-01-19 17:48:30 +04:00
}
2014-07-01 22:20:42 +04:00
// Push runs a metric collection and sends everything collected to the
// Pushgateway reachable at addr. Consult the Pushgateway documentation for the
// detailed implications of the job and instance parameters. instance may be
// left empty, in which case the Pushgateway will use the client's IP number
// instead. Pass just host:port or ip:port as addr. (Don't add 'http://' or any
// path.)
//
// Note that all previously pushed metrics with the same job and instance will
// be replaced with the metrics pushed by this call. (It uses HTTP method 'PUT'
// to push to the Pushgateway.)
func Push(job, instance, addr string) error {
	const method = "PUT"
	return defRegistry.Push(job, instance, addr, method)
}
// PushAdd behaves like Push, except that only previously pushed metrics with
// the same name (and the same job and instance) will be replaced. (It uses
// HTTP method 'POST' to push to the Pushgateway.)
func PushAdd(job, instance, addr string) error {
	const method = "POST"
	return defRegistry.Push(job, instance, addr, method)
}
2014-05-07 22:08:33 +04:00
// encoder is a function that writes a dto.MetricFamily to an io.Writer in a
// certain encoding. It returns the number of bytes written and any error
// encountered. Note that text.WriteProtoDelimited (which Push hands to
// writePB) and text.MetricFamilyToText are encoders.
type encoder func ( io . Writer , * dto . MetricFamily ) ( int , error )
type registry struct {
mtx sync . RWMutex
collectorsByID map [ uint64 ] Collector // ID is a hash of the descIDs.
descIDs map [ uint64 ] struct { }
dimHashesByName map [ string ] uint64
bufPool chan * bytes . Buffer
metricFamilyPool chan * dto . MetricFamily
metricPool chan * dto . Metric
metricFamilyInjectionHook func ( ) [ ] * dto . MetricFamily
panicOnCollectError , collectChecksEnabled bool
2014-04-02 15:31:22 +04:00
}
2014-05-07 22:08:33 +04:00
func ( r * registry ) Register ( c Collector ) ( Collector , error ) {
descChan := make ( chan * Desc , capDescChan )
go func ( ) {
c . Describe ( descChan )
close ( descChan )
} ( )
newDescIDs := map [ uint64 ] struct { } { }
newDimHashesByName := map [ string ] uint64 { }
2014-06-26 15:58:15 +04:00
var collectorID uint64 // Just a sum of all desc IDs.
2014-05-07 22:08:33 +04:00
var duplicateDescErr error
r . mtx . Lock ( )
defer r . mtx . Unlock ( )
// Coduct various tests...
for desc := range descChan {
// Is the descriptor valid at all?
if desc . err != nil {
return c , fmt . Errorf ( "descriptor %s is invalid: %s" , desc , desc . err )
}
2013-04-19 16:11:01 +04:00
2014-05-07 22:08:33 +04:00
// Is the descID unique?
// (In other words: Is the fqName + constLabel combination unique?)
if _ , exists := r . descIDs [ desc . id ] ; exists {
duplicateDescErr = fmt . Errorf ( "descriptor %s already exists with the same fully-qualified name and const label values" , desc )
}
// If it is not a duplicate desc in this collector, add it to
2014-06-26 15:58:15 +04:00
// the collectorID. (We allow duplicate descs within the same
2014-05-07 22:08:33 +04:00
// collector, but their existence must be a no-op.)
if _ , exists := newDescIDs [ desc . id ] ; ! exists {
newDescIDs [ desc . id ] = struct { } { }
2014-06-26 15:58:15 +04:00
collectorID += desc . id
2014-05-07 22:08:33 +04:00
}
2013-04-19 16:11:01 +04:00
2014-05-07 22:08:33 +04:00
// Are all the label names and the help string consistent with
// previous descriptors of the same name?
// First check existing descriptors...
if dimHash , exists := r . dimHashesByName [ desc . fqName ] ; exists {
if dimHash != desc . dimHash {
return nil , fmt . Errorf ( "a previously registered descriptor with the same fully-qualified name as %s has different label names or a different help string" , desc )
}
} else {
// ...then check the new descriptors already seen.
if dimHash , exists := newDimHashesByName [ desc . fqName ] ; exists {
if dimHash != desc . dimHash {
return nil , fmt . Errorf ( "descriptors reported by collector have inconsistent label names or help strings for the same fully-qualified name, offender is %s" , desc )
}
} else {
newDimHashesByName [ desc . fqName ] = desc . dimHash
}
}
}
// Did anything happen at all?
if len ( newDescIDs ) == 0 {
return nil , errors . New ( "collector has no descriptors" )
}
if existing , exists := r . collectorsByID [ collectorID ] ; exists {
return existing , errAlreadyReg
}
// If the collectorID is new, but at least one of the descs existed
// before, we are in trouble.
if duplicateDescErr != nil {
return nil , duplicateDescErr
}
2013-04-19 16:11:01 +04:00
2014-05-07 22:08:33 +04:00
// Only after all tests have passed, actually register.
r . collectorsByID [ collectorID ] = c
for hash := range newDescIDs {
r . descIDs [ hash ] = struct { } { }
}
for name , dimHash := range newDimHashesByName {
r . dimHashesByName [ name ] = dimHash
}
return c , nil
2013-04-19 16:11:01 +04:00
}
2014-05-07 22:08:33 +04:00
// RegisterOrGet attempts to register m. If the registration succeeds or fails
// only because an equal Collector is already registered (errAlreadyReg), the
// Collector handed back by Register is returned with a nil error. Any other
// error is passed on together with a nil Collector.
func (r *registry) RegisterOrGet(m Collector) (Collector, error) {
	registered, err := r.Register(m)
	switch err {
	case nil, errAlreadyReg:
		return registered, nil
	default:
		return nil, err
	}
}
2013-01-19 17:48:30 +04:00
2014-05-07 22:08:33 +04:00
func ( r * registry ) Unregister ( c Collector ) bool {
descChan := make ( chan * Desc , capDescChan )
go func ( ) {
c . Describe ( descChan )
close ( descChan )
} ( )
descIDs := map [ uint64 ] struct { } { }
2014-06-26 15:58:15 +04:00
var collectorID uint64 // Just a sum of the desc IDs.
2014-05-07 22:08:33 +04:00
for desc := range descChan {
if _ , exists := descIDs [ desc . id ] ; ! exists {
2014-06-26 15:58:15 +04:00
collectorID += desc . id
2014-05-07 22:08:33 +04:00
descIDs [ desc . id ] = struct { } { }
2013-01-19 17:48:30 +04:00
}
}
2014-05-07 22:08:33 +04:00
r . mtx . RLock ( )
if _ , exists := r . collectorsByID [ collectorID ] ; ! exists {
r . mtx . RUnlock ( )
return false
}
r . mtx . RUnlock ( )
2013-01-19 17:48:30 +04:00
2014-05-07 22:08:33 +04:00
r . mtx . Lock ( )
defer r . mtx . Unlock ( )
2013-01-19 17:48:30 +04:00
2014-05-07 22:08:33 +04:00
delete ( r . collectorsByID , collectorID )
for id := range descIDs {
delete ( r . descIDs , id )
2013-01-19 17:48:30 +04:00
}
2014-05-07 22:08:33 +04:00
// dimHashesByName is left untouched as those must be consistent
// throughout the lifetime of a program.
return true
}
2013-01-19 17:48:30 +04:00
2014-07-01 22:20:42 +04:00
func ( r * registry ) Push ( job , instance , addr , method string ) error {
u := fmt . Sprintf ( "http://%s/metrics/jobs/%s" , addr , url . QueryEscape ( job ) )
if instance != "" {
u += "/instances/" + url . QueryEscape ( instance )
}
buf := r . getBuf ( )
defer r . giveBuf ( buf )
if _ , err := r . writePB ( buf , text . WriteProtoDelimited ) ; err != nil {
if r . panicOnCollectError {
panic ( err )
}
return err
}
req , err := http . NewRequest ( method , u , buf )
2014-07-30 21:02:39 +04:00
if err != nil {
return err
}
2014-07-02 20:57:34 +04:00
req . Header . Set ( contentTypeHeader , DelimitedTelemetryContentType )
2014-07-01 22:20:42 +04:00
resp , err := http . DefaultClient . Do ( req )
if err != nil {
return err
}
defer resp . Body . Close ( )
if resp . StatusCode != 202 {
return fmt . Errorf ( "unexpected status code %d while pushing to %s" , resp . StatusCode , u )
}
return nil
}
2014-05-07 22:08:33 +04:00
func ( r * registry ) ServeHTTP ( w http . ResponseWriter , req * http . Request ) {
enc , contentType := chooseEncoder ( req )
buf := r . getBuf ( )
defer r . giveBuf ( buf )
2014-07-04 19:08:57 +04:00
writer , encoding := decorateWriter ( req , buf )
if _ , err := r . writePB ( writer , enc ) ; err != nil {
2014-05-07 22:08:33 +04:00
if r . panicOnCollectError {
2013-01-19 17:48:30 +04:00
panic ( err )
}
2014-06-20 22:40:48 +04:00
http . Error ( w , "An error has occurred:\n\n" + err . Error ( ) , http . StatusInternalServerError )
2014-05-07 22:08:33 +04:00
return
2013-01-19 17:48:30 +04:00
}
2014-07-04 19:08:57 +04:00
if closer , ok := writer . ( io . Closer ) ; ok {
closer . Close ( )
}
2014-07-02 20:57:34 +04:00
header := w . Header ( )
2014-05-07 22:08:33 +04:00
header . Set ( contentTypeHeader , contentType )
2014-07-02 20:57:34 +04:00
header . Set ( contentLengthHeader , fmt . Sprint ( buf . Len ( ) ) )
2014-07-04 19:08:57 +04:00
if encoding != "" {
header . Set ( contentEncodingHeader , encoding )
}
2014-05-07 22:08:33 +04:00
w . Write ( buf . Bytes ( ) )
}
2013-01-19 17:48:30 +04:00
2014-05-07 22:08:33 +04:00
func ( r * registry ) writePB ( w io . Writer , writeEncoded encoder ) ( int , error ) {
var metricHashes map [ uint64 ] struct { }
if r . collectChecksEnabled {
metricHashes = make ( map [ uint64 ] struct { } )
}
metricChan := make ( chan Metric , capMetricChan )
wg := sync . WaitGroup { }
2015-02-01 00:08:48 +03:00
r . mtx . RLock ( )
metricFamiliesByName := make ( map [ string ] * dto . MetricFamily , len ( r . dimHashesByName ) )
2014-05-07 22:08:33 +04:00
// Scatter.
// (Collectors could be complex and slow, so we call them all at once.)
wg . Add ( len ( r . collectorsByID ) )
go func ( ) {
wg . Wait ( )
close ( metricChan )
} ( )
for _ , collector := range r . collectorsByID {
go func ( collector Collector ) {
defer wg . Done ( )
collector . Collect ( metricChan )
} ( collector )
}
r . mtx . RUnlock ( )
2014-12-19 15:54:04 +03:00
// Drain metricChan in case of premature return.
defer func ( ) {
2015-01-09 16:49:17 +03:00
for _ = range metricChan {
2014-12-19 15:54:04 +03:00
}
} ( )
2014-05-07 22:08:33 +04:00
// Gather.
for metric := range metricChan {
// This could be done concurrently, too, but it required locking
// of metricFamiliesByName (and of metricHashes if checks are
// enabled). Most likely not worth it.
desc := metric . Desc ( )
metricFamily , ok := metricFamiliesByName [ desc . fqName ]
if ! ok {
metricFamily = r . getMetricFamily ( )
defer r . giveMetricFamily ( metricFamily )
metricFamily . Name = proto . String ( desc . fqName )
metricFamily . Help = proto . String ( desc . help )
metricFamiliesByName [ desc . fqName ] = metricFamily
}
dtoMetric := r . getMetric ( )
defer r . giveMetric ( dtoMetric )
Allow error reporting during metrics collection and simplify Register().
Both are interface changes I want to get in before public
announcement. They only break rare usage cases, and are always easy to
fix, but still we want to avoid breaking changes after a wider
announcement of the project.
The change of Register() simply removes the return of the Collector,
which nobody was using in practice. It was just bloating the call
syntax. Note that this is different from RegisterOrGet(), which is
used at various occasions where you want to register something that
might or might not be registered already, but if it is, you want the
previously registered Collector back (because that's the relevant
one).
WRT error reporting: I first tried the obvious way of letting the
Collector methods Describe() and Collect() return error. However, I
had to conclude that that bloated _many_ calls and their handling in
very obnoxious ways. On the other hand, the case where you actually
want to report errors during registration or collection is very
rare. Hence, this approach has the wrong trade-off. The approach taken
here might at first appear clunky but is in practice quite handy,
mostly because there is almost no change for the "normal" case of "no
special error handling", but also because it plays well with the way
descriptors and metrics are handled (via channels).
Explaining the approach in more detail:
- During registration / describe: Error handling was actually already
in place (for invalid descriptors, which carry an error anyway). I
only added a convenience function to create an invalid descriptor
with a given error on purpose.
- Metrics are now treated in a similar way. The Write method returns
an error now (the only change in interface). An "invalid metric" is
provided that can be sent via the channel to signal that that metric
could not be collected. It alse transports an error.
NON-GOALS OF THIS COMMIT:
This is NOT yet the major improvement of the whole registry part,
where we want a public Registry interface and plenty of modular
configurations (for error handling, various auto-metrics, http
instrumentation, testing, ...). However, we can do that whole thing
without breaking existing interfaces. For now (which is a significant
issue) any error during collection will either cause a 500 HTTP
response or a panic (depending on registry config). Later, we
definitely want to have a possibility to skip (and only report
somehow) non-collectible metrics instead of aborting the whole scrape.
2015-01-12 21:16:09 +03:00
if err := metric . Write ( dtoMetric ) ; err != nil {
// TODO: Consider different means of error reporting so
// that a single erroneous metric could be skipped
// instead of blowing up the whole collection.
return 0 , fmt . Errorf ( "error collecting metric %v: %s" , desc , err )
}
2014-05-07 22:08:33 +04:00
switch {
case metricFamily . Type != nil :
// Type already set. We are good.
case dtoMetric . Gauge != nil :
metricFamily . Type = dto . MetricType_GAUGE . Enum ( )
case dtoMetric . Counter != nil :
metricFamily . Type = dto . MetricType_COUNTER . Enum ( )
case dtoMetric . Summary != nil :
metricFamily . Type = dto . MetricType_SUMMARY . Enum ( )
case dtoMetric . Untyped != nil :
metricFamily . Type = dto . MetricType_UNTYPED . Enum ( )
default :
return 0 , fmt . Errorf ( "empty metric collected: %s" , dtoMetric )
}
if r . collectChecksEnabled {
if err := r . checkConsistency ( metricFamily , dtoMetric , desc , metricHashes ) ; err != nil {
return 0 , err
}
}
metricFamily . Metric = append ( metricFamily . Metric , dtoMetric )
}
2013-01-19 17:48:30 +04:00
2014-05-07 22:08:33 +04:00
if r . metricFamilyInjectionHook != nil {
for _ , mf := range r . metricFamilyInjectionHook ( ) {
if _ , exists := metricFamiliesByName [ mf . GetName ( ) ] ; exists {
return 0 , fmt . Errorf ( "metric family with duplicate name injected: %s" , mf )
2013-01-19 17:48:30 +04:00
}
2014-05-07 22:08:33 +04:00
metricFamiliesByName [ mf . GetName ( ) ] = mf
2013-01-19 17:48:30 +04:00
}
}
2014-05-07 22:08:33 +04:00
// Now that MetricFamilies are all set, sort their Metrics
// lexicographically by their label values.
for _ , mf := range metricFamiliesByName {
sort . Sort ( metricSorter ( mf . Metric ) )
}
// Write out MetricFamilies sorted by their name.
names := make ( [ ] string , 0 , len ( metricFamiliesByName ) )
for name := range metricFamiliesByName {
names = append ( names , name )
}
sort . Strings ( names )
var written int
for _ , name := range names {
w , err := writeEncoded ( w , metricFamiliesByName [ name ] )
written += w
if err != nil {
return written , err
}
}
return written , nil
2012-05-20 01:59:25 +04:00
}
2014-05-07 22:08:33 +04:00
func ( r * registry ) checkConsistency ( metricFamily * dto . MetricFamily , dtoMetric * dto . Metric , desc * Desc , metricHashes map [ uint64 ] struct { } ) error {
// Type consistency with metric family.
if metricFamily . GetType ( ) == dto . MetricType_GAUGE && dtoMetric . Gauge == nil ||
metricFamily . GetType ( ) == dto . MetricType_COUNTER && dtoMetric . Counter == nil ||
metricFamily . GetType ( ) == dto . MetricType_SUMMARY && dtoMetric . Summary == nil ||
metricFamily . GetType ( ) == dto . MetricType_UNTYPED && dtoMetric . Untyped == nil {
return fmt . Errorf (
"collected metric %q is not a %s" ,
dtoMetric , metricFamily . Type ,
)
}
2012-05-20 01:59:25 +04:00
2014-05-07 22:08:33 +04:00
// Desc consistency with metric family.
if metricFamily . GetHelp ( ) != desc . help {
return fmt . Errorf (
"collected metric %q has help %q but should have %q" ,
dtoMetric , desc . help , metricFamily . GetHelp ( ) ,
)
}
2013-09-11 19:22:00 +04:00
2014-05-07 22:08:33 +04:00
// Is the desc consistent with the content of the metric?
lpsFromDesc := make ( [ ] * dto . LabelPair , 0 , len ( dtoMetric . Label ) )
lpsFromDesc = append ( lpsFromDesc , desc . constLabelPairs ... )
for _ , l := range desc . variableLabels {
lpsFromDesc = append ( lpsFromDesc , & dto . LabelPair {
Name : proto . String ( l ) ,
} )
}
if len ( lpsFromDesc ) != len ( dtoMetric . Label ) {
return fmt . Errorf (
"labels in collected metric %q are inconsistent with descriptor %s" ,
dtoMetric , desc ,
)
}
sort . Sort ( LabelPairSorter ( lpsFromDesc ) )
for i , lpFromDesc := range lpsFromDesc {
lpFromMetric := dtoMetric . Label [ i ]
if lpFromDesc . GetName ( ) != lpFromMetric . GetName ( ) ||
lpFromDesc . Value != nil && lpFromDesc . GetValue ( ) != lpFromMetric . GetValue ( ) {
return fmt . Errorf (
"labels in collected metric %q are inconsistent with descriptor %s" ,
dtoMetric , desc ,
)
2013-09-11 19:22:00 +04:00
}
2013-01-19 17:48:30 +04:00
}
2014-05-07 22:08:33 +04:00
// Is the metric unique (i.e. no other metric with the same name and the same label values)?
h := fnv . New64a ( )
var buf bytes . Buffer
buf . WriteString ( desc . fqName )
2014-06-26 15:58:15 +04:00
buf . WriteByte ( model . SeparatorByte )
2014-05-07 22:08:33 +04:00
h . Write ( buf . Bytes ( ) )
for _ , lp := range dtoMetric . Label {
buf . Reset ( )
buf . WriteString ( lp . GetValue ( ) )
2014-06-26 15:58:15 +04:00
buf . WriteByte ( model . SeparatorByte )
2014-05-07 22:08:33 +04:00
h . Write ( buf . Bytes ( ) )
}
metricHash := h . Sum64 ( )
if _ , exists := metricHashes [ metricHash ] ; exists {
return fmt . Errorf (
"collected metric %q was collected before with the same name and label values" ,
dtoMetric ,
)
2013-01-19 17:48:30 +04:00
}
2014-05-07 22:08:33 +04:00
metricHashes [ metricHash ] = struct { } { }
2013-01-19 17:48:30 +04:00
2014-05-07 22:08:33 +04:00
r . mtx . RLock ( ) // Remaining checks need the read lock.
defer r . mtx . RUnlock ( )
// Is the desc registered?
if _ , exist := r . descIDs [ desc . id ] ; ! exist {
return fmt . Errorf ( "collected metric %q with unregistered descriptor %s" , dtoMetric , desc )
2012-05-20 01:59:25 +04:00
}
2013-01-19 17:48:30 +04:00
2013-06-27 20:46:16 +04:00
return nil
2012-05-20 01:59:25 +04:00
}
2014-05-07 22:08:33 +04:00
func ( r * registry ) getBuf ( ) * bytes . Buffer {
select {
case buf := <- r . bufPool :
return buf
default :
return & bytes . Buffer { }
}
2013-01-08 19:56:19 +04:00
}
2014-05-07 22:08:33 +04:00
func ( r * registry ) giveBuf ( buf * bytes . Buffer ) {
buf . Reset ( )
select {
case r . bufPool <- buf :
default :
2013-01-19 17:48:30 +04:00
}
}
2014-05-07 22:08:33 +04:00
func ( r * registry ) getMetricFamily ( ) * dto . MetricFamily {
select {
case mf := <- r . metricFamilyPool :
return mf
default :
return & dto . MetricFamily { }
}
2013-02-12 05:36:06 +04:00
}
2014-05-07 22:08:33 +04:00
// giveMetricFamily resets mf and offers it back to the registry's pool. If
// the pool cannot take it right away, the message is simply dropped. It never
// blocks.
func (r *registry) giveMetricFamily(mf *dto.MetricFamily) {
	mf.Reset()
	select {
	case r.metricFamilyPool <- mf:
		// Parked for reuse.
		return
	default:
		// Pool is full; let the garbage collector have the message.
		return
	}
}
2013-06-27 20:46:16 +04:00
2014-05-07 22:08:33 +04:00
// getMetric hands out a Metric message. It takes one from the registry's pool
// if one is immediately available and otherwise allocates a new zero-valued
// message. It never blocks.
func (r *registry) getMetric() *dto.Metric {
	select {
	case pooled := <-r.metricPool:
		return pooled
	default:
		// Pool is empty; fall through and allocate.
	}
	return new(dto.Metric)
}
2013-06-27 20:46:16 +04:00
2014-05-07 22:08:33 +04:00
// giveMetric resets m and offers it back to the registry's pool. If the pool
// cannot take it right away, the message is simply dropped. It never blocks.
func (r *registry) giveMetric(m *dto.Metric) {
	m.Reset()
	select {
	case r.metricPool <- m:
		// Parked for reuse.
		return
	default:
		// Pool is full; let the garbage collector have the message.
		return
	}
}
2013-06-27 20:46:16 +04:00
2014-05-07 22:08:33 +04:00
// newRegistry returns a registry with empty lookup maps and with object pools
// (buffered channels) sized by numBufs, numMetricFamilies, and numMetrics.
// All other fields are left at their zero values.
func newRegistry() *registry {
	reg := new(registry)

	// Lookup tables start out empty.
	reg.collectorsByID = make(map[uint64]Collector)
	reg.descIDs = make(map[uint64]struct{})
	reg.dimHashesByName = make(map[string]uint64)

	// Pools are buffered channels; their capacity bounds how many idle
	// objects are retained.
	reg.bufPool = make(chan *bytes.Buffer, numBufs)
	reg.metricFamilyPool = make(chan *dto.MetricFamily, numMetricFamilies)
	reg.metricPool = make(chan *dto.Metric, numMetrics)

	return reg
}
2013-06-27 20:46:16 +04:00
2014-12-11 04:05:35 +03:00
// newDefaultRegistry returns a new registry on which a process collector for
// the current process (empty namespace argument) and a Go collector have been
// registered, in that order. The results of the Register calls are not
// inspected.
func newDefaultRegistry() *registry {
	reg := newRegistry()

	processCollector := NewProcessCollector(os.Getpid(), "")
	reg.Register(processCollector)

	goCollector := NewGoCollector()
	reg.Register(goCollector)

	return reg
}
2014-05-07 22:08:33 +04:00
func chooseEncoder ( req * http . Request ) ( encoder , string ) {
2014-07-04 19:08:57 +04:00
accepts := goautoneg . ParseAccept ( req . Header . Get ( acceptHeader ) )
2014-05-07 22:08:33 +04:00
for _ , accept := range accepts {
switch {
case accept . Type == "application" &&
accept . SubType == "vnd.google.protobuf" &&
accept . Params [ "proto" ] == "io.prometheus.client.MetricFamily" :
switch accept . Params [ "encoding" ] {
case "delimited" :
return text . WriteProtoDelimited , DelimitedTelemetryContentType
case "text" :
return text . WriteProtoText , ProtoTextTelemetryContentType
case "compact-text" :
return text . WriteProtoCompactText , ProtoCompactTextTelemetryContentType
default :
2014-04-03 17:18:12 +04:00
continue
2013-06-27 20:46:16 +04:00
}
2014-05-07 22:08:33 +04:00
case accept . Type == "text" &&
accept . SubType == "plain" &&
( accept . Params [ "version" ] == "0.0.4" || accept . Params [ "version" ] == "" ) :
return text . MetricFamilyToText , TextTelemetryContentType
default :
continue
2013-06-27 20:46:16 +04:00
}
}
2014-05-07 22:08:33 +04:00
return text . MetricFamilyToText , TextTelemetryContentType
2013-06-27 20:46:16 +04:00
}
2014-07-04 19:08:57 +04:00
// decorateWriter wraps a writer to handle gzip compression if requested. It
// returns the decorated writer and the appropriate "Content-Encoding" header
// (which is empty if no compression is enabled).
//
// The request header named by acceptEncodingHeader is scanned element by
// element. An element enables gzip if its coding name is "gzip" (compared
// case-insensitively, ignoring whitespace around the name and its
// parameters) and it does not carry a zero weight such as "gzip;q=0", which
// is how a client explicitly refuses a coding. Non-zero weights are not
// ranked against other codings: any acceptable gzip element enables
// compression.
//
// When gzip is enabled, the returned writer is a *gzip.Writer. It is not
// closed here; the caller has to close it so that buffered compressed data
// is flushed to the underlying writer.
func decorateWriter(request *http.Request, writer io.Writer) (io.Writer, string) {
	header := request.Header.Get(acceptEncodingHeader)
	for _, element := range strings.Split(header, ",") {
		// fields[0] is the coding name, the rest are its parameters.
		fields := strings.Split(element, ";")
		if !strings.EqualFold(strings.TrimSpace(fields[0]), "gzip") {
			continue
		}
		if hasZeroQValue(fields[1:]) {
			// Explicitly refused; keep looking in case a later element
			// allows gzip after all.
			continue
		}
		return gzip.NewWriter(writer), "gzip"
	}
	return writer, ""
}

// hasZeroQValue reports whether params (the ";"-separated parameters of one
// Accept-Encoding element) contain a "q" weight whose value is zero, e.g.
// "q=0" or "q=0.000". Only the first "q" parameter is considered.
func hasZeroQValue(params []string) bool {
	for _, param := range params {
		kv := strings.SplitN(param, "=", 2)
		if len(kv) != 2 || !strings.EqualFold(strings.TrimSpace(kv[0]), "q") {
			continue
		}
		weight := strings.TrimSpace(kv[1])
		// A zero weight starts with "0" and consists of nothing but zeros
		// and the decimal point.
		return strings.HasPrefix(weight, "0") && strings.Trim(weight, "0.") == ""
	}
	return false
}
2014-05-07 22:08:33 +04:00
type metricSorter [ ] * dto . Metric
2013-01-19 17:48:30 +04:00
2014-05-07 22:08:33 +04:00
func ( s metricSorter ) Len ( ) int {
return len ( s )
}
2013-01-19 17:48:30 +04:00
2014-05-07 22:08:33 +04:00
func ( s metricSorter ) Swap ( i , j int ) {
s [ i ] , s [ j ] = s [ j ] , s [ i ]
}
2012-05-21 12:23:22 +04:00
2014-05-07 22:08:33 +04:00
func ( s metricSorter ) Less ( i , j int ) bool {
for n , lp := range s [ i ] . Label {
vi := lp . GetValue ( )
vj := s [ j ] . Label [ n ] . GetValue ( )
if vi != vj {
return vi < vj
2012-05-20 01:59:25 +04:00
}
}
2014-05-07 22:08:33 +04:00
return true
2012-05-20 01:59:25 +04:00
}