Use simpler locking in the Go 1.17 collector (#975)

A previous PR made it so that the Go 1.17 collector locked only around
uses of rmSampleBuf, but really that means that Metric values may be
sent over the channel containing some values from future metrics.Read
calls. While generally-speaking this isn't a problem, we lose any
consistency guarantees provided by the runtime/metrics package.

Also, that optimization to not just lock around all of Collect was
premature. Truthfully, Collect is called relatively infrequently, and
its critical path is fairly fast (10s of µs). To prove it, this change
also adds a benchmark.

name            old time/op  new time/op  delta
GoCollector-16  43.7µs ± 2%  43.2µs ± 2%   ~     (p=0.190 n=9+9)

Note that because the benchmark is single-threaded it actually looks
like it might be getting *slightly* faster, because all those Collect
calls for the Metrics are direct calls instead of interface calls.

Signed-off-by: Michael Anthony Knyszek <mknyszek@google.com>
This commit is contained in:
Michael Knyszek 2022-01-25 02:43:45 -05:00 committed by Kemal Akkoyun
parent 772b89389c
commit d32edd6083
3 changed files with 35 additions and 17 deletions

View File

@ -31,8 +31,11 @@ import (
type goCollector struct { type goCollector struct {
base baseGoCollector base baseGoCollector
// mu protects updates to all fields ensuring a consistent
// snapshot is always produced by Collect.
mu sync.Mutex
// rm... fields all pertain to the runtime/metrics package. // rm... fields all pertain to the runtime/metrics package.
rmSampleMu sync.Mutex
rmSampleBuf []metrics.Sample rmSampleBuf []metrics.Sample
rmSampleMap map[string]*metrics.Sample rmSampleMap map[string]*metrics.Sample
rmMetrics []collectorMetric rmMetrics []collectorMetric
@ -135,10 +138,16 @@ func (c *goCollector) Collect(ch chan<- Metric) {
// rmSampleBuf. Just read into rmSampleBuf but write all the data // rmSampleBuf. Just read into rmSampleBuf but write all the data
// we get into our Metrics or MemStats. // we get into our Metrics or MemStats.
// //
// Note that we cannot simply read and then clone rmSampleBuf // This lock also ensures that the Metrics we send out are all from
// because we'd need to perform a deep clone of it, which is likely // the same updates, ensuring their mutual consistency insofar as
// not worth it. // is guaranteed by the runtime/metrics package.
c.rmSampleMu.Lock() //
// N.B. This locking is heavy-handed, but Collect is expected to be called
// relatively infrequently. Also the core operation here, metrics.Read,
// is fast (O(tens of microseconds)) so contention should certainly be
// low, though channel operations and any allocations may add to that.
c.mu.Lock()
defer c.mu.Unlock()
// Populate runtime/metrics sample buffer. // Populate runtime/metrics sample buffer.
metrics.Read(c.rmSampleBuf) metrics.Read(c.rmSampleBuf)
@ -157,10 +166,13 @@ func (c *goCollector) Collect(ch chan<- Metric) {
if v1 > v0 { if v1 > v0 {
m.Add(unwrapScalarRMValue(sample.Value) - m.get()) m.Add(unwrapScalarRMValue(sample.Value) - m.get())
} }
m.Collect(ch)
case *gauge: case *gauge:
m.Set(unwrapScalarRMValue(sample.Value)) m.Set(unwrapScalarRMValue(sample.Value))
m.Collect(ch)
case *batchHistogram: case *batchHistogram:
m.update(sample.Value.Float64Histogram(), c.exactSumFor(sample.Name)) m.update(sample.Value.Float64Histogram(), c.exactSumFor(sample.Name))
m.Collect(ch)
default: default:
panic("unexpected metric type") panic("unexpected metric type")
} }
@ -169,17 +181,6 @@ func (c *goCollector) Collect(ch chan<- Metric) {
// populate the old metrics from it. // populate the old metrics from it.
var ms runtime.MemStats var ms runtime.MemStats
memStatsFromRM(&ms, c.rmSampleMap) memStatsFromRM(&ms, c.rmSampleMap)
c.rmSampleMu.Unlock()
// Export all the metrics to ch.
// At this point we must not access rmSampleBuf or rmSampleMap, because
// a concurrent caller could use it. It's safe to Collect all our Metrics,
// however, because they're updated in a thread-safe way while MemStats
// is local to this call of Collect.
for _, m := range c.rmMetrics {
m.Collect(ch)
}
for _, i := range c.msMetrics { for _, i := range c.msMetrics {
ch <- MustNewConstMetric(i.desc, i.valType, i.eval(&ms)) ch <- MustNewConstMetric(i.desc, i.valType, i.eval(&ms))
} }

View File

@ -292,7 +292,7 @@ func TestGoCollectorConcurrency(t *testing.T) {
go func() { go func() {
ch := make(chan Metric) ch := make(chan Metric)
go func() { go func() {
// Drain all metrics recieved until the // Drain all metrics received until the
// channel is closed. // channel is closed.
for range ch { for range ch {
} }

View File

@ -154,3 +154,20 @@ func TestGoCollectorGC(t *testing.T) {
break break
} }
} }
func BenchmarkGoCollector(b *testing.B) {
c := NewGoCollector().(*goCollector)
b.ResetTimer()
for i := 0; i < b.N; i++ {
ch := make(chan Metric, 8)
go func() {
// Drain all metrics received until the
// channel is closed.
for range ch {
}
}()
c.Collect(ch)
close(ch)
}
}