Use simpler locking in the Go 1.17 collector (#975)
A previous PR made it so that the Go 1.17 collector locked only around uses of rmSampleBuf, but that means Metric values may be sent over the channel containing some values from a future metrics.Read call. While generally speaking this isn't a problem, we lose any consistency guarantees provided by the runtime/metrics package. That optimization to avoid locking around all of Collect was also premature: Collect is called relatively infrequently, and its critical path is fairly fast (tens of µs). To prove it, this change adds a benchmark.

name            old time/op  new time/op  delta
GoCollector-16  43.7µs ± 2%  43.2µs ± 2%  ~      (p=0.190 n=9+9)

Note that because the benchmark is single-threaded, it actually looks like it might be getting *slightly* faster, since all the Collect calls for the Metrics are now direct calls instead of interface calls.

Signed-off-by: Michael Anthony Knyszek <mknyszek@google.com>
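For readers skimming the diff below: the change replaces a narrow lock around the shared sample buffer with one mutex held for all of Collect. A minimal sketch of that pattern, using illustrative stand-in types rather than the real client_golang definitions:

package collectorsketch

import (
	"runtime/metrics"
	"sync"
)

// collector is a stand-in for goCollector: one mutex guards the reused
// sample buffer for the full duration of a collection pass.
type collector struct {
	mu        sync.Mutex
	sampleBuf []metrics.Sample
}

// newCollector prepares a sample buffer naming every supported metric,
// as runtime/metrics requires before Read.
func newCollector() *collector {
	descs := metrics.All()
	buf := make([]metrics.Sample, len(descs))
	for i := range buf {
		buf[i].Name = descs[i].Name
	}
	return &collector{sampleBuf: buf}
}

// collect reads a snapshot and emits it while still holding mu, so every
// emitted value comes from the same metrics.Read call.
func (c *collector) collect(emit func(metrics.Sample)) {
	c.mu.Lock()
	defer c.mu.Unlock()

	metrics.Read(c.sampleBuf)
	for _, s := range c.sampleBuf {
		emit(s)
	}
}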
parent f63e219e6b
commit 85206714ae
@@ -31,8 +31,11 @@ import (
 type goCollector struct {
 	base baseGoCollector
 
+	// mu protects updates to all fields ensuring a consistent
+	// snapshot is always produced by Collect.
+	mu sync.Mutex
+
 	// rm... fields all pertain to the runtime/metrics package.
-	rmSampleMu  sync.Mutex
 	rmSampleBuf []metrics.Sample
 	rmSampleMap map[string]*metrics.Sample
 	rmMetrics   []collectorMetric
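Design note on this hunk: folding the narrower rmSampleMu into a struct-wide mu gives up a little theoretical concurrency, but buys a simple invariant: every field Collect touches is guarded by one lock, so the snapshot it exports is internally consistent.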
@@ -135,10 +138,16 @@ func (c *goCollector) Collect(ch chan<- Metric) {
 	// rmSampleBuf. Just read into rmSampleBuf but write all the data
 	// we get into our Metrics or MemStats.
 	//
-	// Note that we cannot simply read and then clone rmSampleBuf
-	// because we'd need to perform a deep clone of it, which is likely
-	// not worth it.
-	c.rmSampleMu.Lock()
+	// This lock also ensures that the Metrics we send out are all from
+	// the same updates, ensuring their mutual consistency insofar as
+	// is guaranteed by the runtime/metrics package.
+	//
+	// N.B. This locking is heavy-handed, but Collect is expected to be called
+	// relatively infrequently. Also the core operation here, metrics.Read,
+	// is fast (O(tens of microseconds)) so contention should certainly be
+	// low, though channel operations and any allocations may add to that.
+	c.mu.Lock()
+	defer c.mu.Unlock()
 
 	// Populate runtime/metrics sample buffer.
 	metrics.Read(c.rmSampleBuf)
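The "O(tens of microseconds)" figure in the new comment is easy to sanity-check in isolation. A self-contained benchmark sketch (a hypothetical test file, not part of this commit):

package collectorsketch

import (
	"runtime/metrics"
	"testing"
)

// BenchmarkMetricsRead times a bare metrics.Read over every supported
// metric, which is the core operation the new comment is reasoning about.
func BenchmarkMetricsRead(b *testing.B) {
	descs := metrics.All()
	samples := make([]metrics.Sample, len(descs))
	for i := range samples {
		samples[i].Name = descs[i].Name
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		metrics.Read(samples)
	}
}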
@@ -157,10 +166,13 @@ func (c *goCollector) Collect(ch chan<- Metric) {
 		if v1 > v0 {
 			m.Add(unwrapScalarRMValue(sample.Value) - m.get())
 		}
+		m.Collect(ch)
 	case *gauge:
 		m.Set(unwrapScalarRMValue(sample.Value))
+		m.Collect(ch)
 	case *batchHistogram:
 		m.update(sample.Value.Float64Histogram(), c.exactSumFor(sample.Name))
+		m.Collect(ch)
 	default:
 		panic("unexpected metric type")
 	}
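unwrapScalarRMValue is not shown in this diff; as a hedged reconstruction, a helper like it typically switches on the documented runtime/metrics value kinds:

package collectorsketch

import (
	"fmt"
	"runtime/metrics"
)

// scalarValue is a guess at the shape of unwrapScalarRMValue: it extracts
// a float64 from the two scalar kinds runtime/metrics defines and panics
// on anything else, mirroring the default case in the hunk above.
func scalarValue(v metrics.Value) float64 {
	switch v.Kind() {
	case metrics.KindUint64:
		return float64(v.Uint64())
	case metrics.KindFloat64:
		return v.Float64()
	default:
		// KindFloat64Histogram and KindBad are not scalar values.
		panic(fmt.Sprintf("unexpected value kind %d", v.Kind()))
	}
}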
@@ -169,17 +181,6 @@ func (c *goCollector) Collect(ch chan<- Metric) {
 	// populate the old metrics from it.
 	var ms runtime.MemStats
 	memStatsFromRM(&ms, c.rmSampleMap)
-
-	c.rmSampleMu.Unlock()
-
-	// Export all the metrics to ch.
-	// At this point we must not access rmSampleBuf or rmSampleMap, because
-	// a concurrent caller could use it. It's safe to Collect all our Metrics,
-	// however, because they're updated in a thread-safe way while MemStats
-	// is local to this call of Collect.
-	for _, m := range c.rmMetrics {
-		m.Collect(ch)
-	}
 	for _, i := range c.msMetrics {
 		ch <- MustNewConstMetric(i.desc, i.valType, i.eval(&ms))
 	}
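Note how the export loop over c.rmMetrics that used to run after unlocking disappears here: each metric now calls m.Collect(ch) inside the type switch in the previous hunk, while the lock is still held, which is what makes the values sent on ch mutually consistent.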
@@ -292,7 +292,7 @@ func TestGoCollectorConcurrency(t *testing.T) {
 		go func() {
 			ch := make(chan Metric)
 			go func() {
-				// Drain all metrics recieved until the
+				// Drain all metrics received until the
 				// channel is closed.
 				for range ch {
 				}
@@ -154,3 +154,20 @@ func TestGoCollectorGC(t *testing.T) {
 		break
 	}
 }
+
+func BenchmarkGoCollector(b *testing.B) {
+	c := NewGoCollector().(*goCollector)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		ch := make(chan Metric, 8)
+		go func() {
+			// Drain all metrics received until the
+			// channel is closed.
+			for range ch {
+			}
+		}()
+		c.Collect(ch)
+		close(ch)
+	}
+}
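The numbers in the commit message read like benchstat output; assuming the usual workflow, they would come from running something like `go test -run '^$' -bench GoCollector -count 10` before and after the change and comparing the two result files with `benchstat`.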