Merge pull request #456 from prometheus/beorn7/process_collector

Rework process collector
This commit is contained in:
Björn Rabenstein 2018-09-07 12:25:42 +02:00 committed by GitHub
commit 7858729281
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 114 additions and 30 deletions

View File

@ -13,11 +13,17 @@
package prometheus package prometheus
import "github.com/prometheus/procfs" import (
"errors"
"os"
"github.com/prometheus/procfs"
)
type processCollector struct { type processCollector struct {
collectFn func(chan<- Metric) collectFn func(chan<- Metric)
pidFn func() (int, error) pidFn func() (int, error)
reportErrors bool
cpuTotal *Desc cpuTotal *Desc
openFDs, maxFDs *Desc openFDs, maxFDs *Desc
vsize, maxVsize *Desc vsize, maxVsize *Desc
@ -25,34 +31,56 @@ type processCollector struct {
startTime *Desc startTime *Desc
} }
// ProcessCollectorOpts defines the behavior of a process metrics collector
// created with NewProcessCollector. The zero value is usable: it selects the
// current process, an empty namespace, and no error reporting.
type ProcessCollectorOpts struct {
// PidFn returns the PID of the process the collector collects metrics
// for. It is called upon each collection. If PidFn is nil, the PID of
// the current process is used, as determined once at construction time
// by calling os.Getpid().
PidFn func() (int, error)
// Namespace, if non-empty, is prepended to the name of each collected
// metric, followed by an underscore ("_").
Namespace string
// ReportErrors, if true, causes any error encountered during collection
// to be reported as an invalid metric (see NewInvalidMetric). Otherwise,
// errors are ignored and the collected metrics will be incomplete.
// (Possibly, no metrics will be collected at all.) While that is usually
// not desired, it is appropriate for the common "mix-in" of process
// metrics, where process metrics are nice to have, but failing to
// collect them should not disrupt the collection of the remaining
// metrics.
ReportErrors bool
}
// NewProcessCollector returns a collector which exports the current state of // NewProcessCollector returns a collector which exports the current state of
// process metrics including CPU, memory and file descriptor usage as well as // process metrics including CPU, memory and file descriptor usage as well as
// the process start time for the given process ID under the given namespace. // the process start time. The detailed behavior is defined by the provided
// ProcessCollectorOpts. The zero value of ProcessCollectorOpts creates a
// collector for the current process with an empty namespace string and no error
// reporting.
// //
// Currently, the collector depends on a Linux-style proc filesystem and // Currently, the collector depends on a Linux-style proc filesystem and
// therefore only exports metrics for Linux. // therefore only exports metrics for Linux.
func NewProcessCollector(pid int, namespace string) Collector { //
return NewProcessCollectorPIDFn( // Note: An older version of this function had the following signature:
func() (int, error) { return pid, nil }, //
namespace, // NewProcessCollector(pid int, namespace string) Collector
) //
} // Most commonly, it was called as
//
// NewProcessCollectorPIDFn works like NewProcessCollector but the process ID is // NewProcessCollector(os.Getpid(), "")
// determined on each collect anew by calling the given pidFn function. //
func NewProcessCollectorPIDFn( // The following call of the current version is equivalent to the above:
pidFn func() (int, error), //
namespace string, // NewProcessCollector(ProcessCollectorOpts{})
) Collector { func NewProcessCollector(opts ProcessCollectorOpts) Collector {
ns := "" ns := ""
if len(namespace) > 0 { if len(opts.Namespace) > 0 {
ns = namespace + "_" ns = opts.Namespace + "_"
} }
c := processCollector{ c := &processCollector{
pidFn: pidFn, reportErrors: opts.ReportErrors,
collectFn: func(chan<- Metric) {},
cpuTotal: NewDesc( cpuTotal: NewDesc(
ns+"process_cpu_seconds_total", ns+"process_cpu_seconds_total",
"Total user and system CPU time spent in seconds.", "Total user and system CPU time spent in seconds.",
@ -90,12 +118,23 @@ func NewProcessCollectorPIDFn(
), ),
} }
if opts.PidFn == nil {
pid := os.Getpid()
c.pidFn = func() (int, error) { return pid, nil }
} else {
c.pidFn = opts.PidFn
}
// Set up process metric collection if supported by the runtime. // Set up process metric collection if supported by the runtime.
if _, err := procfs.NewStat(); err == nil { if _, err := procfs.NewStat(); err == nil {
c.collectFn = c.processCollect c.collectFn = c.processCollect
} else {
c.collectFn = func(ch chan<- Metric) {
c.reportError(ch, nil, errors.New("process metrics not supported on this platform"))
}
} }
return &c return c
} }
// Describe returns all descriptions of the collector. // Describe returns all descriptions of the collector.
@ -114,16 +153,16 @@ func (c *processCollector) Collect(ch chan<- Metric) {
c.collectFn(ch) c.collectFn(ch)
} }
// TODO(ts): Bring back error reporting by reverting 7faf9e7 as soon as the
// client allows users to configure the error behavior.
func (c *processCollector) processCollect(ch chan<- Metric) { func (c *processCollector) processCollect(ch chan<- Metric) {
pid, err := c.pidFn() pid, err := c.pidFn()
if err != nil { if err != nil {
c.reportError(ch, nil, err)
return return
} }
p, err := procfs.NewProc(pid) p, err := procfs.NewProc(pid)
if err != nil { if err != nil {
c.reportError(ch, nil, err)
return return
} }
@ -133,15 +172,33 @@ func (c *processCollector) processCollect(ch chan<- Metric) {
ch <- MustNewConstMetric(c.rss, GaugeValue, float64(stat.ResidentMemory())) ch <- MustNewConstMetric(c.rss, GaugeValue, float64(stat.ResidentMemory()))
if startTime, err := stat.StartTime(); err == nil { if startTime, err := stat.StartTime(); err == nil {
ch <- MustNewConstMetric(c.startTime, GaugeValue, startTime) ch <- MustNewConstMetric(c.startTime, GaugeValue, startTime)
} else {
c.reportError(ch, c.startTime, err)
} }
} else {
c.reportError(ch, nil, err)
} }
if fds, err := p.FileDescriptorsLen(); err == nil { if fds, err := p.FileDescriptorsLen(); err == nil {
ch <- MustNewConstMetric(c.openFDs, GaugeValue, float64(fds)) ch <- MustNewConstMetric(c.openFDs, GaugeValue, float64(fds))
} else {
c.reportError(ch, c.openFDs, err)
} }
if limits, err := p.NewLimits(); err == nil { if limits, err := p.NewLimits(); err == nil {
ch <- MustNewConstMetric(c.maxFDs, GaugeValue, float64(limits.OpenFiles)) ch <- MustNewConstMetric(c.maxFDs, GaugeValue, float64(limits.OpenFiles))
ch <- MustNewConstMetric(c.maxVsize, GaugeValue, float64(limits.AddressSpace)) ch <- MustNewConstMetric(c.maxVsize, GaugeValue, float64(limits.AddressSpace))
} else {
c.reportError(ch, nil, err)
} }
} }
// reportError sends err to ch as an invalid metric, but only if the collector
// was configured with error reporting enabled; otherwise the error is dropped
// and nothing is sent. If desc is nil, an invalid descriptor created from err
// is used in its place.
func (c *processCollector) reportError(ch chan<- Metric, desc *Desc, err error) {
	if c.reportErrors {
		d := desc
		if d == nil {
			// No specific metric is associated with this error, so
			// fall back to a descriptor that itself carries the error.
			d = NewInvalidDesc(err)
		}
		ch <- NewInvalidMetric(d, err)
	}
}

View File

@ -17,12 +17,15 @@ package prometheus
import ( import (
"bytes" "bytes"
"errors"
"os" "os"
"regexp" "regexp"
"testing" "testing"
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
"github.com/prometheus/procfs" "github.com/prometheus/procfs"
dto "github.com/prometheus/client_model/go"
) )
func TestProcessCollector(t *testing.T) { func TestProcessCollector(t *testing.T) {
@ -31,12 +34,14 @@ func TestProcessCollector(t *testing.T) {
} }
registry := NewRegistry() registry := NewRegistry()
if err := registry.Register(NewProcessCollector(os.Getpid(), "")); err != nil { if err := registry.Register(NewProcessCollector(ProcessCollectorOpts{})); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := registry.Register(NewProcessCollectorPIDFn( if err := registry.Register(NewProcessCollector(ProcessCollectorOpts{
func() (int, error) { return os.Getpid(), nil }, "foobar"), PidFn: func() (int, error) { return os.Getpid(), nil },
); err != nil { Namespace: "foobar",
ReportErrors: true, // No errors expected, just to see if none are reported.
})); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -72,4 +77,27 @@ func TestProcessCollector(t *testing.T) {
t.Errorf("want body to match %s\n%s", re, buf.String()) t.Errorf("want body to match %s\n%s", re, buf.String())
} }
} }
brokenProcessCollector := NewProcessCollector(ProcessCollectorOpts{
PidFn: func() (int, error) { return 0, errors.New("boo") },
ReportErrors: true,
})
ch := make(chan Metric)
go func() {
brokenProcessCollector.Collect(ch)
close(ch)
}()
n := 0
for m := range ch {
n++
pb := &dto.Metric{}
err := m.Write(pb)
if err == nil {
t.Error("metric collected from broken process collector is unexpectedly valid")
}
}
if n != 1 {
t.Errorf("%d metrics collected, want 1", n)
}
} }

View File

@ -16,7 +16,6 @@ package prometheus
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"os"
"runtime" "runtime"
"sort" "sort"
"strings" "strings"
@ -54,7 +53,7 @@ var (
) )
func init() { func init() {
MustRegister(NewProcessCollector(os.Getpid(), "")) MustRegister(NewProcessCollector(ProcessCollectorOpts{}))
MustRegister(NewGoCollector()) MustRegister(NewGoCollector())
} }