Merge pull request #45 from prometheus/beorn7/allow-collect-errors

Allow error reporting during metrics collection and simplify Register().
This commit is contained in:
Björn Rabenstein 2015-01-13 16:53:16 +01:00
commit aa848f77db
11 changed files with 124 additions and 66 deletions

View File

@ -33,7 +33,9 @@ type Collector interface {
// duplicate descriptors. Those duplicates are simply ignored. However,
// two different Collectors must not send duplicate descriptors.) This
// method idempotently sends the same descriptors throughout the
// lifetime of the Collector.
// lifetime of the Collector. If a Collector encounters an error while
// executing this method, it must send an invalid descriptor (created
// with NewInvalidDesc) to signal the error to the registry.
Describe(chan<- *Desc)
// Collect is called by Prometheus when collecting metrics. The
// implementation sends each collected metric via the provided channel

View File

@ -166,6 +166,16 @@ func NewDesc(fqName, help string, variableLabels []string, constLabels Labels) *
return d
}
// NewInvalidDesc returns an invalid descriptor, i.e. a descriptor with the
// provided error set. If a collector returning such a descriptor is registered,
// registration will fail with the provided error. NewInvalidDesc can be used by
// a Collector to signal inability to describe itself.
func NewInvalidDesc(err error) *Desc {
return &Desc{
err: err,
}
}
func (d *Desc) String() string {
lpStrings := make([]string, 0, len(d.constLabelPairs))
for _, lp := range d.constLabelPairs {

View File

@ -49,8 +49,9 @@ func (cm *CallbackMetric) Desc() *prometheus.Desc {
return cm.desc
}
func (cm *CallbackMetric) Write(m *dto.Metric) {
func (cm *CallbackMetric) Write(m *dto.Metric) error {
m.Untyped = &dto.Untyped{Value: proto.Float64(cm.callback())}
return nil
}
func ExampleSelfCollector() {

View File

@ -74,7 +74,7 @@ func ExampleGaugeVec() {
}
func ExampleGaugeFunc() {
if _, err := prometheus.Register(prometheus.NewGaugeFunc(
if err := prometheus.Register(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Subsystem: "runtime",
Name: "goroutines_count",
@ -95,7 +95,7 @@ func ExampleCounter() {
pushCounter := prometheus.NewCounter(prometheus.CounterOpts{
Name: "repository_pushes", // Note: No help string...
})
_, err := prometheus.Register(pushCounter) // ... so this will return an error.
err := prometheus.Register(pushCounter) // ... so this will return an error.
if err != nil {
fmt.Println("Push counter couldn't be registered, no counting will happen:", err)
return
@ -106,7 +106,7 @@ func ExampleCounter() {
Name: "repository_pushes",
Help: "Number of pushes to external repository.",
})
_, err = prometheus.Register(pushCounter)
err = prometheus.Register(pushCounter)
if err != nil {
fmt.Println("Push counter couldn't be registered AGAIN, no counting will happen:", err)
return
@ -194,7 +194,7 @@ func ExampleRegister() {
Help: "Total number of tasks completed.",
})
// This will register fine.
if _, err := prometheus.Register(taskCounter); err != nil {
if err := prometheus.Register(taskCounter); err != nil {
fmt.Println(err)
} else {
fmt.Println("taskCounter registered.")
@ -219,7 +219,7 @@ func ExampleRegister() {
)
// Registering will fail because we already have a metric of that name.
if _, err := prometheus.Register(taskCounterVec); err != nil {
if err := prometheus.Register(taskCounterVec); err != nil {
fmt.Println("taskCounterVec not registered:", err)
} else {
fmt.Println("taskCounterVec registered.")
@ -231,7 +231,7 @@ func ExampleRegister() {
}
// Try registering taskCounterVec again.
if _, err := prometheus.Register(taskCounterVec); err != nil {
if err := prometheus.Register(taskCounterVec); err != nil {
fmt.Println("taskCounterVec not registered:", err)
} else {
fmt.Println("taskCounterVec registered.")
@ -253,7 +253,7 @@ func ExampleRegister() {
},
[]string{"worker_id"},
)
if _, err := prometheus.Register(taskCounterVec); err != nil {
if err := prometheus.Register(taskCounterVec); err != nil {
fmt.Println("taskCounterVec not registered:", err)
} else {
fmt.Println("taskCounterVec registered.")
@ -299,7 +299,7 @@ func ExampleRegister() {
ConstLabels: prometheus.Labels{"worker_id": "42"},
}
taskCounterForWorker42 := prometheus.NewCounter(counterOpts)
if _, err := prometheus.Register(taskCounterForWorker42); err != nil {
if err := prometheus.Register(taskCounterForWorker42); err != nil {
fmt.Println("taskCounterVForWorker42 not registered:", err)
} else {
fmt.Println("taskCounterForWorker42 registered.")
@ -313,7 +313,7 @@ func ExampleRegister() {
// counterOpts. Just change the ConstLabels.
counterOpts.ConstLabels = prometheus.Labels{"worker_id": "2001"}
taskCounterForWorker2001 := prometheus.NewCounter(counterOpts)
if _, err := prometheus.Register(taskCounterForWorker2001); err != nil {
if err := prometheus.Register(taskCounterForWorker2001); err != nil {
fmt.Println("taskCounterVForWorker2001 not registered:", err)
} else {
fmt.Println("taskCounterForWorker2001 registered.")

View File

@ -82,36 +82,38 @@ func (e *ExpvarCollector) Collect(ch chan<- Metric) {
}
var v interface{}
labels := make([]string, len(desc.variableLabels))
if err := json.Unmarshal([]byte(expVar.String()), &v); err == nil {
var processValue func(v interface{}, i int)
processValue = func(v interface{}, i int) {
if i >= len(labels) {
copiedLabels := append(make([]string, 0, len(labels)), labels...)
switch v := v.(type) {
case float64:
m = MustNewConstMetric(desc, UntypedValue, v, copiedLabels...)
case bool:
if v {
m = MustNewConstMetric(desc, UntypedValue, 1, copiedLabels...)
} else {
m = MustNewConstMetric(desc, UntypedValue, 0, copiedLabels...)
}
default:
return
}
ch <- m
return
}
vm, ok := v.(map[string]interface{})
if !ok {
return
}
for lv, val := range vm {
labels[i] = lv
processValue(val, i+1)
}
}
processValue(v, 0)
if err := json.Unmarshal([]byte(expVar.String()), &v); err != nil {
ch <- NewInvalidMetric(desc, err)
continue
}
var processValue func(v interface{}, i int)
processValue = func(v interface{}, i int) {
if i >= len(labels) {
copiedLabels := append(make([]string, 0, len(labels)), labels...)
switch v := v.(type) {
case float64:
m = MustNewConstMetric(desc, UntypedValue, v, copiedLabels...)
case bool:
if v {
m = MustNewConstMetric(desc, UntypedValue, 1, copiedLabels...)
} else {
m = MustNewConstMetric(desc, UntypedValue, 0, copiedLabels...)
}
default:
return
}
ch <- m
return
}
vm, ok := v.(map[string]interface{})
if !ok {
return
}
for lv, val := range vm {
labels[i] = lv
processValue(val, i+1)
}
}
processValue(v, 0)
}
}

View File

@ -31,7 +31,7 @@ func TestGoCollector(t *testing.T) {
select {
case metric := <-ch:
switch m := metric.(type) {
// Attention, this als catches Counter ...
// Attention, this also catches Counter...
case Gauge:
pb := &dto.Metric{}
m.Write(pb)
@ -43,7 +43,7 @@ func TestGoCollector(t *testing.T) {
}
if diff := int(pb.GetGauge().GetValue()) - old; diff != 1 {
t.Errorf("want 1 new goroutine, got %f", diff)
t.Errorf("want 1 new goroutine, got %d", diff)
}
return

View File

@ -27,7 +27,9 @@ import (
type Metric interface {
// Desc returns the descriptor for the Metric. This method idempotently
// returns the same descriptor throughout the lifetime of the
// Metric. The returned descriptor is immutable by contract.
// Metric. The returned descriptor is immutable by contract. A Metric
// unable to describe itself must return an invalid descriptor (created
// with NewInvalidDesc).
Desc() *Desc
// Write encodes the Metric into a "Metric" Protocol Buffer data
// transmission object.
@ -46,7 +48,7 @@ type Metric interface {
//
// While populating dto.Metric, labels must be sorted lexicographically.
// (Implementers may find LabelPairSorter useful for that.)
Write(*dto.Metric)
Write(*dto.Metric) error
}
// Opts bundles the options for creating most Metric types. Each metric
@ -144,3 +146,19 @@ func (s hashSorter) Swap(i, j int) {
func (s hashSorter) Less(i, j int) bool {
return s[i] < s[j]
}
type invalidMetric struct {
desc *Desc
err error
}
// NewInvalidMetric returns a metric whose Write method always returns the
// provided error. It is useful if a Collector finds itself unable to collect
// a metric and wishes to report an error to the registry.
func NewInvalidMetric(desc *Desc, err error) Metric {
return &invalidMetric{desc, err}
}
func (m *invalidMetric) Desc() *Desc { return m.desc }
func (m *invalidMetric) Write(*dto.Metric) error { return m.err }

View File

@ -92,10 +92,23 @@ func noopCollect(ch chan<- Metric) {}
func (c *processCollector) procfsCollect(ch chan<- Metric) {
p, err := procfs.NewProc(c.pidFn())
if err != nil {
// Report collect errors for all metrics.
ch <- NewInvalidMetric(c.cpuTotal.Desc(), err)
ch <- NewInvalidMetric(c.openFDs.Desc(), err)
ch <- NewInvalidMetric(c.maxFDs.Desc(), err)
ch <- NewInvalidMetric(c.vsize.Desc(), err)
ch <- NewInvalidMetric(c.rss.Desc(), err)
ch <- NewInvalidMetric(c.startTime.Desc(), err)
return
}
if stat, err := p.NewStat(); err == nil {
if stat, err := p.NewStat(); err != nil {
// Report collect errors for metrics depending on stat.
ch <- NewInvalidMetric(c.vsize.Desc(), err)
ch <- NewInvalidMetric(c.rss.Desc(), err)
ch <- NewInvalidMetric(c.startTime.Desc(), err)
ch <- NewInvalidMetric(c.cpuTotal.Desc(), err)
} else {
c.cpuTotal.Set(stat.CPUTime())
ch <- c.cpuTotal
c.vsize.Set(float64(stat.VirtualMemory()))
@ -103,18 +116,24 @@ func (c *processCollector) procfsCollect(ch chan<- Metric) {
c.rss.Set(float64(stat.ResidentMemory()))
ch <- c.rss
if startTime, err := stat.StartTime(); err == nil {
if startTime, err := stat.StartTime(); err != nil {
ch <- NewInvalidMetric(c.startTime.Desc(), err)
} else {
c.startTime.Set(startTime)
ch <- c.startTime
}
}
if fds, err := p.FileDescriptorsLen(); err == nil {
if fds, err := p.FileDescriptorsLen(); err != nil {
ch <- NewInvalidMetric(c.openFDs.Desc(), err)
} else {
c.openFDs.Set(float64(fds))
ch <- c.openFDs
}
if limits, err := p.NewLimits(); err == nil {
if limits, err := p.NewLimits(); err != nil {
ch <- NewInvalidMetric(c.maxFDs.Desc(), err)
} else {
c.maxFDs.Set(float64(limits.OpenFiles))
ch <- c.maxFDs
}

View File

@ -105,24 +105,23 @@ func UninstrumentedHandler() http.Handler {
// returns an error if the descriptors provided by the Collector are invalid or
// if they - in combination with descriptors of already registered Collectors -
// do not fulfill the consistency and uniqueness criteria described in the Desc
// documentation. If the registration is successful, the registered Collector
// is returned.
// documentation.
//
// Do not register the same Collector multiple times concurrently. (Registering
// the same Collector twice would result in an error anyway, but on top of that,
// it is not safe to do so concurrently.)
func Register(m Collector) (Collector, error) {
return defRegistry.Register(m)
func Register(m Collector) error {
_, err := defRegistry.Register(m)
return err
}
// MustRegister works like Register but panics where Register would have
// returned an error.
func MustRegister(m Collector) Collector {
m, err := Register(m)
func MustRegister(m Collector) {
err := Register(m)
if err != nil {
panic(err)
}
return m
}
// RegisterOrGet works like Register but does not return an error if a Collector
@ -447,7 +446,12 @@ func (r *registry) writePB(w io.Writer, writeEncoded encoder) (int, error) {
}
dtoMetric := r.getMetric()
defer r.giveMetric(dtoMetric)
metric.Write(dtoMetric)
if err := metric.Write(dtoMetric); err != nil {
// TODO: Consider different means of error reporting so
// that a single erroneous metric could be skipped
// instead of blowing up the whole collection.
return 0, fmt.Errorf("error collecting metric %v: %s", desc, err)
}
switch {
case metricFamily.Type != nil:
// Type already set. We are good.

View File

@ -242,7 +242,7 @@ func (s *summary) Observe(v float64) {
}
}
func (s *summary) Write(out *dto.Metric) {
func (s *summary) Write(out *dto.Metric) error {
sum := &dto.Summary{}
qs := make([]*dto.Quantile, 0, len(s.objectives))
@ -277,6 +277,7 @@ func (s *summary) Write(out *dto.Metric) {
out.Summary = sum
out.Label = s.labelPairs
return nil
}
func (s *summary) newStream() *quantile.Stream {

View File

@ -98,12 +98,12 @@ func (v *value) Sub(val float64) {
v.Add(val * -1)
}
func (v *value) Write(out *dto.Metric) {
func (v *value) Write(out *dto.Metric) error {
v.mtx.RLock()
val := v.val
v.mtx.RUnlock()
populateMetric(v.valType, val, v.labelPairs, out)
return populateMetric(v.valType, val, v.labelPairs, out)
}
// valueFunc is a generic metric for simple values retrieved on collect time
@ -141,8 +141,8 @@ func (v *valueFunc) Desc() *Desc {
return v.desc
}
func (v *valueFunc) Write(out *dto.Metric) {
populateMetric(v.valType, v.function(), v.labelPairs, out)
func (v *valueFunc) Write(out *dto.Metric) error {
return populateMetric(v.valType, v.function(), v.labelPairs, out)
}
// NewConstMetric returns a metric with one fixed value that cannot be
@ -184,8 +184,8 @@ func (m *constMetric) Desc() *Desc {
return m.desc
}
func (m *constMetric) Write(out *dto.Metric) {
populateMetric(m.valType, m.val, m.labelPairs, out)
func (m *constMetric) Write(out *dto.Metric) error {
return populateMetric(m.valType, m.val, m.labelPairs, out)
}
func populateMetric(
@ -193,7 +193,7 @@ func populateMetric(
v float64,
labelPairs []*dto.LabelPair,
m *dto.Metric,
) {
) error {
m.Label = labelPairs
switch t {
case CounterValue:
@ -203,8 +203,9 @@ func populateMetric(
case UntypedValue:
m.Untyped = &dto.Untyped{Value: proto.Float64(v)}
default:
panic(fmt.Errorf("encountered unknown type %v", t))
return fmt.Errorf("encountered unknown type %v", t)
}
return nil
}
func makeLabelPairs(desc *Desc, labelValues []string) []*dto.LabelPair {