Make the Go 1.17 collector thread-safe (#969)
This commit is contained in:
parent
01087964d0
commit
f63e219e6b
|
@ -118,3 +118,11 @@ func (c *selfCollector) Describe(ch chan<- *Desc) {
|
|||
func (c *selfCollector) Collect(ch chan<- Metric) {
|
||||
ch <- c.self
|
||||
}
|
||||
|
||||
// collectorMetric is a metric that is also a collector.
|
||||
// Because of selfCollector, most (if not all) Metrics in
|
||||
// this package are also collectors.
|
||||
type collectorMetric interface {
|
||||
Metric
|
||||
Collector
|
||||
}
|
||||
|
|
|
@ -32,9 +32,10 @@ type goCollector struct {
|
|||
base baseGoCollector
|
||||
|
||||
// rm... fields all pertain to the runtime/metrics package.
|
||||
rmSampleMu sync.Mutex
|
||||
rmSampleBuf []metrics.Sample
|
||||
rmSampleMap map[string]*metrics.Sample
|
||||
rmMetrics []Metric
|
||||
rmMetrics []collectorMetric
|
||||
|
||||
// With Go 1.17, the runtime/metrics package was introduced.
|
||||
// From that point on, metric names produced by the runtime/metrics
|
||||
|
@ -58,7 +59,7 @@ func NewGoCollector() Collector {
|
|||
}
|
||||
|
||||
// Generate a Desc and ValueType for each runtime/metrics metric.
|
||||
metricSet := make([]Metric, 0, len(descriptions))
|
||||
metricSet := make([]collectorMetric, 0, len(descriptions))
|
||||
sampleBuf := make([]metrics.Sample, 0, len(descriptions))
|
||||
sampleMap := make(map[string]*metrics.Sample, len(descriptions))
|
||||
for i := range descriptions {
|
||||
|
@ -76,7 +77,7 @@ func NewGoCollector() Collector {
|
|||
sampleBuf = append(sampleBuf, metrics.Sample{Name: d.Name})
|
||||
sampleMap[d.Name] = &sampleBuf[len(sampleBuf)-1]
|
||||
|
||||
var m Metric
|
||||
var m collectorMetric
|
||||
if d.Kind == metrics.KindFloat64Histogram {
|
||||
_, hasSum := rmExactSumMap[d.Name]
|
||||
m = newBatchHistogram(
|
||||
|
@ -130,9 +131,19 @@ func (c *goCollector) Collect(ch chan<- Metric) {
|
|||
// Collect base non-memory metrics.
|
||||
c.base.Collect(ch)
|
||||
|
||||
// Collect must be thread-safe, so prevent concurrent use of
|
||||
// rmSampleBuf. Just read into rmSampleBuf but write all the data
|
||||
// we get into our Metrics or MemStats.
|
||||
//
|
||||
// Note that we cannot simply read and then clone rmSampleBuf
|
||||
// because we'd need to perform a deep clone of it, which is likely
|
||||
// not worth it.
|
||||
c.rmSampleMu.Lock()
|
||||
|
||||
// Populate runtime/metrics sample buffer.
|
||||
metrics.Read(c.rmSampleBuf)
|
||||
|
||||
// Update all our metrics from rmSampleBuf.
|
||||
for i, sample := range c.rmSampleBuf {
|
||||
// N.B. switch on concrete type because it's significantly more efficient
|
||||
// than checking for the Counter and Gauge interface implementations. In
|
||||
|
@ -146,22 +157,29 @@ func (c *goCollector) Collect(ch chan<- Metric) {
|
|||
if v1 > v0 {
|
||||
m.Add(unwrapScalarRMValue(sample.Value) - m.get())
|
||||
}
|
||||
m.Collect(ch)
|
||||
case *gauge:
|
||||
m.Set(unwrapScalarRMValue(sample.Value))
|
||||
m.Collect(ch)
|
||||
case *batchHistogram:
|
||||
m.update(sample.Value.Float64Histogram(), c.exactSumFor(sample.Name))
|
||||
m.Collect(ch)
|
||||
default:
|
||||
panic("unexpected metric type")
|
||||
}
|
||||
}
|
||||
|
||||
// ms is a dummy MemStats that we populate ourselves so that we can
|
||||
// populate the old metrics from it.
|
||||
var ms runtime.MemStats
|
||||
memStatsFromRM(&ms, c.rmSampleMap)
|
||||
|
||||
c.rmSampleMu.Unlock()
|
||||
|
||||
// Export all the metrics to ch.
|
||||
// At this point we must not access rmSampleBuf or rmSampleMap, because
|
||||
// a concurrent caller could use it. It's safe to Collect all our Metrics,
|
||||
// however, because they're updated in a thread-safe way while MemStats
|
||||
// is local to this call of Collect.
|
||||
for _, m := range c.rmMetrics {
|
||||
m.Collect(ch)
|
||||
}
|
||||
for _, i := range c.msMetrics {
|
||||
ch <- MustNewConstMetric(i.desc, i.valType, i.eval(&ms))
|
||||
}
|
||||
|
|
|
@ -280,3 +280,25 @@ func TestExpectedRuntimeMetrics(t *testing.T) {
|
|||
t.Log("where X is the Go version you are currently using")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGoCollectorConcurrency(t *testing.T) {
|
||||
c := NewGoCollector().(*goCollector)
|
||||
|
||||
// Set up multiple goroutines to Collect from the
|
||||
// same GoCollector. In race mode with GOMAXPROCS > 1,
|
||||
// this test should fail often if Collect is not
|
||||
// concurrent-safe.
|
||||
for i := 0; i < 4; i++ {
|
||||
go func() {
|
||||
ch := make(chan Metric)
|
||||
go func() {
|
||||
// Drain all metrics recieved until the
|
||||
// channel is closed.
|
||||
for range ch {
|
||||
}
|
||||
}()
|
||||
c.Collect(ch)
|
||||
close(ch)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue