Allow error reporting during metrics collection and simplify Register().

Both are interface changes I want to get in before public
announcement. They only break rare usage cases, and are always easy to
fix, but still we want to avoid breaking changes after a wider
announcement of the project.

The change of Register() simply removes the return of the Collector,
which nobody was using in practice. It was just bloating the call
syntax. Note that this is different from RegisterOrGet(), which is
used at various occasions where you want to register something that
might or might not be registered already, but if it is, you want the
previously registered Collector back (because that's the relevant
one).

WRT error reporting: I first tried the obvious way of letting the
Collector methods Describe() and Collect() return error. However, I
had to conclude that that bloated _many_ calls and their handling in
very obnoxious ways. On the other hand, the case where you actually
want to report errors during registration or collection is very
rare. Hence, this approach has the wrong trade-off. The approach taken
here might at first appear clunky but is in practice quite handy,
mostly because there is almost no change for the "normal" case of "no
special error handling", but also because it plays well with the way
descriptors and metrics are handled (via channels).

Explaining the approach in more detail:

- During registration / describe: Error handling was actually already
  in place (for invalid descriptors, which carry an error anyway). I
  only added a convenience function to create an invalid descriptor
  with a given error on purpose.

- Metrics are now treated in a similar way. The Write method returns
  an error now (the only change in interface). An "invalid metric" is
  provided that can be sent via the channel to signal that that metric
  could not be collected. It alse transports an error.

NON-GOALS OF THIS COMMIT:

This is NOT yet the major improvement of the whole registry part,
where we want a public Registry interface and plenty of modular
configurations (for error handling, various auto-metrics, http
instrumentation, testing, ...). However, we can do that whole thing
without breaking existing interfaces. For now (which is a significant
issue) any error during collection will either cause a 500 HTTP
response or a panic (depending on registry config). Later, we
definitely want to have a possibility to skip (and only report
somehow) non-collectible metrics instead of aborting the whole scrape.
This commit is contained in:
Bjoern Rabenstein 2015-01-12 19:16:09 +01:00
parent af0ec0d3f1
commit 159e96f6c7
11 changed files with 123 additions and 66 deletions

View File

@ -33,7 +33,8 @@ type Collector interface {
// duplicate descriptors. Those duplicates are simply ignored. However, // duplicate descriptors. Those duplicates are simply ignored. However,
// two different Collectors must not send duplicate descriptors.) This // two different Collectors must not send duplicate descriptors.) This
// method idempotently sends the same descriptors throughout the // method idempotently sends the same descriptors throughout the
// lifetime of the Collector. // lifetime of the Collector. A Collector unable to describe itself must
// send an invalid descriptor (created with NewInvalidDesc).
Describe(chan<- *Desc) Describe(chan<- *Desc)
// Collect is called by Prometheus when collecting metrics. The // Collect is called by Prometheus when collecting metrics. The
// implementation sends each collected metric via the provided channel // implementation sends each collected metric via the provided channel

View File

@ -166,6 +166,16 @@ func NewDesc(fqName, help string, variableLabels []string, constLabels Labels) *
return d return d
} }
// NewInvalidDesc returns an invalid descriptor, i.e. a descriptor with the
// provided error set. If a collector returning such a descriptor is registered,
// registration will fail with the provided error. NewInvalidDesc can be used by
// a Collector to signal inability to describe itself.
func NewInvalidDesc(err error) *Desc {
return &Desc{
err: err,
}
}
func (d *Desc) String() string { func (d *Desc) String() string {
lpStrings := make([]string, 0, len(d.constLabelPairs)) lpStrings := make([]string, 0, len(d.constLabelPairs))
for _, lp := range d.constLabelPairs { for _, lp := range d.constLabelPairs {

View File

@ -49,8 +49,9 @@ func (cm *CallbackMetric) Desc() *prometheus.Desc {
return cm.desc return cm.desc
} }
func (cm *CallbackMetric) Write(m *dto.Metric) { func (cm *CallbackMetric) Write(m *dto.Metric) error {
m.Untyped = &dto.Untyped{Value: proto.Float64(cm.callback())} m.Untyped = &dto.Untyped{Value: proto.Float64(cm.callback())}
return nil
} }
func ExampleSelfCollector() { func ExampleSelfCollector() {

View File

@ -74,7 +74,7 @@ func ExampleGaugeVec() {
} }
func ExampleGaugeFunc() { func ExampleGaugeFunc() {
if _, err := prometheus.Register(prometheus.NewGaugeFunc( if err := prometheus.Register(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Subsystem: "runtime", Subsystem: "runtime",
Name: "goroutines_count", Name: "goroutines_count",
@ -95,7 +95,7 @@ func ExampleCounter() {
pushCounter := prometheus.NewCounter(prometheus.CounterOpts{ pushCounter := prometheus.NewCounter(prometheus.CounterOpts{
Name: "repository_pushes", // Note: No help string... Name: "repository_pushes", // Note: No help string...
}) })
_, err := prometheus.Register(pushCounter) // ... so this will return an error. err := prometheus.Register(pushCounter) // ... so this will return an error.
if err != nil { if err != nil {
fmt.Println("Push counter couldn't be registered, no counting will happen:", err) fmt.Println("Push counter couldn't be registered, no counting will happen:", err)
return return
@ -106,7 +106,7 @@ func ExampleCounter() {
Name: "repository_pushes", Name: "repository_pushes",
Help: "Number of pushes to external repository.", Help: "Number of pushes to external repository.",
}) })
_, err = prometheus.Register(pushCounter) err = prometheus.Register(pushCounter)
if err != nil { if err != nil {
fmt.Println("Push counter couldn't be registered AGAIN, no counting will happen:", err) fmt.Println("Push counter couldn't be registered AGAIN, no counting will happen:", err)
return return
@ -194,7 +194,7 @@ func ExampleRegister() {
Help: "Total number of tasks completed.", Help: "Total number of tasks completed.",
}) })
// This will register fine. // This will register fine.
if _, err := prometheus.Register(taskCounter); err != nil { if err := prometheus.Register(taskCounter); err != nil {
fmt.Println(err) fmt.Println(err)
} else { } else {
fmt.Println("taskCounter registered.") fmt.Println("taskCounter registered.")
@ -219,7 +219,7 @@ func ExampleRegister() {
) )
// Registering will fail because we already have a metric of that name. // Registering will fail because we already have a metric of that name.
if _, err := prometheus.Register(taskCounterVec); err != nil { if err := prometheus.Register(taskCounterVec); err != nil {
fmt.Println("taskCounterVec not registered:", err) fmt.Println("taskCounterVec not registered:", err)
} else { } else {
fmt.Println("taskCounterVec registered.") fmt.Println("taskCounterVec registered.")
@ -231,7 +231,7 @@ func ExampleRegister() {
} }
// Try registering taskCounterVec again. // Try registering taskCounterVec again.
if _, err := prometheus.Register(taskCounterVec); err != nil { if err := prometheus.Register(taskCounterVec); err != nil {
fmt.Println("taskCounterVec not registered:", err) fmt.Println("taskCounterVec not registered:", err)
} else { } else {
fmt.Println("taskCounterVec registered.") fmt.Println("taskCounterVec registered.")
@ -253,7 +253,7 @@ func ExampleRegister() {
}, },
[]string{"worker_id"}, []string{"worker_id"},
) )
if _, err := prometheus.Register(taskCounterVec); err != nil { if err := prometheus.Register(taskCounterVec); err != nil {
fmt.Println("taskCounterVec not registered:", err) fmt.Println("taskCounterVec not registered:", err)
} else { } else {
fmt.Println("taskCounterVec registered.") fmt.Println("taskCounterVec registered.")
@ -299,7 +299,7 @@ func ExampleRegister() {
ConstLabels: prometheus.Labels{"worker_id": "42"}, ConstLabels: prometheus.Labels{"worker_id": "42"},
} }
taskCounterForWorker42 := prometheus.NewCounter(counterOpts) taskCounterForWorker42 := prometheus.NewCounter(counterOpts)
if _, err := prometheus.Register(taskCounterForWorker42); err != nil { if err := prometheus.Register(taskCounterForWorker42); err != nil {
fmt.Println("taskCounterVForWorker42 not registered:", err) fmt.Println("taskCounterVForWorker42 not registered:", err)
} else { } else {
fmt.Println("taskCounterForWorker42 registered.") fmt.Println("taskCounterForWorker42 registered.")
@ -313,7 +313,7 @@ func ExampleRegister() {
// counterOpts. Just change the ConstLabels. // counterOpts. Just change the ConstLabels.
counterOpts.ConstLabels = prometheus.Labels{"worker_id": "2001"} counterOpts.ConstLabels = prometheus.Labels{"worker_id": "2001"}
taskCounterForWorker2001 := prometheus.NewCounter(counterOpts) taskCounterForWorker2001 := prometheus.NewCounter(counterOpts)
if _, err := prometheus.Register(taskCounterForWorker2001); err != nil { if err := prometheus.Register(taskCounterForWorker2001); err != nil {
fmt.Println("taskCounterVForWorker2001 not registered:", err) fmt.Println("taskCounterVForWorker2001 not registered:", err)
} else { } else {
fmt.Println("taskCounterForWorker2001 registered.") fmt.Println("taskCounterForWorker2001 registered.")

View File

@ -82,36 +82,38 @@ func (e *ExpvarCollector) Collect(ch chan<- Metric) {
} }
var v interface{} var v interface{}
labels := make([]string, len(desc.variableLabels)) labels := make([]string, len(desc.variableLabels))
if err := json.Unmarshal([]byte(expVar.String()), &v); err == nil { if err := json.Unmarshal([]byte(expVar.String()), &v); err != nil {
var processValue func(v interface{}, i int) ch <- NewInvalidMetric(desc, err)
processValue = func(v interface{}, i int) { continue
if i >= len(labels) {
copiedLabels := append(make([]string, 0, len(labels)), labels...)
switch v := v.(type) {
case float64:
m = MustNewConstMetric(desc, UntypedValue, v, copiedLabels...)
case bool:
if v {
m = MustNewConstMetric(desc, UntypedValue, 1, copiedLabels...)
} else {
m = MustNewConstMetric(desc, UntypedValue, 0, copiedLabels...)
}
default:
return
}
ch <- m
return
}
vm, ok := v.(map[string]interface{})
if !ok {
return
}
for lv, val := range vm {
labels[i] = lv
processValue(val, i+1)
}
}
processValue(v, 0)
} }
var processValue func(v interface{}, i int)
processValue = func(v interface{}, i int) {
if i >= len(labels) {
copiedLabels := append(make([]string, 0, len(labels)), labels...)
switch v := v.(type) {
case float64:
m = MustNewConstMetric(desc, UntypedValue, v, copiedLabels...)
case bool:
if v {
m = MustNewConstMetric(desc, UntypedValue, 1, copiedLabels...)
} else {
m = MustNewConstMetric(desc, UntypedValue, 0, copiedLabels...)
}
default:
return
}
ch <- m
return
}
vm, ok := v.(map[string]interface{})
if !ok {
return
}
for lv, val := range vm {
labels[i] = lv
processValue(val, i+1)
}
}
processValue(v, 0)
} }
} }

View File

@ -31,7 +31,7 @@ func TestGoCollector(t *testing.T) {
select { select {
case metric := <-ch: case metric := <-ch:
switch m := metric.(type) { switch m := metric.(type) {
// Attention, this als catches Counter ... // Attention, this also catches Counter...
case Gauge: case Gauge:
pb := &dto.Metric{} pb := &dto.Metric{}
m.Write(pb) m.Write(pb)
@ -43,7 +43,7 @@ func TestGoCollector(t *testing.T) {
} }
if diff := int(pb.GetGauge().GetValue()) - old; diff != 1 { if diff := int(pb.GetGauge().GetValue()) - old; diff != 1 {
t.Errorf("want 1 new goroutine, got %f", diff) t.Errorf("want 1 new goroutine, got %d", diff)
} }
return return

View File

@ -27,7 +27,9 @@ import (
type Metric interface { type Metric interface {
// Desc returns the descriptor for the Metric. This method idempotently // Desc returns the descriptor for the Metric. This method idempotently
// returns the same descriptor throughout the lifetime of the // returns the same descriptor throughout the lifetime of the
// Metric. The returned descriptor is immutable by contract. // Metric. The returned descriptor is immutable by contract. A Metric
// unable to describe itself must return an invalid descriptor (created
// with NewInvalidDesc).
Desc() *Desc Desc() *Desc
// Write encodes the Metric into a "Metric" Protocol Buffer data // Write encodes the Metric into a "Metric" Protocol Buffer data
// transmission object. // transmission object.
@ -46,7 +48,7 @@ type Metric interface {
// //
// While populating dto.Metric, labels must be sorted lexicographically. // While populating dto.Metric, labels must be sorted lexicographically.
// (Implementers may find LabelPairSorter useful for that.) // (Implementers may find LabelPairSorter useful for that.)
Write(*dto.Metric) Write(*dto.Metric) error
} }
// Opts bundles the options for creating most Metric types. Each metric // Opts bundles the options for creating most Metric types. Each metric
@ -144,3 +146,19 @@ func (s hashSorter) Swap(i, j int) {
func (s hashSorter) Less(i, j int) bool { func (s hashSorter) Less(i, j int) bool {
return s[i] < s[j] return s[i] < s[j]
} }
type invalidMetric struct {
desc *Desc
err error
}
// NewInvalidMetric returns a metric whose Write method always returns the
// provided error. It is useful if a Collector finds itself unable to collect
// a metric and wishes to report an error to the registry.
func NewInvalidMetric(desc *Desc, err error) Metric {
return &invalidMetric{desc, err}
}
func (m *invalidMetric) Desc() *Desc { return m.desc }
func (m *invalidMetric) Write(*dto.Metric) error { return m.err }

View File

@ -92,10 +92,23 @@ func noopCollect(ch chan<- Metric) {}
func (c *processCollector) procfsCollect(ch chan<- Metric) { func (c *processCollector) procfsCollect(ch chan<- Metric) {
p, err := procfs.NewProc(c.pidFn()) p, err := procfs.NewProc(c.pidFn())
if err != nil { if err != nil {
// Report collect errors for all metrics.
ch <- NewInvalidMetric(c.cpuTotal.Desc(), err)
ch <- NewInvalidMetric(c.openFDs.Desc(), err)
ch <- NewInvalidMetric(c.maxFDs.Desc(), err)
ch <- NewInvalidMetric(c.vsize.Desc(), err)
ch <- NewInvalidMetric(c.rss.Desc(), err)
ch <- NewInvalidMetric(c.startTime.Desc(), err)
return return
} }
if stat, err := p.NewStat(); err == nil { if stat, err := p.NewStat(); err != nil {
// Report collect errors for metrics depending on stat.
ch <- NewInvalidMetric(c.vsize.Desc(), err)
ch <- NewInvalidMetric(c.rss.Desc(), err)
ch <- NewInvalidMetric(c.startTime.Desc(), err)
ch <- NewInvalidMetric(c.cpuTotal.Desc(), err)
} else {
c.cpuTotal.Set(stat.CPUTime()) c.cpuTotal.Set(stat.CPUTime())
ch <- c.cpuTotal ch <- c.cpuTotal
c.vsize.Set(float64(stat.VirtualMemory())) c.vsize.Set(float64(stat.VirtualMemory()))
@ -103,18 +116,24 @@ func (c *processCollector) procfsCollect(ch chan<- Metric) {
c.rss.Set(float64(stat.ResidentMemory())) c.rss.Set(float64(stat.ResidentMemory()))
ch <- c.rss ch <- c.rss
if startTime, err := stat.StartTime(); err == nil { if startTime, err := stat.StartTime(); err != nil {
ch <- NewInvalidMetric(c.startTime.Desc(), err)
} else {
c.startTime.Set(startTime) c.startTime.Set(startTime)
ch <- c.startTime ch <- c.startTime
} }
} }
if fds, err := p.FileDescriptorsLen(); err == nil { if fds, err := p.FileDescriptorsLen(); err != nil {
ch <- NewInvalidMetric(c.openFDs.Desc(), err)
} else {
c.openFDs.Set(float64(fds)) c.openFDs.Set(float64(fds))
ch <- c.openFDs ch <- c.openFDs
} }
if limits, err := p.NewLimits(); err == nil { if limits, err := p.NewLimits(); err != nil {
ch <- NewInvalidMetric(c.maxFDs.Desc(), err)
} else {
c.maxFDs.Set(float64(limits.OpenFiles)) c.maxFDs.Set(float64(limits.OpenFiles))
ch <- c.maxFDs ch <- c.maxFDs
} }

View File

@ -105,24 +105,23 @@ func UninstrumentedHandler() http.Handler {
// returns an error if the descriptors provided by the Collector are invalid or // returns an error if the descriptors provided by the Collector are invalid or
// if they - in combination with descriptors of already registered Collectors - // if they - in combination with descriptors of already registered Collectors -
// do not fulfill the consistency and uniqueness criteria described in the Desc // do not fulfill the consistency and uniqueness criteria described in the Desc
// documentation. If the registration is successful, the registered Collector // documentation.
// is returned.
// //
// Do not register the same Collector multiple times concurrently. (Registering // Do not register the same Collector multiple times concurrently. (Registering
// the same Collector twice would result in an error anyway, but on top of that, // the same Collector twice would result in an error anyway, but on top of that,
// it is not safe to do so concurrently.) // it is not safe to do so concurrently.)
func Register(m Collector) (Collector, error) { func Register(m Collector) error {
return defRegistry.Register(m) _, err := defRegistry.Register(m)
return err
} }
// MustRegister works like Register but panics where Register would have // MustRegister works like Register but panics where Register would have
// returned an error. // returned an error.
func MustRegister(m Collector) Collector { func MustRegister(m Collector) {
m, err := Register(m) err := Register(m)
if err != nil { if err != nil {
panic(err) panic(err)
} }
return m
} }
// RegisterOrGet works like Register but does not return an error if a Collector // RegisterOrGet works like Register but does not return an error if a Collector
@ -447,7 +446,12 @@ func (r *registry) writePB(w io.Writer, writeEncoded encoder) (int, error) {
} }
dtoMetric := r.getMetric() dtoMetric := r.getMetric()
defer r.giveMetric(dtoMetric) defer r.giveMetric(dtoMetric)
metric.Write(dtoMetric) if err := metric.Write(dtoMetric); err != nil {
// TODO: Consider different means of error reporting so
// that a single erroneous metric could be skipped
// instead of blowing up the whole collection.
return 0, fmt.Errorf("error collecting metric %v: %s", desc, err)
}
switch { switch {
case metricFamily.Type != nil: case metricFamily.Type != nil:
// Type already set. We are good. // Type already set. We are good.

View File

@ -242,7 +242,7 @@ func (s *summary) Observe(v float64) {
} }
} }
func (s *summary) Write(out *dto.Metric) { func (s *summary) Write(out *dto.Metric) error {
sum := &dto.Summary{} sum := &dto.Summary{}
qs := make([]*dto.Quantile, 0, len(s.objectives)) qs := make([]*dto.Quantile, 0, len(s.objectives))
@ -277,6 +277,7 @@ func (s *summary) Write(out *dto.Metric) {
out.Summary = sum out.Summary = sum
out.Label = s.labelPairs out.Label = s.labelPairs
return nil
} }
func (s *summary) newStream() *quantile.Stream { func (s *summary) newStream() *quantile.Stream {

View File

@ -98,12 +98,12 @@ func (v *value) Sub(val float64) {
v.Add(val * -1) v.Add(val * -1)
} }
func (v *value) Write(out *dto.Metric) { func (v *value) Write(out *dto.Metric) error {
v.mtx.RLock() v.mtx.RLock()
val := v.val val := v.val
v.mtx.RUnlock() v.mtx.RUnlock()
populateMetric(v.valType, val, v.labelPairs, out) return populateMetric(v.valType, val, v.labelPairs, out)
} }
// valueFunc is a generic metric for simple values retrieved on collect time // valueFunc is a generic metric for simple values retrieved on collect time
@ -141,8 +141,8 @@ func (v *valueFunc) Desc() *Desc {
return v.desc return v.desc
} }
func (v *valueFunc) Write(out *dto.Metric) { func (v *valueFunc) Write(out *dto.Metric) error {
populateMetric(v.valType, v.function(), v.labelPairs, out) return populateMetric(v.valType, v.function(), v.labelPairs, out)
} }
// NewConstMetric returns a metric with one fixed value that cannot be // NewConstMetric returns a metric with one fixed value that cannot be
@ -184,8 +184,8 @@ func (m *constMetric) Desc() *Desc {
return m.desc return m.desc
} }
func (m *constMetric) Write(out *dto.Metric) { func (m *constMetric) Write(out *dto.Metric) error {
populateMetric(m.valType, m.val, m.labelPairs, out) return populateMetric(m.valType, m.val, m.labelPairs, out)
} }
func populateMetric( func populateMetric(
@ -193,7 +193,7 @@ func populateMetric(
v float64, v float64,
labelPairs []*dto.LabelPair, labelPairs []*dto.LabelPair,
m *dto.Metric, m *dto.Metric,
) { ) error {
m.Label = labelPairs m.Label = labelPairs
switch t { switch t {
case CounterValue: case CounterValue:
@ -203,8 +203,9 @@ func populateMetric(
case UntypedValue: case UntypedValue:
m.Untyped = &dto.Untyped{Value: proto.Float64(v)} m.Untyped = &dto.Untyped{Value: proto.Float64(v)}
default: default:
panic(fmt.Errorf("encountered unknown type %v", t)) return fmt.Errorf("encountered unknown type %v", t)
} }
return nil
} }
func makeLabelPairs(desc *Desc, labelValues []string) []*dto.LabelPair { func makeLabelPairs(desc *Desc, labelValues []string) []*dto.LabelPair {