forked from mirror/client_golang
Merge pull request #568 from prometheus/beorn7/go-collector
Return previous memstats if Go collector needs longer than 1s
This commit is contained in:
commit
2d3b0fe0e0
|
@ -14,9 +14,9 @@
|
||||||
package prometheus
|
package prometheus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -26,16 +26,41 @@ type goCollector struct {
|
||||||
gcDesc *Desc
|
gcDesc *Desc
|
||||||
goInfoDesc *Desc
|
goInfoDesc *Desc
|
||||||
|
|
||||||
// metrics to describe and collect
|
// ms... are memstats related.
|
||||||
metrics memStatsMetrics
|
msLast *runtime.MemStats // Previously collected memstats.
|
||||||
|
msLastTimestamp time.Time
|
||||||
|
msMtx sync.Mutex // Protects msLast and msLastTimestamp.
|
||||||
|
msMetrics memStatsMetrics
|
||||||
|
msRead func(*runtime.MemStats) // For mocking in tests.
|
||||||
|
msMaxWait time.Duration // Wait time for fresh memstats.
|
||||||
|
msMaxAge time.Duration // Maximum allowed age of old memstats.
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewGoCollector returns a collector which exports metrics about the current Go
|
// NewGoCollector returns a collector which exports metrics about the current Go
|
||||||
// process. This includes memory stats. To collect those, runtime.ReadMemStats
|
// process. This includes memory stats. To collect those, runtime.ReadMemStats
|
||||||
// is called. This causes a stop-the-world, which is very short with Go1.9+
|
// is called. This requires to “stop the world”, which usually only happens for
|
||||||
// (~25µs). However, with older Go versions, the stop-the-world duration depends
|
// garbage collection (GC). Take the following implications into account when
|
||||||
// on the heap size and can be quite significant (~1.7 ms/GiB as per
|
// deciding whether to use the Go collector:
|
||||||
|
//
|
||||||
|
// 1. The performance impact of stopping the world is the more relevant the more
|
||||||
|
// frequently metrics are collected. However, with Go1.9 or later the
|
||||||
|
// stop-the-world time per metrics collection is very short (~25µs) so that the
|
||||||
|
// performance impact will only matter in rare cases. However, with older Go
|
||||||
|
// versions, the stop-the-world duration depends on the heap size and can be
|
||||||
|
// quite significant (~1.7 ms/GiB as per
|
||||||
// https://go-review.googlesource.com/c/go/+/34937).
|
// https://go-review.googlesource.com/c/go/+/34937).
|
||||||
|
//
|
||||||
|
// 2. During an ongoing GC, nothing else can stop the world. Therefore, if the
|
||||||
|
// metrics collection happens to coincide with GC, it will only complete after
|
||||||
|
// GC has finished. Usually, GC is fast enough to not cause problems. However,
|
||||||
|
// with a very large heap, GC might take multiple seconds, which is enough to
|
||||||
|
// cause scrape timeouts in common setups. To avoid this problem, the Go
|
||||||
|
// collector will use the memstats from a previous collection if
|
||||||
|
// runtime.ReadMemStats takes more than 1s. However, if there are no previously
|
||||||
|
// collected memstats, or their collection is more than 5m ago, the collection
|
||||||
|
// will block until runtime.ReadMemStats succeeds. (The problem might be solved
|
||||||
|
// in Go1.13, see https://github.com/golang/go/issues/19812 for the related Go
|
||||||
|
// issue.)
|
||||||
func NewGoCollector() Collector {
|
func NewGoCollector() Collector {
|
||||||
return &goCollector{
|
return &goCollector{
|
||||||
goroutinesDesc: NewDesc(
|
goroutinesDesc: NewDesc(
|
||||||
|
@ -54,7 +79,11 @@ func NewGoCollector() Collector {
|
||||||
"go_info",
|
"go_info",
|
||||||
"Information about the Go environment.",
|
"Information about the Go environment.",
|
||||||
nil, Labels{"version": runtime.Version()}),
|
nil, Labels{"version": runtime.Version()}),
|
||||||
metrics: memStatsMetrics{
|
msLast: &runtime.MemStats{},
|
||||||
|
msRead: runtime.ReadMemStats,
|
||||||
|
msMaxWait: time.Second,
|
||||||
|
msMaxAge: 5 * time.Minute,
|
||||||
|
msMetrics: memStatsMetrics{
|
||||||
{
|
{
|
||||||
desc: NewDesc(
|
desc: NewDesc(
|
||||||
memstatNamespace("alloc_bytes"),
|
memstatNamespace("alloc_bytes"),
|
||||||
|
@ -253,7 +282,7 @@ func NewGoCollector() Collector {
|
||||||
}
|
}
|
||||||
|
|
||||||
func memstatNamespace(s string) string {
|
func memstatNamespace(s string) string {
|
||||||
return fmt.Sprintf("go_memstats_%s", s)
|
return "go_memstats_" + s
|
||||||
}
|
}
|
||||||
|
|
||||||
// Describe returns all descriptions of the collector.
|
// Describe returns all descriptions of the collector.
|
||||||
|
@ -262,13 +291,27 @@ func (c *goCollector) Describe(ch chan<- *Desc) {
|
||||||
ch <- c.threadsDesc
|
ch <- c.threadsDesc
|
||||||
ch <- c.gcDesc
|
ch <- c.gcDesc
|
||||||
ch <- c.goInfoDesc
|
ch <- c.goInfoDesc
|
||||||
for _, i := range c.metrics {
|
for _, i := range c.msMetrics {
|
||||||
ch <- i.desc
|
ch <- i.desc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Collect returns the current state of all metrics of the collector.
|
// Collect returns the current state of all metrics of the collector.
|
||||||
func (c *goCollector) Collect(ch chan<- Metric) {
|
func (c *goCollector) Collect(ch chan<- Metric) {
|
||||||
|
var (
|
||||||
|
ms = &runtime.MemStats{}
|
||||||
|
done = make(chan struct{})
|
||||||
|
)
|
||||||
|
// Start reading memstats first as it might take a while.
|
||||||
|
go func() {
|
||||||
|
c.msRead(ms)
|
||||||
|
c.msMtx.Lock()
|
||||||
|
c.msLast = ms
|
||||||
|
c.msLastTimestamp = time.Now()
|
||||||
|
c.msMtx.Unlock()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
ch <- MustNewConstMetric(c.goroutinesDesc, GaugeValue, float64(runtime.NumGoroutine()))
|
ch <- MustNewConstMetric(c.goroutinesDesc, GaugeValue, float64(runtime.NumGoroutine()))
|
||||||
n, _ := runtime.ThreadCreateProfile(nil)
|
n, _ := runtime.ThreadCreateProfile(nil)
|
||||||
ch <- MustNewConstMetric(c.threadsDesc, GaugeValue, float64(n))
|
ch <- MustNewConstMetric(c.threadsDesc, GaugeValue, float64(n))
|
||||||
|
@ -286,9 +329,31 @@ func (c *goCollector) Collect(ch chan<- Metric) {
|
||||||
|
|
||||||
ch <- MustNewConstMetric(c.goInfoDesc, GaugeValue, 1)
|
ch <- MustNewConstMetric(c.goInfoDesc, GaugeValue, 1)
|
||||||
|
|
||||||
ms := &runtime.MemStats{}
|
timer := time.NewTimer(c.msMaxWait)
|
||||||
runtime.ReadMemStats(ms)
|
select {
|
||||||
for _, i := range c.metrics {
|
case <-done: // Our own ReadMemStats succeeded in time. Use it.
|
||||||
|
timer.Stop() // Important for high collection frequencies to not pile up timers.
|
||||||
|
c.msCollect(ch, ms)
|
||||||
|
return
|
||||||
|
case <-timer.C: // Time out, use last memstats if possible. Continue below.
|
||||||
|
}
|
||||||
|
c.msMtx.Lock()
|
||||||
|
if time.Since(c.msLastTimestamp) < c.msMaxAge {
|
||||||
|
// Last memstats are recent enough. Collect from them under the lock.
|
||||||
|
c.msCollect(ch, c.msLast)
|
||||||
|
c.msMtx.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// If we are here, the last memstats are too old or don't exist. We have
|
||||||
|
// to wait until our own ReadMemStats finally completes. For that to
|
||||||
|
// happen, we have to release the lock.
|
||||||
|
c.msMtx.Unlock()
|
||||||
|
<-done
|
||||||
|
c.msCollect(ch, ms)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *goCollector) msCollect(ch chan<- Metric, ms *runtime.MemStats) {
|
||||||
|
for _, i := range c.msMetrics {
|
||||||
ch <- MustNewConstMetric(i.desc, i.valType, i.eval(ms))
|
ch <- MustNewConstMetric(i.desc, i.valType, i.eval(ms))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,28 +21,40 @@ import (
|
||||||
dto "github.com/prometheus/client_model/go"
|
dto "github.com/prometheus/client_model/go"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGoCollector(t *testing.T) {
|
func TestGoCollectorGoroutines(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
c = NewGoCollector()
|
c = NewGoCollector()
|
||||||
ch = make(chan Metric)
|
metricCh = make(chan Metric)
|
||||||
waitc = make(chan struct{})
|
waitCh = make(chan struct{})
|
||||||
closec = make(chan struct{})
|
endGoroutineCh = make(chan struct{})
|
||||||
|
endCollectionCh = make(chan struct{})
|
||||||
old = -1
|
old = -1
|
||||||
)
|
)
|
||||||
defer close(closec)
|
defer func() {
|
||||||
|
close(endGoroutineCh)
|
||||||
|
// Drain the collect channel to prevent goroutine leak.
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-metricCh:
|
||||||
|
case <-endCollectionCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
c.Collect(ch)
|
c.Collect(metricCh)
|
||||||
go func(c <-chan struct{}) {
|
go func(c <-chan struct{}) {
|
||||||
<-c
|
<-c
|
||||||
}(closec)
|
}(endGoroutineCh)
|
||||||
<-waitc
|
<-waitCh
|
||||||
c.Collect(ch)
|
c.Collect(metricCh)
|
||||||
|
close(endCollectionCh)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case m := <-ch:
|
case m := <-metricCh:
|
||||||
// m can be Gauge or Counter,
|
// m can be Gauge or Counter,
|
||||||
// currently just test the go_goroutines Gauge
|
// currently just test the go_goroutines Gauge
|
||||||
// and ignore others.
|
// and ignore others.
|
||||||
|
@ -57,7 +69,7 @@ func TestGoCollector(t *testing.T) {
|
||||||
|
|
||||||
if old == -1 {
|
if old == -1 {
|
||||||
old = int(pb.GetGauge().GetValue())
|
old = int(pb.GetGauge().GetValue())
|
||||||
close(waitc)
|
close(waitCh)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -65,43 +77,47 @@ func TestGoCollector(t *testing.T) {
|
||||||
// TODO: This is flaky in highly concurrent situations.
|
// TODO: This is flaky in highly concurrent situations.
|
||||||
t.Errorf("want 1 new goroutine, got %d", diff)
|
t.Errorf("want 1 new goroutine, got %d", diff)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GoCollector performs three sends per call.
|
|
||||||
// On line 27 we need to receive three more sends
|
|
||||||
// to shut down cleanly.
|
|
||||||
<-ch
|
|
||||||
<-ch
|
|
||||||
<-ch
|
|
||||||
return
|
|
||||||
case <-time.After(1 * time.Second):
|
case <-time.After(1 * time.Second):
|
||||||
t.Fatalf("expected collect timed out")
|
t.Fatalf("expected collect timed out")
|
||||||
}
|
}
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGCCollector(t *testing.T) {
|
func TestGoCollectorGC(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
c = NewGoCollector()
|
c = NewGoCollector()
|
||||||
ch = make(chan Metric)
|
metricCh = make(chan Metric)
|
||||||
waitc = make(chan struct{})
|
waitCh = make(chan struct{})
|
||||||
closec = make(chan struct{})
|
endCollectionCh = make(chan struct{})
|
||||||
oldGC uint64
|
oldGC uint64
|
||||||
oldPause float64
|
oldPause float64
|
||||||
)
|
)
|
||||||
defer close(closec)
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
c.Collect(ch)
|
c.Collect(metricCh)
|
||||||
// force GC
|
// force GC
|
||||||
runtime.GC()
|
runtime.GC()
|
||||||
<-waitc
|
<-waitCh
|
||||||
c.Collect(ch)
|
c.Collect(metricCh)
|
||||||
|
close(endCollectionCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
// Drain the collect channel to prevent goroutine leak.
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-metricCh:
|
||||||
|
case <-endCollectionCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
first := true
|
first := true
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case metric := <-ch:
|
case metric := <-metricCh:
|
||||||
pb := &dto.Metric{}
|
pb := &dto.Metric{}
|
||||||
metric.Write(pb)
|
metric.Write(pb)
|
||||||
if pb.GetSummary() == nil {
|
if pb.GetSummary() == nil {
|
||||||
|
@ -119,7 +135,7 @@ func TestGCCollector(t *testing.T) {
|
||||||
first = false
|
first = false
|
||||||
oldGC = *pb.GetSummary().SampleCount
|
oldGC = *pb.GetSummary().SampleCount
|
||||||
oldPause = *pb.GetSummary().SampleSum
|
oldPause = *pb.GetSummary().SampleSum
|
||||||
close(waitc)
|
close(waitCh)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if diff := *pb.GetSummary().SampleCount - oldGC; diff != 1 {
|
if diff := *pb.GetSummary().SampleCount - oldGC; diff != 1 {
|
||||||
|
@ -128,9 +144,94 @@ func TestGCCollector(t *testing.T) {
|
||||||
if diff := *pb.GetSummary().SampleSum - oldPause; diff <= 0 {
|
if diff := *pb.GetSummary().SampleSum - oldPause; diff <= 0 {
|
||||||
t.Errorf("want moar pause, got %f", diff)
|
t.Errorf("want moar pause, got %f", diff)
|
||||||
}
|
}
|
||||||
return
|
|
||||||
case <-time.After(1 * time.Second):
|
case <-time.After(1 * time.Second):
|
||||||
t.Fatalf("expected collect timed out")
|
t.Fatalf("expected collect timed out")
|
||||||
}
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGoCollectorMemStats(t *testing.T) {
|
||||||
|
var (
|
||||||
|
c = NewGoCollector().(*goCollector)
|
||||||
|
got uint64
|
||||||
|
)
|
||||||
|
|
||||||
|
checkCollect := func(want uint64) {
|
||||||
|
metricCh := make(chan Metric)
|
||||||
|
endCh := make(chan struct{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
c.Collect(metricCh)
|
||||||
|
close(endCh)
|
||||||
|
}()
|
||||||
|
Collect:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case metric := <-metricCh:
|
||||||
|
if metric.Desc().fqName != "go_memstats_alloc_bytes" {
|
||||||
|
continue Collect
|
||||||
|
}
|
||||||
|
pb := &dto.Metric{}
|
||||||
|
metric.Write(pb)
|
||||||
|
got = uint64(pb.GetGauge().GetValue())
|
||||||
|
case <-endCh:
|
||||||
|
break Collect
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if want != got {
|
||||||
|
t.Errorf("unexpected value of go_memstats_alloc_bytes, want %d, got %d", want, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Speed up the timing to make the tast faster.
|
||||||
|
c.msMaxWait = time.Millisecond
|
||||||
|
c.msMaxAge = 10 * time.Millisecond
|
||||||
|
|
||||||
|
// Scenario 1: msRead responds slowly, no previous memstats available,
|
||||||
|
// msRead is executed anyway.
|
||||||
|
c.msRead = func(ms *runtime.MemStats) {
|
||||||
|
time.Sleep(3 * time.Millisecond)
|
||||||
|
ms.Alloc = 1
|
||||||
|
}
|
||||||
|
checkCollect(1)
|
||||||
|
// Now msLast is set.
|
||||||
|
if want, got := uint64(1), c.msLast.Alloc; want != got {
|
||||||
|
t.Errorf("unexpected of msLast.Alloc, want %d, got %d", want, got)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scenario 2: msRead responds fast, previous memstats available, new
|
||||||
|
// value collected.
|
||||||
|
c.msRead = func(ms *runtime.MemStats) {
|
||||||
|
ms.Alloc = 2
|
||||||
|
}
|
||||||
|
checkCollect(2)
|
||||||
|
// msLast is set, too.
|
||||||
|
if want, got := uint64(2), c.msLast.Alloc; want != got {
|
||||||
|
t.Errorf("unexpected of msLast.Alloc, want %d, got %d", want, got)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scenario 3: msRead responds slowly, previous memstats available, old
|
||||||
|
// value collected.
|
||||||
|
c.msRead = func(ms *runtime.MemStats) {
|
||||||
|
time.Sleep(3 * time.Millisecond)
|
||||||
|
ms.Alloc = 3
|
||||||
|
}
|
||||||
|
checkCollect(2)
|
||||||
|
// After waiting, new value is still set in msLast.
|
||||||
|
time.Sleep(12 * time.Millisecond)
|
||||||
|
if want, got := uint64(3), c.msLast.Alloc; want != got {
|
||||||
|
t.Errorf("unexpected of msLast.Alloc, want %d, got %d", want, got)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scenario 4: msRead responds slowly, previous memstats is too old, new
|
||||||
|
// value collected.
|
||||||
|
c.msRead = func(ms *runtime.MemStats) {
|
||||||
|
time.Sleep(3 * time.Millisecond)
|
||||||
|
ms.Alloc = 4
|
||||||
|
}
|
||||||
|
checkCollect(4)
|
||||||
|
if want, got := uint64(4), c.msLast.Alloc; want != got {
|
||||||
|
t.Errorf("unexpected of msLast.Alloc, want %d, got %d", want, got)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue