From dbd48d666b7cab8fdc1c5fea89ad1db67f79ad03 Mon Sep 17 00:00:00 2001 From: Tobias Schmidt Date: Wed, 10 Dec 2014 18:01:46 -0500 Subject: [PATCH 1/2] Add ProcessCollector and GoCollector This change adds two new collectors to the prometheus package which export metrics about a given or the current process. * ProcessCollector exports metrics about cpu time, vss, rss, fd usage as well as the start time of a given process. * GoCollector exports currently only the number of active goroutines. --- prometheus/go_collector.go | 31 +++++++ prometheus/go_collector_test.go | 57 +++++++++++++ prometheus/process_collector.go | 121 +++++++++++++++++++++++++++ prometheus/process_collector_test.go | 54 ++++++++++++ 4 files changed, 263 insertions(+) create mode 100644 prometheus/go_collector.go create mode 100644 prometheus/go_collector_test.go create mode 100644 prometheus/process_collector.go create mode 100644 prometheus/process_collector_test.go diff --git a/prometheus/go_collector.go b/prometheus/go_collector.go new file mode 100644 index 0000000..d7b7a20 --- /dev/null +++ b/prometheus/go_collector.go @@ -0,0 +1,31 @@ +package prometheus + +import ( + "runtime" +) + +type goCollector struct { + goroutines Gauge +} + +// NewGoCollector returns a collector which exports metrics about the current +// go process. +func NewGoCollector() *goCollector { + return &goCollector{ + goroutines: NewGauge(GaugeOpts{ + Name: "process_goroutines", + Help: "Number of goroutines that currently exist.", + }), + } +} + +// Describe returns all descriptions of the collector. +func (c *goCollector) Describe(ch chan<- *Desc) { + ch <- c.goroutines.Desc() +} + +// Collect returns the current state of all metrics of the collector. +func (c *goCollector) Collect(ch chan<- Metric) { + c.goroutines.Set(float64(runtime.NumGoroutine())) + ch <- c.goroutines +} diff --git a/prometheus/go_collector_test.go b/prometheus/go_collector_test.go new file mode 100644 index 0000000..60af3b5 --- /dev/null +++ b/prometheus/go_collector_test.go @@ -0,0 +1,57 @@ +package prometheus + +import ( + "reflect" + "testing" + "time" + + dto "github.com/prometheus/client_model/go" +) + +func TestGoCollector(t *testing.T) { + var ( + c = NewGoCollector() + ch = make(chan Metric) + waitc = make(chan struct{}) + closec = make(chan struct{}) + old = -1 + ) + defer close(closec) + + go func() { + c.Collect(ch) + go func(c <-chan struct{}) { + <-c + }(closec) + <-waitc + c.Collect(ch) + }() + + for { + select { + case metric := <-ch: + switch m := metric.(type) { + // Attention, this als catches Counter ... + case Gauge: + pb := &dto.Metric{} + m.Write(pb) + + if old == -1 { + old = int(pb.GetGauge().GetValue()) + close(waitc) + continue + } + + if diff := int(pb.GetGauge().GetValue()) - old; diff != 1 { + t.Errorf("want 1 new goroutine, got %f", diff) + } + + return + default: + t.Errorf("want type Gauge, got %s", reflect.TypeOf(metric)) + } + case <-time.After(1 * time.Second): + t.Fatalf("expected collect timed out") + } + } +} diff --git a/prometheus/process_collector.go b/prometheus/process_collector.go new file mode 100644 index 0000000..60c5912 --- /dev/null +++ b/prometheus/process_collector.go @@ -0,0 +1,121 @@ +package prometheus + +import "github.com/prometheus/procfs" + +type processCollector struct { + pid int + collectFn func(chan<- Metric) + pidFn func() int + cpuTotal Counter + openFDs, maxFDs Gauge + vsize, rss Gauge + startTime Gauge +} + +// NewProcessCollector returns a collector which exports the current state of +// process metrics including cpu, memory and file descriptor usage as well as +// the process start time for the given process id under the given namespace. +func NewProcessCollector(pid int, namespace string) *processCollector { + return NewProcessCollectorPIDFn(func() int { return pid }, namespace) +} + +// NewProcessCollectorPIDFn returns a collector which exports the current state +// of process metrics including cpu, memory and file descriptor usage as well +// as the process start time under the given namespace. The given pidFn is +// called on each collect and is used to determine the process to export +// metrics for. +func NewProcessCollectorPIDFn( + pidFn func() int, + namespace string, +) *processCollector { + c := processCollector{ + pidFn: pidFn, + collectFn: noopCollect, + + cpuTotal: NewCounter(CounterOpts{ + Namespace: namespace, + Name: "process_cpu_seconds_total", + Help: "Total user and system CPU time spent in seconds.", + }), + openFDs: NewGauge(GaugeOpts{ + Namespace: namespace, + Name: "process_open_fds", + Help: "Number of open file descriptors.", + }), + maxFDs: NewGauge(GaugeOpts{ + Namespace: namespace, + Name: "process_max_fds", + Help: "Maximum number of open file descriptors.", + }), + vsize: NewGauge(GaugeOpts{ + Namespace: namespace, + Name: "process_virtual_memory_bytes", + Help: "Virtual memory size in bytes.", + }), + rss: NewGauge(GaugeOpts{ + Namespace: namespace, + Name: "process_resident_memory_bytes", + Help: "Resident memory size in bytes.", + }), + startTime: NewGauge(GaugeOpts{ + Namespace: namespace, + Name: "process_start_time_seconds", + Help: "Start time of the process since unix epoch in seconds.", + }), + } + + // Use procfs to export metrics if available. + if _, err := procfs.NewStat(); err == nil { + c.collectFn = c.procfsCollect + } + + return &c +} + +// Describe returns all descriptions of the collector. +func (c *processCollector) Describe(ch chan<- *Desc) { + ch <- c.cpuTotal.Desc() + ch <- c.openFDs.Desc() + ch <- c.maxFDs.Desc() + ch <- c.vsize.Desc() + ch <- c.rss.Desc() + ch <- c.startTime.Desc() +} + +// Collect returns the current state of all metrics of the collector. +func (c *processCollector) Collect(ch chan<- Metric) { + c.collectFn(ch) +} + +func noopCollect(ch chan<- Metric) {} + +func (c *processCollector) procfsCollect(ch chan<- Metric) { + p, err := procfs.NewProc(c.pidFn()) + if err != nil { + return + } + + if stat, err := p.NewStat(); err == nil { + c.cpuTotal.Set(stat.CPUTime()) + ch <- c.cpuTotal + c.vsize.Set(float64(stat.VirtualMemory())) + ch <- c.vsize + c.rss.Set(float64(stat.ResidentMemory())) + ch <- c.rss + + if startTime, err := stat.StartTime(); err == nil { + c.startTime.Set(startTime) + ch <- c.startTime + } + } + + if fds, err := p.FileDescriptorsLen(); err == nil { + c.openFDs.Set(float64(fds)) + ch <- c.openFDs + } + + if limits, err := p.NewLimits(); err == nil { + c.maxFDs.Set(float64(limits.OpenFiles)) + ch <- c.maxFDs + } +} diff --git a/prometheus/process_collector_test.go b/prometheus/process_collector_test.go new file mode 100644 index 0000000..7ab5411 --- /dev/null +++ b/prometheus/process_collector_test.go @@ -0,0 +1,54 @@ +package prometheus + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "regexp" + "testing" + + "github.com/prometheus/procfs" +) + +func TestProcessCollector(t *testing.T) { + if _, err := procfs.Self(); err != nil { + t.Skipf("skipping TestProcessCollector, procfs not available: %s", err) + } + + registry := newRegistry() + registry.Register(NewProcessCollector(os.Getpid(), "")) + registry.Register(NewProcessCollectorPIDFn( + func() int { return os.Getpid() }, "foobar")) + + s := httptest.NewServer(InstrumentHandler("prometheus", registry)) + defer s.Close() + r, err := http.Get(s.URL) + if err != nil { + t.Fatal(err) + } + defer r.Body.Close() + body, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + + for _, re := range []*regexp.Regexp{ + regexp.MustCompile("process_cpu_seconds_total [0-9]"), + regexp.MustCompile("process_max_fds [0-9]{2,}"), + regexp.MustCompile("process_open_fds [1-9]"), + regexp.MustCompile("process_virtual_memory_bytes [1-9]"), + regexp.MustCompile("process_resident_memory_bytes [1-9]"), + regexp.MustCompile("process_start_time_seconds [0-9.]{10,}"), + regexp.MustCompile("foobar_process_cpu_seconds_total [0-9]"), + regexp.MustCompile("foobar_process_max_fds [0-9]{2,}"), + regexp.MustCompile("foobar_process_open_fds [1-9]"), + regexp.MustCompile("foobar_process_virtual_memory_bytes [1-9]"), + regexp.MustCompile("foobar_process_resident_memory_bytes [1-9]"), + regexp.MustCompile("foobar_process_start_time_seconds [0-9.]{10,}"), + } { + if !re.Match(body) { + t.Errorf("want body to match %s\n%s", re, body) + } + } +} From d66557ae597b97ad515ff6a5b2fb8d055323b7f8 Mon Sep 17 00:00:00 2001 From: Tobias Schmidt Date: Wed, 10 Dec 2014 20:05:35 -0500 Subject: [PATCH 2/2] Register process and go collectors by default --- prometheus/registry.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/prometheus/registry.go b/prometheus/registry.go index 65ea520..8485f9a 100644 --- a/prometheus/registry.go +++ b/prometheus/registry.go @@ -21,12 +21,14 @@ package prometheus import ( "bytes" + "compress/gzip" "errors" "fmt" "hash/fnv" "io" "net/http" "net/url" + "os" "sort" "strings" "sync" @@ -34,7 +36,6 @@ import ( dto "github.com/prometheus/client_model/go" "code.google.com/p/goprotobuf/proto" - "compress/gzip" "github.com/prometheus/client_golang/_vendor/goautoneg" "github.com/prometheus/client_golang/model" @@ -42,7 +43,7 @@ import ( ) var ( - defRegistry = newRegistry() + defRegistry = newDefaultRegistry() errAlreadyReg = errors.New("duplicate metrics collector registration attempted") ) @@ -643,6 +644,13 @@ func newRegistry() *registry { } } +func newDefaultRegistry() *registry { + r := newRegistry() + r.Register(NewProcessCollector(os.Getpid(), "")) + r.Register(NewGoCollector()) + return r +} + func chooseEncoder(req *http.Request) (encoder, string) { accepts := goautoneg.ParseAccept(req.Header.Get(acceptHeader)) for _, accept := range accepts {