From e04451f4be0c5c82c61948e14410a2cf45aad949 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Fri, 26 Jan 2018 19:58:07 +0100 Subject: [PATCH 1/2] Create goroutines adaptively during metrics gathering --- prometheus/registry.go | 283 +++++++++++++++++++++++------------------ 1 file changed, 161 insertions(+), 122 deletions(-) diff --git a/prometheus/registry.go b/prometheus/registry.go index c84a442..2520e80 100644 --- a/prometheus/registry.go +++ b/prometheus/registry.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "os" + "runtime" "sort" "sync" "unicode/utf8" @@ -202,6 +203,13 @@ func (errs MultiError) Error() string { return buf.String() } +// Append appends the provided error if it is not nil. +func (errs *MultiError) Append(err error) { + if err != nil { + *errs = append(*errs, err) + } +} + // MaybeUnwrap returns nil if len(errs) is 0. It returns the first and only // contained error as error if len(errs is 1). In all other cases, it returns // the MultiError directly. This is helpful for returning a MultiError in a way @@ -368,22 +376,12 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) { ) r.mtx.RLock() - metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName)) - - // Scatter. - // (Collectors could be complex and slow, so we call them all at once.) wg.Add(len(r.collectorsByID)) - go func() { - wg.Wait() - close(metricChan) - }() + metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName)) + collectors := make(chan Collector, len(r.collectorsByID)) for _, collector := range r.collectorsByID { - go func(collector Collector) { - defer wg.Done() - collector.Collect(metricChan) - }(collector) + collectors <- collector } - // In case pedantic checks are enabled, we have to copy the map before // giving up the RLock. if r.pedanticChecksEnabled { @@ -392,129 +390,170 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) { registeredDescIDs[id] = struct{}{} } } - r.mtx.RUnlock() + collectWorker := func() { + for { + select { + case collector := <-collectors: + collector.Collect(metricChan) + wg.Done() + default: + return + } + } + } + + // Start the first worker now to make sure at least is running. + go collectWorker() + + // Close the metricChan once all collectors are collected. + go func() { + wg.Wait() + close(metricChan) + }() + // Drain metricChan in case of premature return. defer func() { for range metricChan { } }() - // Gather. - for metric := range metricChan { - // This could be done concurrently, too, but it required locking - // of metricFamiliesByName (and of metricHashes if checks are - // enabled). Most likely not worth it. - desc := metric.Desc() - dtoMetric := &dto.Metric{} - if err := metric.Write(dtoMetric); err != nil { - errs = append(errs, fmt.Errorf( - "error collecting metric %v: %s", desc, err, +collectLoop: + for { + select { + case metric, ok := <-metricChan: + if !ok { + // metricChan is closed, we are done. + break collectLoop + } + errs.Append(processMetric( + metric, metricFamiliesByName, + metricHashes, dimHashes, + registeredDescIDs, )) - continue + default: + if len(collectors) == 0 { + // All collectors are being worked on. Just + // process metrics from now on. + for metric := range metricChan { + errs.Append(processMetric( + metric, metricFamiliesByName, + metricHashes, dimHashes, + registeredDescIDs, + )) + } + break collectLoop + } + // Start more workers. + go collectWorker() + runtime.Gosched() } - metricFamily, ok := metricFamiliesByName[desc.fqName] - if ok { - if metricFamily.GetHelp() != desc.help { - errs = append(errs, fmt.Errorf( - "collected metric %s %s has help %q but should have %q", - desc.fqName, dtoMetric, desc.help, metricFamily.GetHelp(), - )) - continue - } - // TODO(beorn7): Simplify switch once Desc has type. - switch metricFamily.GetType() { - case dto.MetricType_COUNTER: - if dtoMetric.Counter == nil { - errs = append(errs, fmt.Errorf( - "collected metric %s %s should be a Counter", - desc.fqName, dtoMetric, - )) - continue - } - case dto.MetricType_GAUGE: - if dtoMetric.Gauge == nil { - errs = append(errs, fmt.Errorf( - "collected metric %s %s should be a Gauge", - desc.fqName, dtoMetric, - )) - continue - } - case dto.MetricType_SUMMARY: - if dtoMetric.Summary == nil { - errs = append(errs, fmt.Errorf( - "collected metric %s %s should be a Summary", - desc.fqName, dtoMetric, - )) - continue - } - case dto.MetricType_UNTYPED: - if dtoMetric.Untyped == nil { - errs = append(errs, fmt.Errorf( - "collected metric %s %s should be Untyped", - desc.fqName, dtoMetric, - )) - continue - } - case dto.MetricType_HISTOGRAM: - if dtoMetric.Histogram == nil { - errs = append(errs, fmt.Errorf( - "collected metric %s %s should be a Histogram", - desc.fqName, dtoMetric, - )) - continue - } - default: - panic("encountered MetricFamily with invalid type") - } - } else { - metricFamily = &dto.MetricFamily{} - metricFamily.Name = proto.String(desc.fqName) - metricFamily.Help = proto.String(desc.help) - // TODO(beorn7): Simplify switch once Desc has type. - switch { - case dtoMetric.Gauge != nil: - metricFamily.Type = dto.MetricType_GAUGE.Enum() - case dtoMetric.Counter != nil: - metricFamily.Type = dto.MetricType_COUNTER.Enum() - case dtoMetric.Summary != nil: - metricFamily.Type = dto.MetricType_SUMMARY.Enum() - case dtoMetric.Untyped != nil: - metricFamily.Type = dto.MetricType_UNTYPED.Enum() - case dtoMetric.Histogram != nil: - metricFamily.Type = dto.MetricType_HISTOGRAM.Enum() - default: - errs = append(errs, fmt.Errorf( - "empty metric collected: %s", dtoMetric, - )) - continue - } - metricFamiliesByName[desc.fqName] = metricFamily - } - if err := checkMetricConsistency(metricFamily, dtoMetric, metricHashes, dimHashes); err != nil { - errs = append(errs, err) - continue - } - if r.pedanticChecksEnabled { - // Is the desc registered at all? - if _, exist := registeredDescIDs[desc.id]; !exist { - errs = append(errs, fmt.Errorf( - "collected metric %s %s with unregistered descriptor %s", - metricFamily.GetName(), dtoMetric, desc, - )) - continue - } - if err := checkDescConsistency(metricFamily, dtoMetric, desc); err != nil { - errs = append(errs, err) - continue - } - } - metricFamily.Metric = append(metricFamily.Metric, dtoMetric) } return normalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap() } +// processMetric is an internal helper method only used by the Gather method. +func processMetric( + metric Metric, + metricFamiliesByName map[string]*dto.MetricFamily, + metricHashes map[uint64]struct{}, + dimHashes map[string]uint64, + registeredDescIDs map[uint64]struct{}, +) error { + desc := metric.Desc() + dtoMetric := &dto.Metric{} + if err := metric.Write(dtoMetric); err != nil { + return fmt.Errorf("error collecting metric %v: %s", desc, err) + } + metricFamily, ok := metricFamiliesByName[desc.fqName] + if ok { + if metricFamily.GetHelp() != desc.help { + return fmt.Errorf( + "collected metric %s %s has help %q but should have %q", + desc.fqName, dtoMetric, desc.help, metricFamily.GetHelp(), + ) + } + // TODO(beorn7): Simplify switch once Desc has type. + switch metricFamily.GetType() { + case dto.MetricType_COUNTER: + if dtoMetric.Counter == nil { + return fmt.Errorf( + "collected metric %s %s should be a Counter", + desc.fqName, dtoMetric, + ) + } + case dto.MetricType_GAUGE: + if dtoMetric.Gauge == nil { + return fmt.Errorf( + "collected metric %s %s should be a Gauge", + desc.fqName, dtoMetric, + ) + } + case dto.MetricType_SUMMARY: + if dtoMetric.Summary == nil { + return fmt.Errorf( + "collected metric %s %s should be a Summary", + desc.fqName, dtoMetric, + ) + } + case dto.MetricType_UNTYPED: + if dtoMetric.Untyped == nil { + return fmt.Errorf( + "collected metric %s %s should be Untyped", + desc.fqName, dtoMetric, + ) + } + case dto.MetricType_HISTOGRAM: + if dtoMetric.Histogram == nil { + return fmt.Errorf( + "collected metric %s %s should be a Histogram", + desc.fqName, dtoMetric, + ) + } + default: + panic("encountered MetricFamily with invalid type") + } + } else { + metricFamily = &dto.MetricFamily{} + metricFamily.Name = proto.String(desc.fqName) + metricFamily.Help = proto.String(desc.help) + // TODO(beorn7): Simplify switch once Desc has type. + switch { + case dtoMetric.Gauge != nil: + metricFamily.Type = dto.MetricType_GAUGE.Enum() + case dtoMetric.Counter != nil: + metricFamily.Type = dto.MetricType_COUNTER.Enum() + case dtoMetric.Summary != nil: + metricFamily.Type = dto.MetricType_SUMMARY.Enum() + case dtoMetric.Untyped != nil: + metricFamily.Type = dto.MetricType_UNTYPED.Enum() + case dtoMetric.Histogram != nil: + metricFamily.Type = dto.MetricType_HISTOGRAM.Enum() + default: + return fmt.Errorf("empty metric collected: %s", dtoMetric) + } + metricFamiliesByName[desc.fqName] = metricFamily + } + if err := checkMetricConsistency(metricFamily, dtoMetric, metricHashes, dimHashes); err != nil { + return err + } + if registeredDescIDs != nil { + // Is the desc registered at all? + if _, exist := registeredDescIDs[desc.id]; !exist { + return fmt.Errorf( + "collected metric %s %s with unregistered descriptor %s", + metricFamily.GetName(), dtoMetric, desc, + ) + } + if err := checkDescConsistency(metricFamily, dtoMetric, desc); err != nil { + return err + } + } + metricFamily.Metric = append(metricFamily.Metric, dtoMetric) + return nil +} + // Gatherers is a slice of Gatherer instances that implements the Gatherer // interface itself. Its Gather method calls Gather on all Gatherers in the // slice in order and returns the merged results. Errors returned from the From 4957f7bba4af2daf22a88d2d2ea282f51748c41e Mon Sep 17 00:00:00 2001 From: beorn7 Date: Tue, 30 Jan 2018 15:23:23 +0100 Subject: [PATCH 2/2] Add a safety goroutine budget This makes sure we don't spin up a possibly infinite number of goroutines in `Gather`, which could theoretically happen with unlucky scheduling. --- prometheus/registry.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/prometheus/registry.go b/prometheus/registry.go index 2520e80..73dde7b 100644 --- a/prometheus/registry.go +++ b/prometheus/registry.go @@ -376,7 +376,7 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) { ) r.mtx.RLock() - wg.Add(len(r.collectorsByID)) + goroutineBudget := len(r.collectorsByID) metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName)) collectors := make(chan Collector, len(r.collectorsByID)) for _, collector := range r.collectorsByID { @@ -392,6 +392,8 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) { } r.mtx.RUnlock() + wg.Add(goroutineBudget) + collectWorker := func() { for { select { @@ -404,8 +406,9 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) { } } - // Start the first worker now to make sure at least is running. + // Start the first worker now to make sure at least one is running. go collectWorker() + goroutineBudget-- // Close the metricChan once all collectors are collected. go func() { @@ -433,9 +436,11 @@ collectLoop: registeredDescIDs, )) default: - if len(collectors) == 0 { - // All collectors are being worked on. Just - // process metrics from now on. + if goroutineBudget <= 0 || len(collectors) == 0 { + // All collectors are aleady being worked on or + // we have already as many goroutines started as + // there are collectors. Just process metrics + // from now on. for metric := range metricChan { errs.Append(processMetric( metric, metricFamiliesByName, @@ -447,6 +452,7 @@ collectLoop: } // Start more workers. go collectWorker() + goroutineBudget-- runtime.Gosched() } }