Make the Go 1.17 collector thread-safe (#969)

This commit is contained in:
Michael Knyszek 2022-01-21 02:34:45 -05:00 committed by Kemal Akkoyun
parent 08a53e57a2
commit 772b89389c
3 changed files with 55 additions and 7 deletions

View File

@ -118,3 +118,11 @@ func (c *selfCollector) Describe(ch chan<- *Desc) {
func (c *selfCollector) Collect(ch chan<- Metric) { func (c *selfCollector) Collect(ch chan<- Metric) {
ch <- c.self ch <- c.self
} }
// collectorMetric is a metric that is also a collector.
// Because of selfCollector, most (if not all) Metrics in
// this package are also collectors.
type collectorMetric interface {
Metric
Collector
}

View File

@ -32,9 +32,10 @@ type goCollector struct {
base baseGoCollector base baseGoCollector
// rm... fields all pertain to the runtime/metrics package. // rm... fields all pertain to the runtime/metrics package.
rmSampleMu sync.Mutex
rmSampleBuf []metrics.Sample rmSampleBuf []metrics.Sample
rmSampleMap map[string]*metrics.Sample rmSampleMap map[string]*metrics.Sample
rmMetrics []Metric rmMetrics []collectorMetric
// With Go 1.17, the runtime/metrics package was introduced. // With Go 1.17, the runtime/metrics package was introduced.
// From that point on, metric names produced by the runtime/metrics // From that point on, metric names produced by the runtime/metrics
@ -58,7 +59,7 @@ func NewGoCollector() Collector {
} }
// Generate a Desc and ValueType for each runtime/metrics metric. // Generate a Desc and ValueType for each runtime/metrics metric.
metricSet := make([]Metric, 0, len(descriptions)) metricSet := make([]collectorMetric, 0, len(descriptions))
sampleBuf := make([]metrics.Sample, 0, len(descriptions)) sampleBuf := make([]metrics.Sample, 0, len(descriptions))
sampleMap := make(map[string]*metrics.Sample, len(descriptions)) sampleMap := make(map[string]*metrics.Sample, len(descriptions))
for i := range descriptions { for i := range descriptions {
@ -76,7 +77,7 @@ func NewGoCollector() Collector {
sampleBuf = append(sampleBuf, metrics.Sample{Name: d.Name}) sampleBuf = append(sampleBuf, metrics.Sample{Name: d.Name})
sampleMap[d.Name] = &sampleBuf[len(sampleBuf)-1] sampleMap[d.Name] = &sampleBuf[len(sampleBuf)-1]
var m Metric var m collectorMetric
if d.Kind == metrics.KindFloat64Histogram { if d.Kind == metrics.KindFloat64Histogram {
_, hasSum := rmExactSumMap[d.Name] _, hasSum := rmExactSumMap[d.Name]
m = newBatchHistogram( m = newBatchHistogram(
@ -130,9 +131,19 @@ func (c *goCollector) Collect(ch chan<- Metric) {
// Collect base non-memory metrics. // Collect base non-memory metrics.
c.base.Collect(ch) c.base.Collect(ch)
// Collect must be thread-safe, so prevent concurrent use of
// rmSampleBuf. Just read into rmSampleBuf but write all the data
// we get into our Metrics or MemStats.
//
// Note that we cannot simply read and then clone rmSampleBuf
// because we'd need to perform a deep clone of it, which is likely
// not worth it.
c.rmSampleMu.Lock()
// Populate runtime/metrics sample buffer. // Populate runtime/metrics sample buffer.
metrics.Read(c.rmSampleBuf) metrics.Read(c.rmSampleBuf)
// Update all our metrics from rmSampleBuf.
for i, sample := range c.rmSampleBuf { for i, sample := range c.rmSampleBuf {
// N.B. switch on concrete type because it's significantly more efficient // N.B. switch on concrete type because it's significantly more efficient
// than checking for the Counter and Gauge interface implementations. In // than checking for the Counter and Gauge interface implementations. In
@ -146,22 +157,29 @@ func (c *goCollector) Collect(ch chan<- Metric) {
if v1 > v0 { if v1 > v0 {
m.Add(unwrapScalarRMValue(sample.Value) - m.get()) m.Add(unwrapScalarRMValue(sample.Value) - m.get())
} }
m.Collect(ch)
case *gauge: case *gauge:
m.Set(unwrapScalarRMValue(sample.Value)) m.Set(unwrapScalarRMValue(sample.Value))
m.Collect(ch)
case *batchHistogram: case *batchHistogram:
m.update(sample.Value.Float64Histogram(), c.exactSumFor(sample.Name)) m.update(sample.Value.Float64Histogram(), c.exactSumFor(sample.Name))
m.Collect(ch)
default: default:
panic("unexpected metric type") panic("unexpected metric type")
} }
} }
// ms is a dummy MemStats that we populate ourselves so that we can // ms is a dummy MemStats that we populate ourselves so that we can
// populate the old metrics from it. // populate the old metrics from it.
var ms runtime.MemStats var ms runtime.MemStats
memStatsFromRM(&ms, c.rmSampleMap) memStatsFromRM(&ms, c.rmSampleMap)
c.rmSampleMu.Unlock()
// Export all the metrics to ch.
// At this point we must not access rmSampleBuf or rmSampleMap, because
// a concurrent caller could use it. It's safe to Collect all our Metrics,
// however, because they're updated in a thread-safe way while MemStats
// is local to this call of Collect.
for _, m := range c.rmMetrics {
m.Collect(ch)
}
for _, i := range c.msMetrics { for _, i := range c.msMetrics {
ch <- MustNewConstMetric(i.desc, i.valType, i.eval(&ms)) ch <- MustNewConstMetric(i.desc, i.valType, i.eval(&ms))
} }

View File

@ -280,3 +280,25 @@ func TestExpectedRuntimeMetrics(t *testing.T) {
t.Log("where X is the Go version you are currently using") t.Log("where X is the Go version you are currently using")
} }
} }
func TestGoCollectorConcurrency(t *testing.T) {
c := NewGoCollector().(*goCollector)
// Set up multiple goroutines to Collect from the
// same GoCollector. In race mode with GOMAXPROCS > 1,
// this test should fail often if Collect is not
// concurrent-safe.
for i := 0; i < 4; i++ {
go func() {
ch := make(chan Metric)
go func() {
// Drain all metrics recieved until the
// channel is closed.
for range ch {
}
}()
c.Collect(ch)
close(ch)
}()
}
}