From ad1b9f77541bab2b732a71ddf841cdfabea984a9 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 9 Jul 2018 01:03:22 +0200 Subject: [PATCH] Introduce unchecked Collectors Fixes #47 . See there for more detailed discussion. Signed-off-by: beorn7 --- prometheus/collector.go | 39 +++++++----- prometheus/doc.go | 12 +++- prometheus/registry.go | 120 +++++++++++++++++++++++++++--------- prometheus/registry_test.go | 29 ++++++++- 4 files changed, 155 insertions(+), 45 deletions(-) diff --git a/prometheus/collector.go b/prometheus/collector.go index 623d3d8..3c9bae2 100644 --- a/prometheus/collector.go +++ b/prometheus/collector.go @@ -29,24 +29,35 @@ type Collector interface { // collected by this Collector to the provided channel and returns once // the last descriptor has been sent. The sent descriptors fulfill the // consistency and uniqueness requirements described in the Desc - // documentation. (It is valid if one and the same Collector sends - // duplicate descriptors. Those duplicates are simply ignored. However, - // two different Collectors must not send duplicate descriptors.) This - // method idempotently sends the same descriptors throughout the - // lifetime of the Collector. If a Collector encounters an error while - // executing this method, it must send an invalid descriptor (created - // with NewInvalidDesc) to signal the error to the registry. + // documentation. + // + // It is valid if one and the same Collector sends duplicate + // descriptors. Those duplicates are simply ignored. However, two + // different Collectors must not send duplicate descriptors. + // + // Sending no descriptor at all marks the Collector as “unchecked”, + // i.e. no checks will be performed at registration time, and the + // Collector may yield any Metric it sees fit in its Collect method. + // + // This method idempotently sends the same descriptors throughout the + // lifetime of the Collector. + // + // If a Collector encounters an error while executing this method, it + // must send an invalid descriptor (created with NewInvalidDesc) to + // signal the error to the registry. Describe(chan<- *Desc) // Collect is called by the Prometheus registry when collecting // metrics. The implementation sends each collected metric via the // provided channel and returns once the last metric has been sent. The - // descriptor of each sent metric is one of those returned by - // Describe. Returned metrics that share the same descriptor must differ - // in their variable label values. This method may be called - // concurrently and must therefore be implemented in a concurrency safe - // way. Blocking occurs at the expense of total performance of rendering - // all registered metrics. Ideally, Collector implementations support - // concurrent readers. + // descriptor of each sent metric is one of those returned by Describe + // (unless the Collector is unchecked, see above). Returned metrics that + // share the same descriptor must differ in their variable label + // values. + // + // This method may be called concurrently and must therefore be + // implemented in a concurrency safe way. Blocking occurs at the expense + // of total performance of rendering all registered metrics. Ideally, + // Collector implementations support concurrent readers. Collect(chan<- Metric) } diff --git a/prometheus/doc.go b/prometheus/doc.go index 83c3657..5d9525d 100644 --- a/prometheus/doc.go +++ b/prometheus/doc.go @@ -121,7 +121,17 @@ // NewConstSummary (and their respective Must… versions). That will happen in // the Collect method. The Describe method has to return separate Desc // instances, representative of the “throw-away” metrics to be created later. -// NewDesc comes in handy to create those Desc instances. +// NewDesc comes in handy to create those Desc instances. Alternatively, you +// could return no Desc at all, which will marke the Collector “unchecked”. No +// checks are porformed at registration time, but metric consistency will still +// be ensured at scrape time, i.e. any inconsistencies will lead to scrape +// errors. Thus, with unchecked Collectors, the responsibility to not collect +// metrics that lead to inconsistencies in the total scrape result lies with the +// implementer of the Collector. While this is not a desirable state, it is +// sometimes necessary. The typical use case is a situatios where the exact +// metrics to be returned by a Collector cannot be predicted at registration +// time, but the implementer has sufficient knowledge of the whole system to +// guarantee metric consistency. // // The Collector example illustrates the use case. You can also look at the // source code of the processCollector (mirroring process metrics), the diff --git a/prometheus/registry.go b/prometheus/registry.go index fdb7bad..5c5cdfe 100644 --- a/prometheus/registry.go +++ b/prometheus/registry.go @@ -15,7 +15,6 @@ package prometheus import ( "bytes" - "errors" "fmt" "os" "runtime" @@ -68,7 +67,8 @@ func NewRegistry() *Registry { // NewPedanticRegistry returns a registry that checks during collection if each // collected Metric is consistent with its reported Desc, and if the Desc has -// actually been registered with the registry. +// actually been registered with the registry. Unchecked Collectors (those whose +// Describe methed does not yield any descriptors) are excluded from the check. // // Usually, a Registry will be happy as long as the union of all collected // Metrics is consistent and valid even if some metrics are not consistent with @@ -98,6 +98,14 @@ type Registerer interface { // returned error is an instance of AlreadyRegisteredError, which // contains the previously registered Collector. // + // A Collector whose Describe method does not yield any Desc is treated + // as unchecked. Registration will always succeed. No check for + // re-registering (see previous paragraph) is performed. Thus, the + // caller is responsible for not double-registering the same unchecked + // Collector, and for providing a Collector that will not cause + // inconsistent metrics on collection. (This would lead to scrape + // errors.) + // // It is in general not safe to register the same Collector multiple // times concurrently. Register(Collector) error @@ -108,7 +116,9 @@ type Registerer interface { // Unregister unregisters the Collector that equals the Collector passed // in as an argument. (Two Collectors are considered equal if their // Describe method yields the same set of descriptors.) The function - // returns whether a Collector was unregistered. + // returns whether a Collector was unregistered. Note that an unchecked + // Collector cannot be unregistered (as its Describe method does not + // yield any descriptor). // // Note that even after unregistering, it will not be possible to // register a new Collector that is inconsistent with the unregistered @@ -243,6 +253,7 @@ type Registry struct { collectorsByID map[uint64]Collector // ID is a hash of the descIDs. descIDs map[uint64]struct{} dimHashesByName map[string]uint64 + uncheckedCollectors []Collector pedanticChecksEnabled bool } @@ -300,9 +311,10 @@ func (r *Registry) Register(c Collector) error { } } } - // Did anything happen at all? + // A Collector yielding no Desc at all is considered unchecked. if len(newDescIDs) == 0 { - return errors.New("collector has no descriptors") + r.uncheckedCollectors = append(r.uncheckedCollectors, c) + return nil } if existing, exists := r.collectorsByID[collectorID]; exists { return AlreadyRegisteredError{ @@ -376,19 +388,24 @@ func (r *Registry) MustRegister(cs ...Collector) { // Gather implements Gatherer. func (r *Registry) Gather() ([]*dto.MetricFamily, error) { var ( - metricChan = make(chan Metric, capMetricChan) - metricHashes = map[uint64]struct{}{} - wg sync.WaitGroup - errs MultiError // The collected errors to return in the end. - registeredDescIDs map[uint64]struct{} // Only used for pedantic checks + checkedMetricChan = make(chan Metric, capMetricChan) + uncheckedMetricChan = make(chan Metric, capMetricChan) + metricHashes = map[uint64]struct{}{} + wg sync.WaitGroup + errs MultiError // The collected errors to return in the end. + registeredDescIDs map[uint64]struct{} // Only used for pedantic checks ) r.mtx.RLock() - goroutineBudget := len(r.collectorsByID) + goroutineBudget := len(r.collectorsByID) + len(r.uncheckedCollectors) metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName)) - collectors := make(chan Collector, len(r.collectorsByID)) + checkedCollectors := make(chan Collector, len(r.collectorsByID)) + uncheckedCollectors := make(chan Collector, len(r.uncheckedCollectors)) for _, collector := range r.collectorsByID { - collectors <- collector + checkedCollectors <- collector + } + for _, collector := range r.uncheckedCollectors { + uncheckedCollectors <- collector } // In case pedantic checks are enabled, we have to copy the map before // giving up the RLock. @@ -405,12 +422,14 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) { collectWorker := func() { for { select { - case collector := <-collectors: - collector.Collect(metricChan) - wg.Done() + case collector := <-checkedCollectors: + collector.Collect(checkedMetricChan) + case collector := <-uncheckedCollectors: + collector.Collect(uncheckedMetricChan) default: return } + wg.Done() } } @@ -418,51 +437,94 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) { go collectWorker() goroutineBudget-- - // Close the metricChan once all collectors are collected. + // Close checkedMetricChan and uncheckedMetricChan once all collectors + // are collected. go func() { wg.Wait() - close(metricChan) + close(checkedMetricChan) + close(uncheckedMetricChan) }() - // Drain metricChan in case of premature return. + // Drain checkedMetricChan and uncheckedMetricChan in case of premature return. defer func() { - for range metricChan { + if checkedMetricChan != nil { + for range checkedMetricChan { + } + } + if uncheckedMetricChan != nil { + for range uncheckedMetricChan { + } } }() -collectLoop: + // Copy the channel references so we can nil them out later to remove + // them from the select statements below. + cmc := checkedMetricChan + umc := uncheckedMetricChan + for { select { - case metric, ok := <-metricChan: + case metric, ok := <-cmc: if !ok { - // metricChan is closed, we are done. - break collectLoop + cmc = nil + break } errs.Append(processMetric( metric, metricFamiliesByName, metricHashes, registeredDescIDs, )) + case metric, ok := <-umc: + if !ok { + umc = nil + break + } + errs.Append(processMetric( + metric, metricFamiliesByName, + metricHashes, + nil, + )) default: - if goroutineBudget <= 0 || len(collectors) == 0 { + if goroutineBudget <= 0 || len(checkedCollectors)+len(uncheckedCollectors) == 0 { // All collectors are already being worked on or // we have already as many goroutines started as - // there are collectors. Just process metrics - // from now on. - for metric := range metricChan { + // there are collectors. Do the same as above, + // just without the default. + select { + case metric, ok := <-cmc: + if !ok { + cmc = nil + break + } errs.Append(processMetric( metric, metricFamiliesByName, metricHashes, registeredDescIDs, )) + case metric, ok := <-umc: + if !ok { + umc = nil + break + } + errs.Append(processMetric( + metric, metricFamiliesByName, + metricHashes, + nil, + )) } - break collectLoop + break } // Start more workers. go collectWorker() goroutineBudget-- runtime.Gosched() } + // Once both checkedMetricChan and uncheckdMetricChan are closed + // and drained, the contraption above will nil out cmc and umc, + // and then we can leave the collect loop here. + if cmc == nil && umc == nil { + break + } } return normalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap() } diff --git a/prometheus/registry_test.go b/prometheus/registry_test.go index 653df4f..10d4314 100644 --- a/prometheus/registry_test.go +++ b/prometheus/registry_test.go @@ -34,7 +34,22 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) +// uncheckedCollector wraps a Collector but its Describe method yields no Desc. +type uncheckedCollector struct { + c prometheus.Collector +} + +func (u uncheckedCollector) Describe(_ chan<- *prometheus.Desc) {} +func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) { + u.c.Collect(c) +} + func testHandler(t testing.TB) { + // TODO(beorn7): This test is a bit too "end-to-end". It tests quite a + // few moving parts that are not strongly coupled. They could/should be + // tested separately. However, the changes planned for v0.10 will + // require a major rework of this test anyway, at which time I will + // structure it in a better way. metricVec := prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -496,6 +511,18 @@ collected metric's label constname is not utf8: "\xff" externalMetricFamilyWithInvalidLabelValue, }, }, + { // 17 + headers: map[string]string{ + "Accept": "application/json", + }, + out: output{ + headers: map[string]string{ + "Content-Type": `text/plain; version=0.0.4; charset=utf-8`, + }, + body: expectedMetricFamilyAsText, + }, + collector: uncheckedCollector{metricVec}, + }, } for i, scenario := range scenarios { registry := prometheus.NewPedanticRegistry() @@ -510,7 +537,7 @@ collected metric's label constname is not utf8: "\xff" } if scenario.collector != nil { - registry.Register(scenario.collector) + registry.MustRegister(scenario.collector) } writer := httptest.NewRecorder() handler := prometheus.InstrumentHandler("prometheus", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}))