process_collector: Add Platform-Specific Describe for processCollector (#1625)

* process_collector: Add Platform-Specific Describe for processCollector

Signed-off-by: Ying WANG <ying.wang@grafana.com>

* add changelog entry

Signed-off-by: Ying WANG <ying.wang@grafana.com>

* Address comments

Signed-off-by: Ying WANG <ying.wang@grafana.com>

---------

Signed-off-by: Ying WANG <ying.wang@grafana.com>
This commit is contained in:
Ying WANG 2024-10-07 12:08:32 +02:00 committed by GitHub
parent a9c0488390
commit b2ef833442
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 186 additions and 26 deletions

View File

@ -23,6 +23,7 @@ import (
type processCollector struct {
collectFn func(chan<- Metric)
describeFn func(chan<- *Desc)
pidFn func() (int, error)
reportErrors bool
cpuTotal *Desc
@ -122,26 +123,23 @@ func NewProcessCollector(opts ProcessCollectorOpts) Collector {
// Set up process metric collection if supported by the runtime.
if canCollectProcess() {
c.collectFn = c.processCollect
c.describeFn = c.describe
} else {
c.collectFn = func(ch chan<- Metric) {
c.reportError(ch, nil, errors.New("process metrics not supported on this platform"))
}
c.collectFn = c.errorCollectFn
c.describeFn = c.errorDescribeFn
}
return c
}
// Describe returns all descriptions of the collector.
func (c *processCollector) Describe(ch chan<- *Desc) {
ch <- c.cpuTotal
ch <- c.openFDs
ch <- c.maxFDs
ch <- c.vsize
ch <- c.maxVsize
ch <- c.rss
ch <- c.startTime
ch <- c.inBytes
ch <- c.outBytes
func (c *processCollector) errorCollectFn(ch chan<- Metric) {
c.reportError(ch, nil, errors.New("process metrics not supported on this platform"))
}
func (c *processCollector) errorDescribeFn(ch chan<- *Desc) {
if c.reportErrors {
ch <- NewInvalidDesc(errors.New("process metrics not supported on this platform"))
}
}
// Collect returns the current state of all metrics of the collector.
@ -149,6 +147,11 @@ func (c *processCollector) Collect(ch chan<- Metric) {
c.collectFn(ch)
}
// Describe returns all descriptions of the collector.
func (c *processCollector) Describe(ch chan<- *Desc) {
c.describeFn(ch)
}
func (c *processCollector) reportError(ch chan<- Metric, desc *Desc, err error) {
if !c.reportErrors {
return

View File

@ -16,10 +16,11 @@ package prometheus
import (
"errors"
"fmt"
"golang.org/x/sys/unix"
"os"
"syscall"
"time"
"golang.org/x/sys/unix"
)
// notImplementedErr is returned by stub functions that replace cgo functions, when cgo
@ -68,6 +69,25 @@ func getOpenFileCount() (float64, error) {
}
}
// describe returns all descriptions of the collector for Darwin.
// Ensure that this list of descriptors is kept in sync with the metrics collected
// in the processCollect method. Any changes to the metrics in processCollect
// (such as adding or removing metrics) should be reflected in this list of descriptors.
func (c *processCollector) describe(ch chan<- *Desc) {
ch <- c.cpuTotal
ch <- c.openFDs
ch <- c.maxFDs
ch <- c.maxVsize
ch <- c.startTime
/* the process could be collected but not implemented yet
ch <- c.rss
ch <- c.vsize
ch <- c.inBytes
ch <- c.outBytes
*/
}
func (c *processCollector) processCollect(ch chan<- Metric) {
if procs, err := unix.SysctlKinfoProcSlice("kern.proc.pid", os.Getpid()); err == nil {
if len(procs) == 1 {

View File

@ -20,7 +20,14 @@ func canCollectProcess() bool {
return false
}
// describe returns all descriptions of the collector for js.
// Ensure that this list of descriptors is kept in sync with the metrics collected
// in the processCollect method. Any changes to the metrics in processCollect
// (such as adding or removing metrics) should be reflected in this list of descriptors.
func (c *processCollector) processCollect(ch chan<- Metric) {
// noop on this platform
return
c.errorCollectFn(ch)
}
func (c *processCollector) describe(ch chan<- *Desc) {
c.errorDescribeFn(ch)
}

View File

@ -78,3 +78,19 @@ func (c *processCollector) processCollect(ch chan<- Metric) {
c.reportError(ch, nil, err)
}
}
// describe returns all descriptions of the collector for others than windows, js, wasip1 and darwin.
// Ensure that this list of descriptors is kept in sync with the metrics collected
// in the processCollect method. Any changes to the metrics in processCollect
// (such as adding or removing metrics) should be reflected in this list of descriptors.
func (c *processCollector) describe(ch chan<- *Desc) {
ch <- c.cpuTotal
ch <- c.openFDs
ch <- c.maxFDs
ch <- c.vsize
ch <- c.maxVsize
ch <- c.rss
ch <- c.startTime
ch <- c.inBytes
ch <- c.outBytes
}

View File

@ -170,3 +170,52 @@ func TestNewPidFileFn(t *testing.T) {
}
}
}
func TestDescribeAndCollectAlignment(t *testing.T) {
collector := &processCollector{
pidFn: getPIDFn(),
cpuTotal: NewDesc("cpu_total", "Total CPU usage", nil, nil),
openFDs: NewDesc("open_fds", "Number of open file descriptors", nil, nil),
maxFDs: NewDesc("max_fds", "Maximum file descriptors", nil, nil),
vsize: NewDesc("vsize", "Virtual memory size", nil, nil),
maxVsize: NewDesc("max_vsize", "Maximum virtual memory size", nil, nil),
rss: NewDesc("rss", "Resident Set Size", nil, nil),
startTime: NewDesc("start_time", "Process start time", nil, nil),
inBytes: NewDesc("in_bytes", "Input bytes", nil, nil),
outBytes: NewDesc("out_bytes", "Output bytes", nil, nil),
}
// Collect and get descriptors
descCh := make(chan *Desc, 15)
collector.describe(descCh)
close(descCh)
definedDescs := make(map[string]bool)
for desc := range descCh {
definedDescs[desc.String()] = true
}
// Collect and get metrics
metricsCh := make(chan Metric, 15)
collector.processCollect(metricsCh)
close(metricsCh)
collectedMetrics := make(map[string]bool)
for metric := range metricsCh {
collectedMetrics[metric.Desc().String()] = true
}
// Verify that all described metrics are collected
for desc := range definedDescs {
if !collectedMetrics[desc] {
t.Errorf("Metric %s described but not collected", desc)
}
}
// Verify that no extra metrics are collected
for desc := range collectedMetrics {
if !definedDescs[desc] {
t.Errorf("Metric %s collected but not described", desc)
}
}
}

View File

@ -20,7 +20,14 @@ func canCollectProcess() bool {
return false
}
func (*processCollector) processCollect(chan<- Metric) {
// noop on this platform
return
func (c *processCollector) processCollect(ch chan<- Metric) {
c.errorCollectFn(ch)
}
// describe returns all descriptions of the collector for wasip1.
// Ensure that this list of descriptors is kept in sync with the metrics collected
// in the processCollect method. Any changes to the metrics in processCollect
// (such as adding or removing metrics) should be reflected in this list of descriptors.
func (c *processCollector) describe(ch chan<- *Desc) {
c.errorDescribeFn(ch)
}

View File

@ -79,14 +79,10 @@ func getProcessHandleCount(handle windows.Handle) (uint32, error) {
}
func (c *processCollector) processCollect(ch chan<- Metric) {
h, err := windows.GetCurrentProcess()
if err != nil {
c.reportError(ch, nil, err)
return
}
h := windows.CurrentProcess()
var startTime, exitTime, kernelTime, userTime windows.Filetime
err = windows.GetProcessTimes(h, &startTime, &exitTime, &kernelTime, &userTime)
err := windows.GetProcessTimes(h, &startTime, &exitTime, &kernelTime, &userTime)
if err != nil {
c.reportError(ch, nil, err)
return
@ -111,6 +107,19 @@ func (c *processCollector) processCollect(ch chan<- Metric) {
ch <- MustNewConstMetric(c.maxFDs, GaugeValue, float64(16*1024*1024)) // Windows has a hard-coded max limit, not per-process.
}
// describe returns all descriptions of the collector for windows.
// Ensure that this list of descriptors is kept in sync with the metrics collected
// in the processCollect method. Any changes to the metrics in processCollect
// (such as adding or removing metrics) should be reflected in this list of descriptors.
func (c *processCollector) describe(ch chan<- *Desc) {
ch <- c.cpuTotal
ch <- c.openFDs
ch <- c.maxFDs
ch <- c.vsize
ch <- c.rss
ch <- c.startTime
}
func fileTimeToSeconds(ft windows.Filetime) float64 {
return float64(uint64(ft.HighDateTime)<<32+uint64(ft.LowDateTime)) / 1e7
}

View File

@ -68,3 +68,52 @@ func TestWindowsProcessCollector(t *testing.T) {
}
}
}
func TestWindowsDescribeAndCollectAlignment(t *testing.T) {
collector := &processCollector{
pidFn: getPIDFn(),
cpuTotal: NewDesc("cpu_total", "Total CPU usage", nil, nil),
openFDs: NewDesc("open_fds", "Number of open file descriptors", nil, nil),
maxFDs: NewDesc("max_fds", "Maximum file descriptors", nil, nil),
vsize: NewDesc("vsize", "Virtual memory size", nil, nil),
maxVsize: NewDesc("max_vsize", "Maximum virtual memory size", nil, nil),
rss: NewDesc("rss", "Resident Set Size", nil, nil),
startTime: NewDesc("start_time", "Process start time", nil, nil),
inBytes: NewDesc("in_bytes", "Input bytes", nil, nil),
outBytes: NewDesc("out_bytes", "Output bytes", nil, nil),
}
// Collect and get descriptors
descCh := make(chan *Desc, 15)
collector.describe(descCh)
close(descCh)
definedDescs := make(map[string]bool)
for desc := range descCh {
definedDescs[desc.String()] = true
}
// Collect and get metrics
metricsCh := make(chan Metric, 15)
collector.processCollect(metricsCh)
close(metricsCh)
collectedMetrics := make(map[string]bool)
for metric := range metricsCh {
collectedMetrics[metric.Desc().String()] = true
}
// Verify that all described metrics are collected
for desc := range definedDescs {
if !collectedMetrics[desc] {
t.Errorf("Metric %s described but not collected", desc)
}
}
// Verify that no extra metrics are collected
for desc := range collectedMetrics {
if !definedDescs[desc] {
t.Errorf("Metric %s collected but not described", desc)
}
}
}