Merge pull request #370 from prometheus/beorn7/performance

Create goroutines adaptively during metrics gathering
This commit is contained in:
Björn Rabenstein 2018-01-31 15:28:26 +01:00 committed by GitHub
commit f4fb1b73fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 167 additions and 122 deletions

View File

@ -18,6 +18,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"os" "os"
"runtime"
"sort" "sort"
"sync" "sync"
"unicode/utf8" "unicode/utf8"
@ -202,6 +203,13 @@ func (errs MultiError) Error() string {
return buf.String() return buf.String()
} }
// Append appends the provided error if it is not nil.
func (errs *MultiError) Append(err error) {
if err != nil {
*errs = append(*errs, err)
}
}
// MaybeUnwrap returns nil if len(errs) is 0. It returns the first and only // MaybeUnwrap returns nil if len(errs) is 0. It returns the first and only
// contained error as error if len(errs is 1). In all other cases, it returns // contained error as error if len(errs is 1). In all other cases, it returns
// the MultiError directly. This is helpful for returning a MultiError in a way // the MultiError directly. This is helpful for returning a MultiError in a way
@ -368,22 +376,12 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
) )
r.mtx.RLock() r.mtx.RLock()
goroutineBudget := len(r.collectorsByID)
metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName)) metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName))
collectors := make(chan Collector, len(r.collectorsByID))
// Scatter.
// (Collectors could be complex and slow, so we call them all at once.)
wg.Add(len(r.collectorsByID))
go func() {
wg.Wait()
close(metricChan)
}()
for _, collector := range r.collectorsByID { for _, collector := range r.collectorsByID {
go func(collector Collector) { collectors <- collector
defer wg.Done()
collector.Collect(metricChan)
}(collector)
} }
// In case pedantic checks are enabled, we have to copy the map before // In case pedantic checks are enabled, we have to copy the map before
// giving up the RLock. // giving up the RLock.
if r.pedanticChecksEnabled { if r.pedanticChecksEnabled {
@ -392,78 +390,132 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
registeredDescIDs[id] = struct{}{} registeredDescIDs[id] = struct{}{}
} }
} }
r.mtx.RUnlock() r.mtx.RUnlock()
wg.Add(goroutineBudget)
collectWorker := func() {
for {
select {
case collector := <-collectors:
collector.Collect(metricChan)
wg.Done()
default:
return
}
}
}
// Start the first worker now to make sure at least one is running.
go collectWorker()
goroutineBudget--
// Close the metricChan once all collectors are collected.
go func() {
wg.Wait()
close(metricChan)
}()
// Drain metricChan in case of premature return. // Drain metricChan in case of premature return.
defer func() { defer func() {
for range metricChan { for range metricChan {
} }
}() }()
// Gather. collectLoop:
for {
select {
case metric, ok := <-metricChan:
if !ok {
// metricChan is closed, we are done.
break collectLoop
}
errs.Append(processMetric(
metric, metricFamiliesByName,
metricHashes, dimHashes,
registeredDescIDs,
))
default:
if goroutineBudget <= 0 || len(collectors) == 0 {
// All collectors are aleady being worked on or
// we have already as many goroutines started as
// there are collectors. Just process metrics
// from now on.
for metric := range metricChan { for metric := range metricChan {
// This could be done concurrently, too, but it required locking errs.Append(processMetric(
// of metricFamiliesByName (and of metricHashes if checks are metric, metricFamiliesByName,
// enabled). Most likely not worth it. metricHashes, dimHashes,
registeredDescIDs,
))
}
break collectLoop
}
// Start more workers.
go collectWorker()
goroutineBudget--
runtime.Gosched()
}
}
return normalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap()
}
// processMetric is an internal helper method only used by the Gather method.
func processMetric(
metric Metric,
metricFamiliesByName map[string]*dto.MetricFamily,
metricHashes map[uint64]struct{},
dimHashes map[string]uint64,
registeredDescIDs map[uint64]struct{},
) error {
desc := metric.Desc() desc := metric.Desc()
dtoMetric := &dto.Metric{} dtoMetric := &dto.Metric{}
if err := metric.Write(dtoMetric); err != nil { if err := metric.Write(dtoMetric); err != nil {
errs = append(errs, fmt.Errorf( return fmt.Errorf("error collecting metric %v: %s", desc, err)
"error collecting metric %v: %s", desc, err,
))
continue
} }
metricFamily, ok := metricFamiliesByName[desc.fqName] metricFamily, ok := metricFamiliesByName[desc.fqName]
if ok { if ok {
if metricFamily.GetHelp() != desc.help { if metricFamily.GetHelp() != desc.help {
errs = append(errs, fmt.Errorf( return fmt.Errorf(
"collected metric %s %s has help %q but should have %q", "collected metric %s %s has help %q but should have %q",
desc.fqName, dtoMetric, desc.help, metricFamily.GetHelp(), desc.fqName, dtoMetric, desc.help, metricFamily.GetHelp(),
)) )
continue
} }
// TODO(beorn7): Simplify switch once Desc has type. // TODO(beorn7): Simplify switch once Desc has type.
switch metricFamily.GetType() { switch metricFamily.GetType() {
case dto.MetricType_COUNTER: case dto.MetricType_COUNTER:
if dtoMetric.Counter == nil { if dtoMetric.Counter == nil {
errs = append(errs, fmt.Errorf( return fmt.Errorf(
"collected metric %s %s should be a Counter", "collected metric %s %s should be a Counter",
desc.fqName, dtoMetric, desc.fqName, dtoMetric,
)) )
continue
} }
case dto.MetricType_GAUGE: case dto.MetricType_GAUGE:
if dtoMetric.Gauge == nil { if dtoMetric.Gauge == nil {
errs = append(errs, fmt.Errorf( return fmt.Errorf(
"collected metric %s %s should be a Gauge", "collected metric %s %s should be a Gauge",
desc.fqName, dtoMetric, desc.fqName, dtoMetric,
)) )
continue
} }
case dto.MetricType_SUMMARY: case dto.MetricType_SUMMARY:
if dtoMetric.Summary == nil { if dtoMetric.Summary == nil {
errs = append(errs, fmt.Errorf( return fmt.Errorf(
"collected metric %s %s should be a Summary", "collected metric %s %s should be a Summary",
desc.fqName, dtoMetric, desc.fqName, dtoMetric,
)) )
continue
} }
case dto.MetricType_UNTYPED: case dto.MetricType_UNTYPED:
if dtoMetric.Untyped == nil { if dtoMetric.Untyped == nil {
errs = append(errs, fmt.Errorf( return fmt.Errorf(
"collected metric %s %s should be Untyped", "collected metric %s %s should be Untyped",
desc.fqName, dtoMetric, desc.fqName, dtoMetric,
)) )
continue
} }
case dto.MetricType_HISTOGRAM: case dto.MetricType_HISTOGRAM:
if dtoMetric.Histogram == nil { if dtoMetric.Histogram == nil {
errs = append(errs, fmt.Errorf( return fmt.Errorf(
"collected metric %s %s should be a Histogram", "collected metric %s %s should be a Histogram",
desc.fqName, dtoMetric, desc.fqName, dtoMetric,
)) )
continue
} }
default: default:
panic("encountered MetricFamily with invalid type") panic("encountered MetricFamily with invalid type")
@ -485,34 +537,27 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
case dtoMetric.Histogram != nil: case dtoMetric.Histogram != nil:
metricFamily.Type = dto.MetricType_HISTOGRAM.Enum() metricFamily.Type = dto.MetricType_HISTOGRAM.Enum()
default: default:
errs = append(errs, fmt.Errorf( return fmt.Errorf("empty metric collected: %s", dtoMetric)
"empty metric collected: %s", dtoMetric,
))
continue
} }
metricFamiliesByName[desc.fqName] = metricFamily metricFamiliesByName[desc.fqName] = metricFamily
} }
if err := checkMetricConsistency(metricFamily, dtoMetric, metricHashes, dimHashes); err != nil { if err := checkMetricConsistency(metricFamily, dtoMetric, metricHashes, dimHashes); err != nil {
errs = append(errs, err) return err
continue
} }
if r.pedanticChecksEnabled { if registeredDescIDs != nil {
// Is the desc registered at all? // Is the desc registered at all?
if _, exist := registeredDescIDs[desc.id]; !exist { if _, exist := registeredDescIDs[desc.id]; !exist {
errs = append(errs, fmt.Errorf( return fmt.Errorf(
"collected metric %s %s with unregistered descriptor %s", "collected metric %s %s with unregistered descriptor %s",
metricFamily.GetName(), dtoMetric, desc, metricFamily.GetName(), dtoMetric, desc,
)) )
continue
} }
if err := checkDescConsistency(metricFamily, dtoMetric, desc); err != nil { if err := checkDescConsistency(metricFamily, dtoMetric, desc); err != nil {
errs = append(errs, err) return err
continue
} }
} }
metricFamily.Metric = append(metricFamily.Metric, dtoMetric) metricFamily.Metric = append(metricFamily.Metric, dtoMetric)
} return nil
return normalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap()
} }
// Gatherers is a slice of Gatherer instances that implements the Gatherer // Gatherers is a slice of Gatherer instances that implements the Gatherer