From f63e219e6b9074f8a55c8475e7b11720bdfc3737 Mon Sep 17 00:00:00 2001 From: Michael Knyszek Date: Fri, 21 Jan 2022 02:34:45 -0500 Subject: [PATCH] Make the Go 1.17 collector thread-safe (#969) --- prometheus/collector.go | 8 +++++++ prometheus/go_collector_go117.go | 32 +++++++++++++++++++++------ prometheus/go_collector_go117_test.go | 22 ++++++++++++++++++ 3 files changed, 55 insertions(+), 7 deletions(-) diff --git a/prometheus/collector.go b/prometheus/collector.go index 1e83965..ac1ca3c 100644 --- a/prometheus/collector.go +++ b/prometheus/collector.go @@ -118,3 +118,11 @@ func (c *selfCollector) Describe(ch chan<- *Desc) { func (c *selfCollector) Collect(ch chan<- Metric) { ch <- c.self } + +// collectorMetric is a metric that is also a collector. +// Because of selfCollector, most (if not all) Metrics in +// this package are also collectors. +type collectorMetric interface { + Metric + Collector +} diff --git a/prometheus/go_collector_go117.go b/prometheus/go_collector_go117.go index d534742..5046840 100644 --- a/prometheus/go_collector_go117.go +++ b/prometheus/go_collector_go117.go @@ -32,9 +32,10 @@ type goCollector struct { base baseGoCollector // rm... fields all pertain to the runtime/metrics package. + rmSampleMu sync.Mutex rmSampleBuf []metrics.Sample rmSampleMap map[string]*metrics.Sample - rmMetrics []Metric + rmMetrics []collectorMetric // With Go 1.17, the runtime/metrics package was introduced. // From that point on, metric names produced by the runtime/metrics @@ -58,7 +59,7 @@ func NewGoCollector() Collector { } // Generate a Desc and ValueType for each runtime/metrics metric. 
- metricSet := make([]Metric, 0, len(descriptions)) + metricSet := make([]collectorMetric, 0, len(descriptions)) sampleBuf := make([]metrics.Sample, 0, len(descriptions)) sampleMap := make(map[string]*metrics.Sample, len(descriptions)) for i := range descriptions { @@ -76,7 +77,7 @@ func NewGoCollector() Collector { sampleBuf = append(sampleBuf, metrics.Sample{Name: d.Name}) sampleMap[d.Name] = &sampleBuf[len(sampleBuf)-1] - var m Metric + var m collectorMetric if d.Kind == metrics.KindFloat64Histogram { _, hasSum := rmExactSumMap[d.Name] m = newBatchHistogram( @@ -130,9 +131,19 @@ func (c *goCollector) Collect(ch chan<- Metric) { // Collect base non-memory metrics. c.base.Collect(ch) + // Collect must be thread-safe, so prevent concurrent use of + // rmSampleBuf. Just read into rmSampleBuf but write all the data + // we get into our Metrics or MemStats. + // + // Note that we cannot simply read and then clone rmSampleBuf + // because we'd need to perform a deep clone of it, which is likely + // not worth it. + c.rmSampleMu.Lock() + // Populate runtime/metrics sample buffer. metrics.Read(c.rmSampleBuf) + // Update all our metrics from rmSampleBuf. for i, sample := range c.rmSampleBuf { // N.B. switch on concrete type because it's significantly more efficient // than checking for the Counter and Gauge interface implementations. In @@ -146,22 +157,29 @@ func (c *goCollector) Collect(ch chan<- Metric) { if v1 > v0 { m.Add(unwrapScalarRMValue(sample.Value) - m.get()) } - m.Collect(ch) case *gauge: m.Set(unwrapScalarRMValue(sample.Value)) - m.Collect(ch) case *batchHistogram: m.update(sample.Value.Float64Histogram(), c.exactSumFor(sample.Name)) - m.Collect(ch) default: panic("unexpected metric type") } } - // ms is a dummy MemStats that we populate ourselves so that we can // populate the old metrics from it. var ms runtime.MemStats memStatsFromRM(&ms, c.rmSampleMap) + + c.rmSampleMu.Unlock() + + // Export all the metrics to ch. 
+ // At this point we must not access rmSampleBuf or rmSampleMap, because + // a concurrent caller could use it. It's safe to Collect all our Metrics, + // however, because they're updated in a thread-safe way while MemStats + // is local to this call of Collect. + for _, m := range c.rmMetrics { + m.Collect(ch) + } for _, i := range c.msMetrics { ch <- MustNewConstMetric(i.desc, i.valType, i.eval(&ms)) } diff --git a/prometheus/go_collector_go117_test.go b/prometheus/go_collector_go117_test.go index 653e332..e780bce 100644 --- a/prometheus/go_collector_go117_test.go +++ b/prometheus/go_collector_go117_test.go @@ -280,3 +280,25 @@ func TestExpectedRuntimeMetrics(t *testing.T) { t.Log("where X is the Go version you are currently using") } } + +func TestGoCollectorConcurrency(t *testing.T) { + c := NewGoCollector().(*goCollector) + + // Set up multiple goroutines to Collect from the + // same GoCollector. In race mode with GOMAXPROCS > 1, + // this test should fail often if Collect is not + // concurrent-safe. + for i := 0; i < 4; i++ { + go func() { + ch := make(chan Metric) + go func() { + // Drain all metrics received until the + // channel is closed. + for range ch { + } + }() + c.Collect(ch) + close(ch) + }() + } +}