Create goroutines adaptively during metrics gathering

This commit is contained in:
beorn7 2018-01-26 19:58:07 +01:00
parent 06bc6e01f4
commit e04451f4be
1 changed files with 161 additions and 122 deletions

View File

@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"os"
"runtime"
"sort"
"sync"
"unicode/utf8"
@ -202,6 +203,13 @@ func (errs MultiError) Error() string {
return buf.String()
}
// Append appends the provided error if it is not nil.
func (errs *MultiError) Append(err error) {
if err != nil {
*errs = append(*errs, err)
}
}
// MaybeUnwrap returns nil if len(errs) is 0. It returns the first and only
// contained error as error if len(errs is 1). In all other cases, it returns
// the MultiError directly. This is helpful for returning a MultiError in a way
@ -368,22 +376,12 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
)
r.mtx.RLock()
metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName))
// Scatter.
// (Collectors could be complex and slow, so we call them all at once.)
wg.Add(len(r.collectorsByID))
go func() {
wg.Wait()
close(metricChan)
}()
metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName))
collectors := make(chan Collector, len(r.collectorsByID))
for _, collector := range r.collectorsByID {
go func(collector Collector) {
defer wg.Done()
collector.Collect(metricChan)
}(collector)
collectors <- collector
}
// In case pedantic checks are enabled, we have to copy the map before
// giving up the RLock.
if r.pedanticChecksEnabled {
@ -392,78 +390,126 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
registeredDescIDs[id] = struct{}{}
}
}
r.mtx.RUnlock()
collectWorker := func() {
for {
select {
case collector := <-collectors:
collector.Collect(metricChan)
wg.Done()
default:
return
}
}
}
// Start the first worker now to make sure at least is running.
go collectWorker()
// Close the metricChan once all collectors are collected.
go func() {
wg.Wait()
close(metricChan)
}()
// Drain metricChan in case of premature return.
defer func() {
for range metricChan {
}
}()
// Gather.
collectLoop:
for {
select {
case metric, ok := <-metricChan:
if !ok {
// metricChan is closed, we are done.
break collectLoop
}
errs.Append(processMetric(
metric, metricFamiliesByName,
metricHashes, dimHashes,
registeredDescIDs,
))
default:
if len(collectors) == 0 {
// All collectors are being worked on. Just
// process metrics from now on.
for metric := range metricChan {
// This could be done concurrently, too, but it required locking
// of metricFamiliesByName (and of metricHashes if checks are
// enabled). Most likely not worth it.
errs.Append(processMetric(
metric, metricFamiliesByName,
metricHashes, dimHashes,
registeredDescIDs,
))
}
break collectLoop
}
// Start more workers.
go collectWorker()
runtime.Gosched()
}
}
return normalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap()
}
// processMetric is an internal helper method only used by the Gather method.
func processMetric(
metric Metric,
metricFamiliesByName map[string]*dto.MetricFamily,
metricHashes map[uint64]struct{},
dimHashes map[string]uint64,
registeredDescIDs map[uint64]struct{},
) error {
desc := metric.Desc()
dtoMetric := &dto.Metric{}
if err := metric.Write(dtoMetric); err != nil {
errs = append(errs, fmt.Errorf(
"error collecting metric %v: %s", desc, err,
))
continue
return fmt.Errorf("error collecting metric %v: %s", desc, err)
}
metricFamily, ok := metricFamiliesByName[desc.fqName]
if ok {
if metricFamily.GetHelp() != desc.help {
errs = append(errs, fmt.Errorf(
return fmt.Errorf(
"collected metric %s %s has help %q but should have %q",
desc.fqName, dtoMetric, desc.help, metricFamily.GetHelp(),
))
continue
)
}
// TODO(beorn7): Simplify switch once Desc has type.
switch metricFamily.GetType() {
case dto.MetricType_COUNTER:
if dtoMetric.Counter == nil {
errs = append(errs, fmt.Errorf(
return fmt.Errorf(
"collected metric %s %s should be a Counter",
desc.fqName, dtoMetric,
))
continue
)
}
case dto.MetricType_GAUGE:
if dtoMetric.Gauge == nil {
errs = append(errs, fmt.Errorf(
return fmt.Errorf(
"collected metric %s %s should be a Gauge",
desc.fqName, dtoMetric,
))
continue
)
}
case dto.MetricType_SUMMARY:
if dtoMetric.Summary == nil {
errs = append(errs, fmt.Errorf(
return fmt.Errorf(
"collected metric %s %s should be a Summary",
desc.fqName, dtoMetric,
))
continue
)
}
case dto.MetricType_UNTYPED:
if dtoMetric.Untyped == nil {
errs = append(errs, fmt.Errorf(
return fmt.Errorf(
"collected metric %s %s should be Untyped",
desc.fqName, dtoMetric,
))
continue
)
}
case dto.MetricType_HISTOGRAM:
if dtoMetric.Histogram == nil {
errs = append(errs, fmt.Errorf(
return fmt.Errorf(
"collected metric %s %s should be a Histogram",
desc.fqName, dtoMetric,
))
continue
)
}
default:
panic("encountered MetricFamily with invalid type")
@ -485,34 +531,27 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
case dtoMetric.Histogram != nil:
metricFamily.Type = dto.MetricType_HISTOGRAM.Enum()
default:
errs = append(errs, fmt.Errorf(
"empty metric collected: %s", dtoMetric,
))
continue
return fmt.Errorf("empty metric collected: %s", dtoMetric)
}
metricFamiliesByName[desc.fqName] = metricFamily
}
if err := checkMetricConsistency(metricFamily, dtoMetric, metricHashes, dimHashes); err != nil {
errs = append(errs, err)
continue
return err
}
if r.pedanticChecksEnabled {
if registeredDescIDs != nil {
// Is the desc registered at all?
if _, exist := registeredDescIDs[desc.id]; !exist {
errs = append(errs, fmt.Errorf(
return fmt.Errorf(
"collected metric %s %s with unregistered descriptor %s",
metricFamily.GetName(), dtoMetric, desc,
))
continue
)
}
if err := checkDescConsistency(metricFamily, dtoMetric, desc); err != nil {
errs = append(errs, err)
continue
return err
}
}
metricFamily.Metric = append(metricFamily.Metric, dtoMetric)
}
return normalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap()
return nil
}
// Gatherers is a slice of Gatherer instances that implements the Gatherer