Create a "merge gatherer"

This allows us to finally get rid of the infamous injection hook in the
interface. The old SetMetricFamilyInjectionHook still exists as a
deprecated function but is now implemented with the new plumbing under
the hood.

Now that we have multiple Gatherer implementations, I renamed
push.Registry to push.FromGatherer.

This commit also improves the consistency checks, which happened as a
byproduct of the refactoring to allow checking both in the "merge
gatherer" Gatherers and in the normal Registry.
This commit is contained in:
beorn7 2016-08-05 14:57:49 +02:00
parent 1dc03a72f6
commit a6321dd0b1
8 changed files with 509 additions and 259 deletions

View File

@ -14,13 +14,16 @@
package prometheus_test package prometheus_test
import ( import (
"bytes"
"fmt" "fmt"
"math" "math"
"net/http" "net/http"
"runtime" "runtime"
"sort" "sort"
"strings"
dto "github.com/prometheus/client_model/go" dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -638,3 +641,111 @@ func ExampleAlreadyRegisteredError() {
} }
} }
} }
// ExampleGatherers shows how the Gatherers type merges the output of a
// Registry with MetricFamilies coming from a second source, here a function
// that parses metrics from a piece of text.
func ExampleGatherers() {
reg := prometheus.NewRegistry()
temp := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "temperature_kelvin",
Help: "Temperature in Kelvin.",
},
[]string{"location"},
)
reg.MustRegister(temp)
temp.WithLabelValues("outside").Set(273.14)
temp.WithLabelValues("inside").Set(298.44)
var parser expfmt.TextParser
text := `
# TYPE humidity_percent gauge
# HELP humidity_percent Humidity in %.
humidity_percent{location="outside"} 45.4
humidity_percent{location="inside"} 33.2
# TYPE temperature_kelvin gauge
# HELP temperature_kelvin Temperature in Kelvin.
temperature_kelvin{location="somewhere else"} 4.5
`
// parseText reads the text variable each time it is called, so the
// reassignment of text further down changes what the second Gather call
// receives from this source.
parseText := func() ([]*dto.MetricFamily, error) {
parsed, err := parser.TextToMetricFamilies(strings.NewReader(text))
if err != nil {
return nil, err
}
// Keep only the values of the parsed collection.
var result []*dto.MetricFamily
for _, mf := range parsed {
result = append(result, mf)
}
return result, nil
}
// Combine the registry and the text-parsing function into one Gatherer.
gatherers := prometheus.Gatherers{
reg,
prometheus.GathererFunc(parseText),
}
gathering, err := gatherers.Gather()
// An error is only printed; whatever was gathered is still rendered below.
if err != nil {
fmt.Println(err)
}
out := &bytes.Buffer{}
for _, mf := range gathering {
if _, err := expfmt.MetricFamilyToText(out, mf); err != nil {
panic(err)
}
}
fmt.Print(out.String())
fmt.Println("----------")
// Note how the temperature_kelvin metric family has been merged from
// different sources. Now try
text = `
# TYPE humidity_percent gauge
# HELP humidity_percent Humidity in %.
humidity_percent{location="outside"} 45.4
humidity_percent{location="inside"} 33.2
# TYPE temperature_kelvin gauge
# HELP temperature_kelvin Temperature in Kelvin.
# Duplicate metric:
temperature_kelvin{location="outside"} 265.3
# Wrong labels:
temperature_kelvin 4.5
`
gathering, err = gatherers.Gather()
if err != nil {
fmt.Println(err)
}
// Note that still as many metrics as possible are returned:
out.Reset()
for _, mf := range gathering {
if _, err := expfmt.MetricFamilyToText(out, mf); err != nil {
panic(err)
}
}
fmt.Print(out.String())
// Output:
// # HELP humidity_percent Humidity in %.
// # TYPE humidity_percent gauge
// humidity_percent{location="inside"} 33.2
// humidity_percent{location="outside"} 45.4
// # HELP temperature_kelvin Temperature in Kelvin.
// # TYPE temperature_kelvin gauge
// temperature_kelvin{location="inside"} 298.44
// temperature_kelvin{location="outside"} 273.14
// temperature_kelvin{location="somewhere else"} 4.5
// ----------
// 2 error(s) occurred:
// * collected metric temperature_kelvin label:<name:"location" value:"outside" > gauge:<value:265.3 > was collected before with the same name and label values
// * collected metric temperature_kelvin gauge:<value:4.5 > has label dimensions inconsistent with previously collected metrics in the same metric family
// # HELP humidity_percent Humidity in %.
// # TYPE humidity_percent gauge
// humidity_percent{location="inside"} 33.2
// humidity_percent{location="outside"} 45.4
// # HELP temperature_kelvin Temperature in Kelvin.
// # TYPE temperature_kelvin gauge
// temperature_kelvin{location="inside"} 298.44
// temperature_kelvin{location="outside"} 273.14
}

View File

@ -22,10 +22,8 @@ import (
const separatorByte byte = 255 const separatorByte byte = 255
// A Metric models a single sample value with its meta data being exported to // A Metric models a single sample value with its meta data being exported to
// Prometheus. Implementers of Metric in this package inclued Gauge, Counter, // Prometheus. Implementations of Metric in this package are Gauge, Counter,
// Untyped, and Summary. Users can implement their own Metric types, but that // Histogram, Summary, and Untyped.
// should be rarely needed. See the example for SelfCollector, which is also an
// example for a user-implemented Metric.
type Metric interface { type Metric interface {
// Desc returns the descriptor for the Metric. This method idempotently // Desc returns the descriptor for the Metric. This method idempotently
// returns the same descriptor throughout the lifetime of the // returns the same descriptor throughout the lifetime of the
@ -36,16 +34,18 @@ type Metric interface {
// Write encodes the Metric into a "Metric" Protocol Buffer data // Write encodes the Metric into a "Metric" Protocol Buffer data
// transmission object. // transmission object.
// //
// Implementers of custom Metric types must observe concurrency safety // Metric implementations must observe concurrency safety as reads of
// as reads of this metric may occur at any time, and any blocking // this metric may occur at any time, and any blocking occurs at the
// occurs at the expense of total performance of rendering all // expense of total performance of rendering all registered
// registered metrics. Ideally Metric implementations should support // metrics. Ideally, Metric implementations should support concurrent
// concurrent readers. // readers.
// //
// While populating dto.Metric, it is recommended to sort labels // While populating dto.Metric, it is the responsibility of the
// lexicographically. (Implementers may find LabelPairSorter useful for // implementation to ensure validity of the Metric protobuf (like valid
// that.) Callers of Write should still make sure of sorting if they // UTF-8 strings or syntactically valid metric and label names). It is
// depend on it. // recommended to sort labels lexicographically. (Implementers may find
// LabelPairSorter useful for that.) Callers of Write should still make
// sure of sorting if they depend on it.
Write(*dto.Metric) error Write(*dto.Metric) error
// TODO(beorn7): The original rationale of passing in a pre-allocated // TODO(beorn7): The original rationale of passing in a pre-allocated
// dto.Metric protobuf to save allocations has disappeared. The // dto.Metric protobuf to save allocations has disappeared. The

View File

@ -88,13 +88,11 @@ func TestHandlerErrorHandling(t *testing.T) {
ErrorLog: logger, ErrorLog: logger,
ErrorHandling: PanicOnError, ErrorHandling: PanicOnError,
}) })
wantMsg := `error gathering metrics: 1 error(s) occurred: wantMsg := `error gathering metrics: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
* error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
` `
wantErrorBody := `An error has occurred during metrics gathering: wantErrorBody := `An error has occurred during metrics gathering:
1 error(s) occurred: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
* error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
` `
wantOKBody := `# HELP name docstring wantOKBody := `# HELP name docstring
# TYPE name counter # TYPE name counter
@ -110,10 +108,10 @@ the_count 0
t.Errorf("got HTTP status code %d, want %d", got, want) t.Errorf("got HTTP status code %d, want %d", got, want)
} }
if got := logBuf.String(); got != wantMsg { if got := logBuf.String(); got != wantMsg {
t.Errorf("got log message %q, want %q", got, wantMsg) t.Errorf("got log message:\n%s\nwant log mesage:\n%s\n", got, wantMsg)
} }
if got := writer.Body.String(); got != wantErrorBody { if got := writer.Body.String(); got != wantErrorBody {
t.Errorf("got body %q, want %q", got, wantErrorBody) t.Errorf("got body:\n%s\nwant body:\n%s\n", got, wantErrorBody)
} }
logBuf.Reset() logBuf.Reset()
writer.Body.Reset() writer.Body.Reset()

View File

@ -46,7 +46,7 @@ func ExampleRegistry() {
registry.MustRegister(completionTime) registry.MustRegister(completionTime)
completionTime.Set(float64(time.Now().Unix())) completionTime.Set(float64(time.Now().Unix()))
if err := push.Registry( if err := push.FromGatherer(
"db_backup", push.HostnameGroupingKey(), "db_backup", push.HostnameGroupingKey(),
"http://pushgateway:9091", "http://pushgateway:9091",
registry, registry,

View File

@ -44,14 +44,14 @@ import (
const contentTypeHeader = "Content-Type" const contentTypeHeader = "Content-Type"
// Registry triggers a metric collection by the provided Gatherer (which is // FromGatherer triggers a metric collection by the provided Gatherer (which is
// usually implemented by a prometheus.Registry, thus the name of the function) // usually implemented by a prometheus.Registry) and pushes all gathered metrics
// and pushes all gathered metrics to the Pushgateway specified by url, using // to the Pushgateway specified by url, using the provided job name and the
// the provided job name and the (optional) further grouping labels (the // (optional) further grouping labels (the grouping map may be nil). See the
// grouping map may be nil). See the Pushgateway documentation for detailed // Pushgateway documentation for detailed implications of the job and other
// implications of the job and other grouping labels. Neither the job name nor // grouping labels. Neither the job name nor any grouping label value may
// any grouping label value may contain a "/". The metrics pushed must not // contain a "/". The metrics pushed must not contain a job label of their own
// contain a job label of their own nor any of the grouping labels. // nor any of the grouping labels.
// //
// You can use just host:port or ip:port as url, in which case 'http://' is // You can use just host:port or ip:port as url, in which case 'http://' is
// added automatically. You can also include the schema in the URL. However, do // added automatically. You can also include the schema in the URL. However, do
@ -60,18 +60,18 @@ const contentTypeHeader = "Content-Type"
// Note that all previously pushed metrics with the same job and other grouping // Note that all previously pushed metrics with the same job and other grouping
// labels will be replaced with the metrics pushed by this call. (It uses HTTP // labels will be replaced with the metrics pushed by this call. (It uses HTTP
// method 'PUT' to push to the Pushgateway.) // method 'PUT' to push to the Pushgateway.)
func Registry(job string, grouping map[string]string, url string, reg prometheus.Gatherer) error { func FromGatherer(job string, grouping map[string]string, url string, g prometheus.Gatherer) error {
return push(job, grouping, url, reg, "PUT") return push(job, grouping, url, g, "PUT")
} }
// RegistryAdd works like Registry, but only previously pushed metrics with the // AddFromGatherer works like FromGatherer, but only previously pushed metrics
// same name (and the same job and other grouping labels) will be replaced. (It // with the same name (and the same job and other grouping labels) will be
// uses HTTP method 'POST' to push to the Pushgateway.) // replaced. (It uses HTTP method 'POST' to push to the Pushgateway.)
func RegistryAdd(job string, grouping map[string]string, url string, reg prometheus.Gatherer) error { func AddFromGatherer(job string, grouping map[string]string, url string, g prometheus.Gatherer) error {
return push(job, grouping, url, reg, "POST") return push(job, grouping, url, g, "POST")
} }
func push(job string, grouping map[string]string, pushURL string, reg prometheus.Gatherer, method string) error { func push(job string, grouping map[string]string, pushURL string, g prometheus.Gatherer, method string) error {
if !strings.Contains(pushURL, "://") { if !strings.Contains(pushURL, "://") {
pushURL = "http://" + pushURL pushURL = "http://" + pushURL
} }
@ -94,7 +94,7 @@ func push(job string, grouping map[string]string, pushURL string, reg prometheus
} }
pushURL = fmt.Sprintf("%s/metrics/job/%s", pushURL, strings.Join(urlComponents, "/")) pushURL = fmt.Sprintf("%s/metrics/job/%s", pushURL, strings.Join(urlComponents, "/"))
mfs, err := reg.Gather() mfs, err := g.Gather()
if err != nil { if err != nil {
return err return err
} }
@ -134,14 +134,14 @@ func push(job string, grouping map[string]string, pushURL string, reg prometheus
return nil return nil
} }
// Collectors works like Registry, but it does not use a Gatherer. Instead, it // Collectors works like FromGatherer, but it does not use a Gatherer. Instead,
// collects from the provided collectors directly. It is a convenient way to // it collects from the provided collectors directly. It is a convenient way to
// push only a few metrics. // push only a few metrics.
func Collectors(job string, grouping map[string]string, url string, collectors ...prometheus.Collector) error { func Collectors(job string, grouping map[string]string, url string, collectors ...prometheus.Collector) error {
return pushCollectors(job, grouping, url, "PUT", collectors...) return pushCollectors(job, grouping, url, "PUT", collectors...)
} }
// AddCollectors works like RegistryAdd, but it does not use a Gatherer. // AddCollectors works like AddFromGatherer, but it does not use a Gatherer.
// Instead, it collects from the provided collectors directly. It is a // Instead, it collects from the provided collectors directly. It is a
// convenient way to push only a few metrics. // convenient way to push only a few metrics.
func AddCollectors(job string, grouping map[string]string, url string, collectors ...prometheus.Collector) error { func AddCollectors(job string, grouping map[string]string, url string, collectors ...prometheus.Collector) error {

View File

@ -150,7 +150,7 @@ func TestPush(t *testing.T) {
} }
// Push registry, all good. // Push registry, all good.
if err := Registry("testjob", HostnameGroupingKey(), pgwOK.URL, reg); err != nil { if err := FromGatherer("testjob", HostnameGroupingKey(), pgwOK.URL, reg); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if lastMethod != "PUT" { if lastMethod != "PUT" {
@ -161,7 +161,7 @@ func TestPush(t *testing.T) {
} }
// PushAdd registry, all good. // PushAdd registry, all good.
if err := RegistryAdd("testjob", map[string]string{"a": "x", "b": "y"}, pgwOK.URL, reg); err != nil { if err := AddFromGatherer("testjob", map[string]string{"a": "x", "b": "y"}, pgwOK.URL, reg); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if lastMethod != "POST" { if lastMethod != "POST" {

View File

@ -32,18 +32,20 @@ const (
capDescChan = 10 capDescChan = 10
) )
// DefaultRegistry is a Registry instance that has a ProcessCollector and a // DefaultRegisterer and DefaultGatherer are the implementations of the
// GoCollector pre-registered. DefaultRegisterer and DefaultGatherer are both // Registerer and Gatherer interface a number of convenience functions in this
// pointing to it. A number of convenience functions in this package act on // package act on. Initially, both variables point to the same Registry, which
// them. This approach to keep a default instance as global state mirrors the // has a process collector (see NewProcessCollector) and a Go collector (see
// approach of other packages in the Go standard library. Note that there are // NewGoCollector) already registered. This approach to keep default instances
// caveats. Change the variables with caution and only if you understand the // as global state mirrors the approach of other packages in the Go standard
// consequences. Users who want to avoid global state altogether should not // library. Note that there are caveats. Change the variables with caution and
// use the convenience function and act on custom instances instead. // only if you understand the consequences. Users who want to avoid global state
// altogether should not use the convenience function and act on custom
// instances instead.
var ( var (
DefaultRegistry = NewRegistry() defaultRegistry = NewRegistry()
DefaultRegisterer Registerer = DefaultRegistry DefaultRegisterer Registerer = defaultRegistry
DefaultGatherer Gatherer = DefaultRegistry DefaultGatherer Gatherer = defaultRegistry
) )
func init() { func init() {
@ -67,10 +69,9 @@ func NewRegistry() *Registry {
// //
// Usually, a Registry will be happy as long as the union of all collected // Usually, a Registry will be happy as long as the union of all collected
// Metrics is consistent and valid even if some metrics are not consistent with // Metrics is consistent and valid even if some metrics are not consistent with
// their own Desc or with one of the Descs provided by their // their own Desc or a Desc provided by their registered Collector. Well-behaved
// Collector. Well-behaved Collectors and Metrics will only provide consistent // Collectors and Metrics will only provide consistent Descs. This Registry is
// Descs. This Registry is useful to test the implementation of Collectors and // useful to test the implementation of Collectors and Metrics.
// Metrics.
func NewPedanticRegistry() *Registry { func NewPedanticRegistry() *Registry {
r := NewRegistry() r := NewRegistry()
r.pedanticChecksEnabled = true r.pedanticChecksEnabled = true
@ -80,13 +81,13 @@ func NewPedanticRegistry() *Registry {
// Registerer is the interface for the part of a registry in charge of // Registerer is the interface for the part of a registry in charge of
// registering and unregistering. Users of custom registries should use // registering and unregistering. Users of custom registries should use
// Registerer as type for registration purposes (rather then the Registry type // Registerer as type for registration purposes (rather then the Registry type
// directly). In that way, they are free to exchange the Registerer // directly). In that way, they are free to use custom Registerer implementation
// implementation (e.g. for testing purposes). // (e.g. for testing purposes).
type Registerer interface { type Registerer interface {
// Register registers a new Collector to be included in metrics // Register registers a new Collector to be included in metrics
// collection. It returns an error if the descriptors provided by the // collection. It returns an error if the descriptors provided by the
// Collector are invalid or if they - in combination with descriptors of // Collector are invalid or if they in combination with descriptors of
// already registered Collectors - do not fulfill the consistency and // already registered Collectors do not fulfill the consistency and
// uniqueness criteria described in the documentation of metric.Desc. // uniqueness criteria described in the documentation of metric.Desc.
// //
// If the provided Collector is equal to a Collector already registered // If the provided Collector is equal to a Collector already registered
@ -192,26 +193,41 @@ func Unregister(c Collector) bool {
return DefaultRegisterer.Unregister(c) return DefaultRegisterer.Unregister(c)
} }
// SetMetricFamilyInjectionHook sets a MetricFamily injection hook on the // GathererFunc turns a function into a Gatherer.
// DefaultRegistry. type GathererFunc func() ([]*dto.MetricFamily, error)
//
// It's a shortcut for DefaultRegistry.SetInjectionHook(hook). See there for // Gather implements Gatherer.
// more details. func (gf GathererFunc) Gather() ([]*dto.MetricFamily, error) {
// return gf()
// Deprecated: In the rare cases this call is needed, users should simply call
// DefaultRegistry.SetInjectionHook directly.
func SetMetricFamilyInjectionHook(hook func() []*dto.MetricFamily) {
DefaultRegistry.SetInjectionHook(hook)
} }
// AlreadyRegisteredError is returned by the Registerer.Register if the // SetMetricFamilyInjectionHook replaces the DefaultGatherer with one that
// Collector to be registered has already been registered before, or a different // gathers from the previous DefaultGatherers but then merges the MetricFamily
// Collector that collects the same metrics has been registered // protobufs returned from the provided hook function with the MetricFamily
// before. Registration fails in that case, but you can detect from the kind of // protobufs returned from the original DefaultGatherer.
// error what has happened. The error contains fields for the existing Collector //
// and the (rejected) new Collector that equals the existing one. This can be // Deprecated: This function manipulates the DefaultGatherer variable. Consider
// used to find out if an equal Collector has been registered before and switch // the implications, i.e. don't do this concurrently with any uses of the
// over to using the old one, as demonstrated in the example. // DefaultGatherer. In the rare cases where you need to inject MetricFamily
// protobufs directly, it is recommended to use a custom Registry and combine it
// with a custom Gatherer using the Gatherers type (see
// there). SetMetricFamilyInjectionHook only exists for compatibility reasons
// with previous versions of this package.
func SetMetricFamilyInjectionHook(hook func() []*dto.MetricFamily) {
	// Adapt the error-less hook to the Gatherer interface; it never reports
	// an error of its own.
	injected := GathererFunc(func() ([]*dto.MetricFamily, error) {
		return hook(), nil
	})
	// Chain the current DefaultGatherer in front of the hook so that both
	// are gathered and merged from now on.
	DefaultGatherer = Gatherers{DefaultGatherer, injected}
}
// AlreadyRegisteredError is returned by the Register method if the Collector to
// be registered has already been registered before, or a different Collector
// that collects the same metrics has been registered before. Registration fails
// in that case, but you can detect from the kind of error what has
// happened. The error contains fields for the existing Collector and the
// (rejected) new Collector that equals the existing one. This can be used to
// find out if an equal Collector has been registered before and switch over to
// using the old one, as demonstrated in the example.
type AlreadyRegisteredError struct { type AlreadyRegisteredError struct {
ExistingCollector, NewCollector Collector ExistingCollector, NewCollector Collector
} }
@ -236,17 +252,31 @@ func (errs MultiError) Error() string {
return buf.String() return buf.String()
} }
// MaybeUnwrap reduces the MultiError to the simplest error value that still
// carries all contained errors: nil if len(errs) is 0, the single contained
// error if len(errs) is 1, and the MultiError itself otherwise. Use it when
// returning a MultiError so that callers only see a MultiError if one is
// actually needed.
func (errs MultiError) MaybeUnwrap() error {
	if len(errs) == 0 {
		// Return a literal nil so the caller's err == nil check holds.
		return nil
	}
	if len(errs) == 1 {
		return errs[0]
	}
	return errs
}
// Registry registers Prometheus collectors, collects their metrics, and gathers // Registry registers Prometheus collectors, collects their metrics, and gathers
// them into MetricFamilies for exposition. It implements Registerer and // them into MetricFamilies for exposition. It implements both Registerer and
// Gatherer. The zero value is not usable. Create instances with NewRegistry or // Gatherer. The zero value is not usable. Create instances with NewRegistry or
// NewPedanticRegistry. // NewPedanticRegistry.
type Registry struct { type Registry struct {
mtx sync.RWMutex mtx sync.RWMutex
collectorsByID map[uint64]Collector // ID is a hash of the descIDs. collectorsByID map[uint64]Collector // ID is a hash of the descIDs.
descIDs map[uint64]struct{} descIDs map[uint64]struct{}
dimHashesByName map[string]uint64 dimHashesByName map[string]uint64
metricFamilyInjectionHook func() []*dto.MetricFamily pedanticChecksEnabled bool
pedanticChecksEnabled bool
} }
// Register implements Registerer. // Register implements Registerer.
@ -381,6 +411,7 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
var ( var (
metricChan = make(chan Metric, capMetricChan) metricChan = make(chan Metric, capMetricChan)
metricHashes = map[uint64]struct{}{} metricHashes = map[uint64]struct{}{}
dimHashes = map[string]uint64{}
wg sync.WaitGroup wg sync.WaitGroup
errs MultiError // The collected errors to return in the end. errs MultiError // The collected errors to return in the end.
registeredDescIDs map[uint64]struct{} // Only used for pedantic checks registeredDescIDs map[uint64]struct{} // Only used for pedantic checks
@ -426,13 +457,6 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
// of metricFamiliesByName (and of metricHashes if checks are // of metricFamiliesByName (and of metricHashes if checks are
// enabled). Most likely not worth it. // enabled). Most likely not worth it.
desc := metric.Desc() desc := metric.Desc()
metricFamily, ok := metricFamiliesByName[desc.fqName]
if !ok {
metricFamily = &dto.MetricFamily{}
metricFamily.Name = proto.String(desc.fqName)
metricFamily.Help = proto.String(desc.help)
metricFamiliesByName[desc.fqName] = metricFamily
}
dtoMetric := &dto.Metric{} dtoMetric := &dto.Metric{}
if err := metric.Write(dtoMetric); err != nil { if err := metric.Write(dtoMetric); err != nil {
errs = append(errs, fmt.Errorf( errs = append(errs, fmt.Errorf(
@ -440,46 +464,171 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
)) ))
continue continue
} }
switch { metricFamily, ok := metricFamiliesByName[desc.fqName]
case metricFamily.Type != nil: if ok {
// Type already set. We are good. if metricFamily.GetHelp() != desc.help {
case dtoMetric.Gauge != nil: errs = append(errs, fmt.Errorf(
metricFamily.Type = dto.MetricType_GAUGE.Enum() "collected metric %s %s has help %q but should have %q",
case dtoMetric.Counter != nil: desc.fqName, dtoMetric, desc.help, metricFamily.GetHelp(),
metricFamily.Type = dto.MetricType_COUNTER.Enum() ))
case dtoMetric.Summary != nil: continue
metricFamily.Type = dto.MetricType_SUMMARY.Enum() }
case dtoMetric.Untyped != nil: // TODO(beorn7): Simplify switch once Desc has type.
metricFamily.Type = dto.MetricType_UNTYPED.Enum() switch metricFamily.GetType() {
case dtoMetric.Histogram != nil: case dto.MetricType_COUNTER:
metricFamily.Type = dto.MetricType_HISTOGRAM.Enum() if dtoMetric.Counter == nil {
default: errs = append(errs, fmt.Errorf(
errs = append(errs, fmt.Errorf( "collected metric %s %s should be a Counter",
"empty metric collected: %s", dtoMetric, desc.fqName, dtoMetric,
)) ))
continue continue
}
case dto.MetricType_GAUGE:
if dtoMetric.Gauge == nil {
errs = append(errs, fmt.Errorf(
"collected metric %s %s should be a Gauge",
desc.fqName, dtoMetric,
))
continue
}
case dto.MetricType_SUMMARY:
if dtoMetric.Summary == nil {
errs = append(errs, fmt.Errorf(
"collected metric %s %s should be a Summary",
desc.fqName, dtoMetric,
))
continue
}
case dto.MetricType_UNTYPED:
if dtoMetric.Untyped == nil {
errs = append(errs, fmt.Errorf(
"collected metric %s %s should be Untyped",
desc.fqName, dtoMetric,
))
continue
}
case dto.MetricType_HISTOGRAM:
if dtoMetric.Histogram == nil {
errs = append(errs, fmt.Errorf(
"collected metric %s %s should be a Histogram",
desc.fqName, dtoMetric,
))
continue
}
default:
panic("encountered MetricFamily with invalid type")
}
} else {
metricFamily = &dto.MetricFamily{}
metricFamily.Name = proto.String(desc.fqName)
metricFamily.Help = proto.String(desc.help)
// TODO(beorn7): Simplify switch once Desc has type.
switch {
case dtoMetric.Gauge != nil:
metricFamily.Type = dto.MetricType_GAUGE.Enum()
case dtoMetric.Counter != nil:
metricFamily.Type = dto.MetricType_COUNTER.Enum()
case dtoMetric.Summary != nil:
metricFamily.Type = dto.MetricType_SUMMARY.Enum()
case dtoMetric.Untyped != nil:
metricFamily.Type = dto.MetricType_UNTYPED.Enum()
case dtoMetric.Histogram != nil:
metricFamily.Type = dto.MetricType_HISTOGRAM.Enum()
default:
errs = append(errs, fmt.Errorf(
"empty metric collected: %s", dtoMetric,
))
continue
}
metricFamiliesByName[desc.fqName] = metricFamily
} }
if err := r.checkConsistency(metricFamily, dtoMetric, desc, metricHashes, registeredDescIDs); err != nil { if err := checkMetricConsistency(metricFamily, dtoMetric, metricHashes, dimHashes); err != nil {
errs = append(errs, err) errs = append(errs, err)
continue continue
} }
if r.pedanticChecksEnabled {
// Is the desc registered at all?
if _, exist := registeredDescIDs[desc.id]; !exist {
errs = append(errs, fmt.Errorf(
"collected metric %s %s with unregistered descriptor %s",
metricFamily.GetName(), dtoMetric, desc,
))
continue
}
if err := checkDescConsistency(metricFamily, dtoMetric, desc); err != nil {
errs = append(errs, err)
continue
}
}
metricFamily.Metric = append(metricFamily.Metric, dtoMetric) metricFamily.Metric = append(metricFamily.Metric, dtoMetric)
} }
return normalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap()
}
r.mtx.RLock() // Gatherers is a slice of Gatherer instances that implements the Gatherer
if r.metricFamilyInjectionHook != nil { // interface itself. Its Gather method calls Gather on all Gatherers in the
for _, mf := range r.metricFamilyInjectionHook() { // slice in order and returns the merged results. Errors returned from the
// Gather calls are all returned in a flattened MultiError. Duplicate and
// inconsistent Metrics are skipped (first occurrence in slice order wins) and
// reported in the returned error.
//
// Gatherers can be used to merge the Gather results from multiple
// Registries. It also provides a way to directly inject existing MetricFamily
// protobufs into the gathering by creating a custom Gatherer with a Gather
// method that simply returns the existing MetricFamily protobufs. Note that no
// registration is involved (in contrast to Collector registration), so
// obviously registration-time checks cannot happen. Any inconsistencies between
// the gathered MetricFamilies are reported as errors by the Gather method, and
// inconsistent Metrics are dropped. Invalid parts of the MetricFamilies
// (e.g. syntactically invalid metric or label names) will go undetected.
type Gatherers []Gatherer
// Gather implements Gatherer.
func (gs Gatherers) Gather() ([]*dto.MetricFamily, error) {
var (
metricFamiliesByName = map[string]*dto.MetricFamily{}
metricHashes = map[uint64]struct{}{}
dimHashes = map[string]uint64{}
errs MultiError // The collected errors to return in the end.
)
for i, g := range gs {
mfs, err := g.Gather()
if err != nil {
if multiErr, ok := err.(MultiError); ok {
for _, err := range multiErr {
errs = append(errs, fmt.Errorf("[from Gatherer #%d] %s", i+1, err))
}
} else {
errs = append(errs, fmt.Errorf("[from Gatherer #%d] %s", i+1, err))
}
}
for _, mf := range mfs {
existingMF, exists := metricFamiliesByName[mf.GetName()] existingMF, exists := metricFamiliesByName[mf.GetName()]
if !exists { if exists {
if existingMF.GetHelp() != mf.GetHelp() {
errs = append(errs, fmt.Errorf(
"gathered metric family %s has help %q but should have %q",
mf.GetName(), mf.GetHelp(), existingMF.GetHelp(),
))
continue
}
if existingMF.GetType() != mf.GetType() {
errs = append(errs, fmt.Errorf(
"gathered metric family %s has type %s but should have %s",
mf.GetName(), mf.GetType(), existingMF.GetType(),
))
continue
}
} else {
existingMF = &dto.MetricFamily{} existingMF = &dto.MetricFamily{}
existingMF.Name = mf.Name existingMF.Name = mf.Name
existingMF.Help = mf.Help existingMF.Help = mf.Help
existingMF.Type = mf.Type existingMF.Type = mf.Type
metricFamiliesByName[mf.GetName()] = existingMF metricFamiliesByName[mf.GetName()] = existingMF
} }
for _, m := range mf.Metric { for _, m := range mf.Metric {
if err := r.checkConsistency(existingMF, m, nil, metricHashes, nil); err != nil { if err := checkMetricConsistency(existingMF, m, metricHashes, dimHashes); err != nil {
errs = append(errs, err) errs = append(errs, err)
continue continue
} }
@ -487,141 +636,7 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
} }
} }
} }
r.mtx.RUnlock() return normalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap()
// Now that MetricFamilies are all set, sort their Metrics
// lexicographically by their label values.
for _, mf := range metricFamiliesByName {
sort.Sort(metricSorter(mf.Metric))
}
// Write out MetricFamilies sorted by their name, skipping those without
// metrics.
names := make([]string, 0, len(metricFamiliesByName))
for name, mf := range metricFamiliesByName {
if len(mf.Metric) > 0 {
names = append(names, name)
}
}
sort.Strings(names)
result := make([]*dto.MetricFamily, 0, len(names))
for _, name := range names {
result = append(result, metricFamiliesByName[name])
}
// We cannot just `return result, errs`. Even if errs == nil, it will
// not be seen as nil through the error interface.
if len(errs) == 0 {
return result, nil
}
return result, errs
}
// checkConsistency verifies that dtoMetric is consistent with metricFamily
// and, if desc is non-nil and pedantic checks are enabled on the Registry,
// also with desc.
//
// The following is always checked:
//
//   - The value field matching the type of the MetricFamily (Gauge, Counter,
//     Summary, Histogram, or Untyped) is set in the Metric.
//   - The hash over the MetricFamily name and the label values of the Metric
//     is not yet contained in metricHashes. If the check passes, the hash is
//     added to metricHashes.
//
// With a desc and pedantic checks enabled, it is additionally checked that the
// help string of the MetricFamily equals the one of desc, that the labels of
// the Metric match the const and variable labels of desc, and that the ID of
// desc is contained in registeredDescIDs.
//
// The first failed check is returned as an error. As a side effect, the label
// pairs of dtoMetric are sorted in place.
func (r *Registry) checkConsistency(
	metricFamily *dto.MetricFamily,
	dtoMetric *dto.Metric,
	desc *Desc,
	metricHashes map[uint64]struct{},
	registeredDescIDs map[uint64]struct{},
) error {
	famName := metricFamily.GetName()

	// The Metric must carry the value field that belongs to the type of
	// its MetricFamily.
	var valueMissing bool
	switch metricFamily.GetType() {
	case dto.MetricType_GAUGE:
		valueMissing = dtoMetric.Gauge == nil
	case dto.MetricType_COUNTER:
		valueMissing = dtoMetric.Counter == nil
	case dto.MetricType_SUMMARY:
		valueMissing = dtoMetric.Summary == nil
	case dto.MetricType_HISTOGRAM:
		valueMissing = dtoMetric.Histogram == nil
	case dto.MetricType_UNTYPED:
		valueMissing = dtoMetric.Untyped == nil
	}
	if valueMissing {
		return fmt.Errorf(
			"collected metric %s %s is not a %s",
			famName, dtoMetric, metricFamily.GetType(),
		)
	}

	// Uniqueness: no other metric with the same name and the same label
	// values may have been collected. The label pairs have to be sorted
	// for the hash to be reproducible; the comparison with the desc further
	// down depends on the sorted order, too.
	sort.Sort(LabelPairSorter(dtoMetric.Label))
	metricHash := hashAddByte(hashAdd(hashNew(), famName), separatorByte)
	for _, pair := range dtoMetric.Label {
		metricHash = hashAddByte(hashAdd(metricHash, pair.GetValue()), separatorByte)
	}
	if _, seen := metricHashes[metricHash]; seen {
		return fmt.Errorf(
			"collected metric %s %s was collected before with the same name and label values",
			famName, dtoMetric,
		)
	}
	metricHashes[metricHash] = struct{}{}

	// Everything below needs a desc and is only done in pedantic mode.
	pedantic := desc != nil && r.pedanticChecksEnabled
	if !pedantic {
		return nil
	}

	// The help string of the MetricFamily must be the one of the desc.
	if help := metricFamily.GetHelp(); help != desc.help {
		return fmt.Errorf(
			"collected metric %s %s has help %q but should have %q",
			famName, dtoMetric, help, desc.help,
		)
	}

	// Both label checks below report the same error.
	labelsInconsistent := func() error {
		return fmt.Errorf(
			"labels in collected metric %s %s are inconsistent with descriptor %s",
			famName, dtoMetric, desc,
		)
	}

	// Assemble the label pairs the desc calls for: const labels with name
	// and value, variable labels with the name only.
	wantPairs := make([]*dto.LabelPair, 0, len(dtoMetric.Label))
	wantPairs = append(wantPairs, desc.constLabelPairs...)
	for _, labelName := range desc.variableLabels {
		wantPairs = append(wantPairs, &dto.LabelPair{Name: proto.String(labelName)})
	}
	if len(wantPairs) != len(dtoMetric.Label) {
		return labelsInconsistent()
	}
	sort.Sort(LabelPairSorter(wantPairs))
	for i, want := range wantPairs {
		got := dtoMetric.Label[i]
		if want.GetName() != got.GetName() {
			return labelsInconsistent()
		}
		// A nil Value marks a variable label, for which any value in the
		// Metric is acceptable.
		if want.Value != nil && want.GetValue() != got.GetValue() {
			return labelsInconsistent()
		}
	}

	// Finally, the desc must be a registered one.
	if _, registered := registeredDescIDs[desc.id]; !registered {
		return fmt.Errorf(
			"collected metric %s %s with unregistered descriptor %s",
			famName, dtoMetric, desc,
		)
	}
	return nil
}
// SetInjectionHook sets the provided hook to inject MetricFamilies. The hook is
// a function that is called whenever metrics are collected. The MetricFamily
// protobufs returned by the hook function are merged with the metrics collected
// in the usual way.
//
// This is a way to directly inject MetricFamily protobufs managed and owned by
// the caller. The caller has full responsibility. As no registration of the
// injected metrics has happened, there was no check at registration time. If
// the injection results in inconsistent metrics, the Collect call will return
// an error. Some problems may even go undetected, like invalid label names in
// the injected protobufs.
//
// The hook function must be callable at any time and concurrently.
func (r *Registry) SetInjectionHook(hook func() []*dto.MetricFamily) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.metricFamilyInjectionHook = hook
} }
// metricSorter is a sortable slice of *dto.Metric. // metricSorter is a sortable slice of *dto.Metric.
@ -667,3 +682,125 @@ func (s metricSorter) Less(i, j int) bool {
} }
return s[i].GetTimestampMs() < s[j].GetTimestampMs() return s[i].GetTimestampMs() < s[j].GetTimestampMs()
} }
// normalizeMetricFamilies turns the provided map into a slice of
// MetricFamilies. The Metrics within each MetricFamily are sorted in place
// (in the order defined by metricSorter), MetricFamilies without any Metrics
// are left out, and the remaining MetricFamilies are ordered by name.
func normalizeMetricFamilies(metricFamiliesByName map[string]*dto.MetricFamily) []*dto.MetricFamily {
	// Sort the Metrics of every family and, in the same pass, remember the
	// names of those families that have something to report.
	nonEmpty := make([]string, 0, len(metricFamiliesByName))
	for famName, fam := range metricFamiliesByName {
		sort.Sort(metricSorter(fam.Metric))
		if len(fam.Metric) == 0 {
			continue
		}
		nonEmpty = append(nonEmpty, famName)
	}
	// Map iteration order is random, so the names are ordered explicitly.
	sort.Strings(nonEmpty)

	normalized := make([]*dto.MetricFamily, len(nonEmpty))
	for i, famName := range nonEmpty {
		normalized[i] = metricFamiliesByName[famName]
	}
	return normalized
}
// checkMetricConsistency checks if the provided Metric is consistent with the
// provided MetricFamily and has not been collected before.
//
// Three checks are performed, in this order:
//
//  1. The value field matching the type of the MetricFamily (Gauge, Counter,
//     Summary, Histogram, or Untyped) must be set in the Metric.
//  2. The hash over the MetricFamily name and the label values of the Metric
//     must not yet be contained in the provided metricHashes.
//  3. The dimHash (the hash over the sorted label names of the Metric) must be
//     equal to the one stored in the provided dimHashes under the name of the
//     MetricFamily, if there is one.
//
// The first failed check is returned as an error, and neither map is modified
// in that case. If all checks pass, the metric hash is added to metricHashes,
// and the dimHash is stored in dimHashes if the MetricFamily had no entry
// there yet.
//
// As a side effect, the label pairs of the Metric are sorted in place.
func checkMetricConsistency(
	metricFamily *dto.MetricFamily,
	dtoMetric *dto.Metric,
	metricHashes map[uint64]struct{},
	dimHashes map[string]uint64,
) error {
	famName := metricFamily.GetName()

	// The Metric must carry the value field that belongs to the type of
	// its MetricFamily.
	var valueMissing bool
	switch metricFamily.GetType() {
	case dto.MetricType_GAUGE:
		valueMissing = dtoMetric.Gauge == nil
	case dto.MetricType_COUNTER:
		valueMissing = dtoMetric.Counter == nil
	case dto.MetricType_SUMMARY:
		valueMissing = dtoMetric.Summary == nil
	case dto.MetricType_HISTOGRAM:
		valueMissing = dtoMetric.Histogram == nil
	case dto.MetricType_UNTYPED:
		valueMissing = dtoMetric.Untyped == nil
	}
	if valueMissing {
		return fmt.Errorf(
			"collected metric %s %s is not a %s",
			famName, dtoMetric, metricFamily.GetType(),
		)
	}

	// Both hashes are only reproducible if the label pairs are walked in a
	// fixed order, so sort them first.
	sort.Sort(LabelPairSorter(dtoMetric.Label))

	// metricHash identifies the metric (name plus label values), labelDimHash
	// identifies the label dimensions (label names only).
	metricHash := hashAddByte(hashAdd(hashNew(), famName), separatorByte)
	labelDimHash := hashNew()
	for _, pair := range dtoMetric.Label {
		metricHash = hashAddByte(hashAdd(metricHash, pair.GetValue()), separatorByte)
		labelDimHash = hashAddByte(hashAdd(labelDimHash, pair.GetName()), separatorByte)
	}

	if _, seen := metricHashes[metricHash]; seen {
		return fmt.Errorf(
			"collected metric %s %s was collected before with the same name and label values",
			famName, dtoMetric,
		)
	}

	knownDimHash, known := dimHashes[famName]
	if known && knownDimHash != labelDimHash {
		return fmt.Errorf(
			"collected metric %s %s has label dimensions inconsistent with previously collected metrics in the same metric family",
			famName, dtoMetric,
		)
	}
	if !known {
		// First metric checked for this family: it sets the dimensions
		// all later metrics of the family are compared against.
		dimHashes[famName] = labelDimHash
	}
	metricHashes[metricHash] = struct{}{}
	return nil
}
// checkDescConsistency checks if the provided Metric and the MetricFamily it
// belongs to are consistent with the provided Desc:
//
//   - The help string of the MetricFamily must be equal to the one of the Desc.
//   - The Metric must have exactly as many labels as the Desc has const and
//     variable labels together. After sorting the labels of the Desc, they
//     must match the labels of the Metric pair by pair: the names must always
//     be equal, the values only for const labels.
//
// The first failed check is returned as an error.
//
// The labels of the Metric are compared by index and are not sorted here.
// NOTE(review): this presumably relies on dtoMetric.Label having been sorted
// with LabelPairSorter before (checkMetricConsistency does that as a side
// effect) -- confirm that all callers guarantee this.
func checkDescConsistency(
	metricFamily *dto.MetricFamily,
	dtoMetric *dto.Metric,
	desc *Desc,
) error {
	// The help string of the MetricFamily must be the one of the desc.
	if help := metricFamily.GetHelp(); help != desc.help {
		return fmt.Errorf(
			"collected metric %s %s has help %q but should have %q",
			metricFamily.GetName(), dtoMetric, help, desc.help,
		)
	}

	// Both label checks below report the same error.
	labelsInconsistent := func() error {
		return fmt.Errorf(
			"labels in collected metric %s %s are inconsistent with descriptor %s",
			metricFamily.GetName(), dtoMetric, desc,
		)
	}

	// Assemble the label pairs the desc calls for: const labels with name
	// and value, variable labels with the name only.
	wantPairs := make([]*dto.LabelPair, 0, len(dtoMetric.Label))
	wantPairs = append(wantPairs, desc.constLabelPairs...)
	for _, labelName := range desc.variableLabels {
		wantPairs = append(wantPairs, &dto.LabelPair{Name: proto.String(labelName)})
	}
	if len(wantPairs) != len(dtoMetric.Label) {
		return labelsInconsistent()
	}

	sort.Sort(LabelPairSorter(wantPairs))
	for i, want := range wantPairs {
		got := dtoMetric.Label[i]
		if want.GetName() != got.GetName() {
			return labelsInconsistent()
		}
		// A nil Value marks a variable label, for which any value in the
		// Metric is acceptable.
		if want.Value != nil && want.GetValue() != got.GetValue() {
			return labelsInconsistent()
		}
	}
	return nil
}

View File

@ -185,7 +185,7 @@ metric: <
externalMetricFamilyWithSameName := &dto.MetricFamily{ externalMetricFamilyWithSameName := &dto.MetricFamily{
Name: proto.String("name"), Name: proto.String("name"),
Help: proto.String("inconsistent help string does not matter here"), Help: proto.String("docstring"),
Type: dto.MetricType_COUNTER.Enum(), Type: dto.MetricType_COUNTER.Enum(),
Metric: []*dto.Metric{ Metric: []*dto.Metric{
{ {
@ -455,17 +455,21 @@ metric: <
} }
for i, scenario := range scenarios { for i, scenario := range scenarios {
registry := prometheus.NewPedanticRegistry() registry := prometheus.NewPedanticRegistry()
gatherer := prometheus.Gatherer(registry)
if scenario.externalMF != nil { if scenario.externalMF != nil {
registry.SetInjectionHook(func() []*dto.MetricFamily { gatherer = prometheus.Gatherers{
return scenario.externalMF registry,
}) prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
return scenario.externalMF, nil
}),
}
} }
if scenario.collector != nil { if scenario.collector != nil {
registry.Register(scenario.collector) registry.Register(scenario.collector)
} }
writer := httptest.NewRecorder() writer := httptest.NewRecorder()
handler := prometheus.InstrumentHandler("prometheus", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) handler := prometheus.InstrumentHandler("prometheus", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}))
request, _ := http.NewRequest("GET", "/", nil) request, _ := http.NewRequest("GET", "/", nil)
for key, value := range scenario.headers { for key, value := range scenario.headers {
request.Header.Add(key, value) request.Header.Add(key, value)
@ -483,7 +487,7 @@ metric: <
if !bytes.Equal(scenario.out.body, writer.Body.Bytes()) { if !bytes.Equal(scenario.out.body, writer.Body.Bytes()) {
t.Errorf( t.Errorf(
"%d. expected %q for body, got %q", "%d. expected body:\n%s\ngot body:\n%s\n",
i, scenario.out.body, writer.Body.Bytes(), i, scenario.out.body, writer.Body.Bytes(),
) )
} }