Introduce unchecked Collectors

Fixes #47 . See there for more detailed discussion.

Signed-off-by: beorn7 <beorn@soundcloud.com>
This commit is contained in:
beorn7 2018-07-09 01:03:22 +02:00
parent d6a9817c4a
commit ad1b9f7754
4 changed files with 155 additions and 45 deletions

View File

@ -29,24 +29,35 @@ type Collector interface {
// collected by this Collector to the provided channel and returns once // collected by this Collector to the provided channel and returns once
// the last descriptor has been sent. The sent descriptors fulfill the // the last descriptor has been sent. The sent descriptors fulfill the
// consistency and uniqueness requirements described in the Desc // consistency and uniqueness requirements described in the Desc
// documentation. (It is valid if one and the same Collector sends // documentation.
// duplicate descriptors. Those duplicates are simply ignored. However, //
// two different Collectors must not send duplicate descriptors.) This // It is valid if one and the same Collector sends duplicate
// method idempotently sends the same descriptors throughout the // descriptors. Those duplicates are simply ignored. However, two
// lifetime of the Collector. If a Collector encounters an error while // different Collectors must not send duplicate descriptors.
// executing this method, it must send an invalid descriptor (created //
// with NewInvalidDesc) to signal the error to the registry. // Sending no descriptor at all marks the Collector as “unchecked”,
// i.e. no checks will be performed at registration time, and the
// Collector may yield any Metric it sees fit in its Collect method.
//
// This method idempotently sends the same descriptors throughout the
// lifetime of the Collector.
//
// If a Collector encounters an error while executing this method, it
// must send an invalid descriptor (created with NewInvalidDesc) to
// signal the error to the registry.
Describe(chan<- *Desc) Describe(chan<- *Desc)
// Collect is called by the Prometheus registry when collecting // Collect is called by the Prometheus registry when collecting
// metrics. The implementation sends each collected metric via the // metrics. The implementation sends each collected metric via the
// provided channel and returns once the last metric has been sent. The // provided channel and returns once the last metric has been sent. The
// descriptor of each sent metric is one of those returned by // descriptor of each sent metric is one of those returned by Describe
// Describe. Returned metrics that share the same descriptor must differ // (unless the Collector is unchecked, see above). Returned metrics that
// in their variable label values. This method may be called // share the same descriptor must differ in their variable label
// concurrently and must therefore be implemented in a concurrency safe // values.
// way. Blocking occurs at the expense of total performance of rendering //
// all registered metrics. Ideally, Collector implementations support // This method may be called concurrently and must therefore be
// concurrent readers. // implemented in a concurrency safe way. Blocking occurs at the expense
// of total performance of rendering all registered metrics. Ideally,
// Collector implementations support concurrent readers.
Collect(chan<- Metric) Collect(chan<- Metric)
} }

View File

@ -121,7 +121,17 @@
// NewConstSummary (and their respective Must… versions). That will happen in // NewConstSummary (and their respective Must… versions). That will happen in
// the Collect method. The Describe method has to return separate Desc // the Collect method. The Describe method has to return separate Desc
// instances, representative of the “throw-away” metrics to be created later. // instances, representative of the “throw-away” metrics to be created later.
// NewDesc comes in handy to create those Desc instances. // NewDesc comes in handy to create those Desc instances. Alternatively, you
// could return no Desc at all, which will marke the Collector “unchecked”. No
// checks are porformed at registration time, but metric consistency will still
// be ensured at scrape time, i.e. any inconsistencies will lead to scrape
// errors. Thus, with unchecked Collectors, the responsibility to not collect
// metrics that lead to inconsistencies in the total scrape result lies with the
// implementer of the Collector. While this is not a desirable state, it is
// sometimes necessary. The typical use case is a situatios where the exact
// metrics to be returned by a Collector cannot be predicted at registration
// time, but the implementer has sufficient knowledge of the whole system to
// guarantee metric consistency.
// //
// The Collector example illustrates the use case. You can also look at the // The Collector example illustrates the use case. You can also look at the
// source code of the processCollector (mirroring process metrics), the // source code of the processCollector (mirroring process metrics), the

View File

@ -15,7 +15,6 @@ package prometheus
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
"os" "os"
"runtime" "runtime"
@ -68,7 +67,8 @@ func NewRegistry() *Registry {
// NewPedanticRegistry returns a registry that checks during collection if each // NewPedanticRegistry returns a registry that checks during collection if each
// collected Metric is consistent with its reported Desc, and if the Desc has // collected Metric is consistent with its reported Desc, and if the Desc has
// actually been registered with the registry. // actually been registered with the registry. Unchecked Collectors (those whose
// Describe methed does not yield any descriptors) are excluded from the check.
// //
// Usually, a Registry will be happy as long as the union of all collected // Usually, a Registry will be happy as long as the union of all collected
// Metrics is consistent and valid even if some metrics are not consistent with // Metrics is consistent and valid even if some metrics are not consistent with
@ -98,6 +98,14 @@ type Registerer interface {
// returned error is an instance of AlreadyRegisteredError, which // returned error is an instance of AlreadyRegisteredError, which
// contains the previously registered Collector. // contains the previously registered Collector.
// //
// A Collector whose Describe method does not yield any Desc is treated
// as unchecked. Registration will always succeed. No check for
// re-registering (see previous paragraph) is performed. Thus, the
// caller is responsible for not double-registering the same unchecked
// Collector, and for providing a Collector that will not cause
// inconsistent metrics on collection. (This would lead to scrape
// errors.)
//
// It is in general not safe to register the same Collector multiple // It is in general not safe to register the same Collector multiple
// times concurrently. // times concurrently.
Register(Collector) error Register(Collector) error
@ -108,7 +116,9 @@ type Registerer interface {
// Unregister unregisters the Collector that equals the Collector passed // Unregister unregisters the Collector that equals the Collector passed
// in as an argument. (Two Collectors are considered equal if their // in as an argument. (Two Collectors are considered equal if their
// Describe method yields the same set of descriptors.) The function // Describe method yields the same set of descriptors.) The function
// returns whether a Collector was unregistered. // returns whether a Collector was unregistered. Note that an unchecked
// Collector cannot be unregistered (as its Describe method does not
// yield any descriptor).
// //
// Note that even after unregistering, it will not be possible to // Note that even after unregistering, it will not be possible to
// register a new Collector that is inconsistent with the unregistered // register a new Collector that is inconsistent with the unregistered
@ -243,6 +253,7 @@ type Registry struct {
collectorsByID map[uint64]Collector // ID is a hash of the descIDs. collectorsByID map[uint64]Collector // ID is a hash of the descIDs.
descIDs map[uint64]struct{} descIDs map[uint64]struct{}
dimHashesByName map[string]uint64 dimHashesByName map[string]uint64
uncheckedCollectors []Collector
pedanticChecksEnabled bool pedanticChecksEnabled bool
} }
@ -300,9 +311,10 @@ func (r *Registry) Register(c Collector) error {
} }
} }
} }
// Did anything happen at all? // A Collector yielding no Desc at all is considered unchecked.
if len(newDescIDs) == 0 { if len(newDescIDs) == 0 {
return errors.New("collector has no descriptors") r.uncheckedCollectors = append(r.uncheckedCollectors, c)
return nil
} }
if existing, exists := r.collectorsByID[collectorID]; exists { if existing, exists := r.collectorsByID[collectorID]; exists {
return AlreadyRegisteredError{ return AlreadyRegisteredError{
@ -376,7 +388,8 @@ func (r *Registry) MustRegister(cs ...Collector) {
// Gather implements Gatherer. // Gather implements Gatherer.
func (r *Registry) Gather() ([]*dto.MetricFamily, error) { func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
var ( var (
metricChan = make(chan Metric, capMetricChan) checkedMetricChan = make(chan Metric, capMetricChan)
uncheckedMetricChan = make(chan Metric, capMetricChan)
metricHashes = map[uint64]struct{}{} metricHashes = map[uint64]struct{}{}
wg sync.WaitGroup wg sync.WaitGroup
errs MultiError // The collected errors to return in the end. errs MultiError // The collected errors to return in the end.
@ -384,11 +397,15 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
) )
r.mtx.RLock() r.mtx.RLock()
goroutineBudget := len(r.collectorsByID) goroutineBudget := len(r.collectorsByID) + len(r.uncheckedCollectors)
metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName)) metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName))
collectors := make(chan Collector, len(r.collectorsByID)) checkedCollectors := make(chan Collector, len(r.collectorsByID))
uncheckedCollectors := make(chan Collector, len(r.uncheckedCollectors))
for _, collector := range r.collectorsByID { for _, collector := range r.collectorsByID {
collectors <- collector checkedCollectors <- collector
}
for _, collector := range r.uncheckedCollectors {
uncheckedCollectors <- collector
} }
// In case pedantic checks are enabled, we have to copy the map before // In case pedantic checks are enabled, we have to copy the map before
// giving up the RLock. // giving up the RLock.
@ -405,12 +422,14 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
collectWorker := func() { collectWorker := func() {
for { for {
select { select {
case collector := <-collectors: case collector := <-checkedCollectors:
collector.Collect(metricChan) collector.Collect(checkedMetricChan)
wg.Done() case collector := <-uncheckedCollectors:
collector.Collect(uncheckedMetricChan)
default: default:
return return
} }
wg.Done()
} }
} }
@ -418,51 +437,94 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
go collectWorker() go collectWorker()
goroutineBudget-- goroutineBudget--
// Close the metricChan once all collectors are collected. // Close checkedMetricChan and uncheckedMetricChan once all collectors
// are collected.
go func() { go func() {
wg.Wait() wg.Wait()
close(metricChan) close(checkedMetricChan)
close(uncheckedMetricChan)
}() }()
// Drain metricChan in case of premature return. // Drain checkedMetricChan and uncheckedMetricChan in case of premature return.
defer func() { defer func() {
for range metricChan { if checkedMetricChan != nil {
for range checkedMetricChan {
}
}
if uncheckedMetricChan != nil {
for range uncheckedMetricChan {
}
} }
}() }()
collectLoop: // Copy the channel references so we can nil them out later to remove
// them from the select statements below.
cmc := checkedMetricChan
umc := uncheckedMetricChan
for { for {
select { select {
case metric, ok := <-metricChan: case metric, ok := <-cmc:
if !ok { if !ok {
// metricChan is closed, we are done. cmc = nil
break collectLoop break
} }
errs.Append(processMetric( errs.Append(processMetric(
metric, metricFamiliesByName, metric, metricFamiliesByName,
metricHashes, metricHashes,
registeredDescIDs, registeredDescIDs,
)) ))
case metric, ok := <-umc:
if !ok {
umc = nil
break
}
errs.Append(processMetric(
metric, metricFamiliesByName,
metricHashes,
nil,
))
default: default:
if goroutineBudget <= 0 || len(collectors) == 0 { if goroutineBudget <= 0 || len(checkedCollectors)+len(uncheckedCollectors) == 0 {
// All collectors are already being worked on or // All collectors are already being worked on or
// we have already as many goroutines started as // we have already as many goroutines started as
// there are collectors. Just process metrics // there are collectors. Do the same as above,
// from now on. // just without the default.
for metric := range metricChan { select {
case metric, ok := <-cmc:
if !ok {
cmc = nil
break
}
errs.Append(processMetric( errs.Append(processMetric(
metric, metricFamiliesByName, metric, metricFamiliesByName,
metricHashes, metricHashes,
registeredDescIDs, registeredDescIDs,
)) ))
case metric, ok := <-umc:
if !ok {
umc = nil
break
} }
break collectLoop errs.Append(processMetric(
metric, metricFamiliesByName,
metricHashes,
nil,
))
}
break
} }
// Start more workers. // Start more workers.
go collectWorker() go collectWorker()
goroutineBudget-- goroutineBudget--
runtime.Gosched() runtime.Gosched()
} }
// Once both checkedMetricChan and uncheckdMetricChan are closed
// and drained, the contraption above will nil out cmc and umc,
// and then we can leave the collect loop here.
if cmc == nil && umc == nil {
break
}
} }
return normalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap() return normalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap()
} }

View File

@ -34,7 +34,22 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
) )
// uncheckedCollector wraps a Collector but its Describe method yields no Desc.
type uncheckedCollector struct {
c prometheus.Collector
}
func (u uncheckedCollector) Describe(_ chan<- *prometheus.Desc) {}
func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) {
u.c.Collect(c)
}
func testHandler(t testing.TB) { func testHandler(t testing.TB) {
// TODO(beorn7): This test is a bit too "end-to-end". It tests quite a
// few moving parts that are not strongly coupled. They could/should be
// tested separately. However, the changes planned for v0.10 will
// require a major rework of this test anyway, at which time I will
// structure it in a better way.
metricVec := prometheus.NewCounterVec( metricVec := prometheus.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
@ -496,6 +511,18 @@ collected metric's label constname is not utf8: "\xff"
externalMetricFamilyWithInvalidLabelValue, externalMetricFamilyWithInvalidLabelValue,
}, },
}, },
{ // 17
headers: map[string]string{
"Accept": "application/json",
},
out: output{
headers: map[string]string{
"Content-Type": `text/plain; version=0.0.4; charset=utf-8`,
},
body: expectedMetricFamilyAsText,
},
collector: uncheckedCollector{metricVec},
},
} }
for i, scenario := range scenarios { for i, scenario := range scenarios {
registry := prometheus.NewPedanticRegistry() registry := prometheus.NewPedanticRegistry()
@ -510,7 +537,7 @@ collected metric's label constname is not utf8: "\xff"
} }
if scenario.collector != nil { if scenario.collector != nil {
registry.Register(scenario.collector) registry.MustRegister(scenario.collector)
} }
writer := httptest.NewRecorder() writer := httptest.NewRecorder()
handler := prometheus.InstrumentHandler("prometheus", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{})) handler := prometheus.InstrumentHandler("prometheus", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}))