From a1c9be45cf9add3d346624ab04a28bf69a1e0efd Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Sat, 23 Oct 2021 13:58:26 +0200 Subject: [PATCH] Added cached collector. Signed-off-by: Bartlomiej Plotka update. Signed-off-by: Bartlomiej Plotka Attempt 2 Signed-off-by: Bartlomiej Plotka Added blocking registry, with raw collector and transactional handler. Signed-off-by: Bartlomiej Plotka Added fast path to normal (empty) registry to save 8 allocs and 3K5B per Gather. Signed-off-by: Bartlomiej Plotka Simplified API, added tests. Signed-off-by: Bartlomiej Plotka Fix. Signed-off-by: Bartlomiej Plotka Simplified implementation. Signed-off-by: Bartlomiej Plotka Added benchmark. Signed-off-by: Bartlomiej Plotka Optimized. Signed-off-by: Bartlomiej Plotka --- prometheus/cache/cache.go | 261 ++++++++++++++++++++++++++++++ prometheus/cache/cache_test.go | 275 ++++++++++++++++++++++++++++++++ prometheus/desc.go | 4 +- prometheus/internal/metric.go | 16 ++ prometheus/metric.go | 16 -- prometheus/promhttp/http.go | 8 +- prometheus/registry.go | 101 +++++++++++- prometheus/registry_test.go | 80 ++++++++++ prometheus/testutil/testutil.go | 11 +- prometheus/value.go | 44 +++-- prometheus/wrap.go | 3 +- 11 files changed, 785 insertions(+), 34 deletions(-) create mode 100644 prometheus/cache/cache.go create mode 100644 prometheus/cache/cache_test.go diff --git a/prometheus/cache/cache.go b/prometheus/cache/cache.go new file mode 100644 index 0000000..db0df82 --- /dev/null +++ b/prometheus/cache/cache.go @@ -0,0 +1,261 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "errors" + "fmt" + "sort" + "sync" + "time" + + "github.com/cespare/xxhash/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + + //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility. + "github.com/golang/protobuf/proto" + "github.com/prometheus/client_golang/prometheus/internal" + dto "github.com/prometheus/client_model/go" +) + +var _ prometheus.TransactionalGatherer = &CachedTGatherer{} + +var separatorByteSlice = []byte{model.SeparatorByte} // For convenient use with xxhash. + +// CachedTGatherer is a transactional gatherer that allows maintaining a set of metrics which +// change less frequently than scrape time, yet label values and values change over time. +// +// If you happen to use NewDesc, NewConstMetric or MustNewConstMetric inside Collector.Collect routine, consider +// using CachedTGatherer instead. +// +// Use CachedTGatherer with classic Registry using NewMultiTRegistry and ToTransactionalGatherer helpers. +// NOTE(bwplotka): Experimental, API and behaviour can change. +type CachedTGatherer struct { + metrics map[uint64]*dto.Metric + metricFamilyByName map[string]*dto.MetricFamily + mMu sync.RWMutex +} + +func NewCachedTGatherer() *CachedTGatherer { + return &CachedTGatherer{ + metrics: make(map[uint64]*dto.Metric), + metricFamilyByName: map[string]*dto.MetricFamily{}, + } +} + +// Gather implements TransactionalGatherer interface. +func (c *CachedTGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { + c.mMu.RLock() + + // BenchmarkCachedTGatherer_Update shows, even for 1 million metrics with 1000 families + // this is efficient enough (~300µs and ~50 kB per op), no need to cache it for now. + return internal.NormalizeMetricFamilies(c.metricFamilyByName), c.mMu.RUnlock, nil +} + +type Key struct { + FQName string // __name__ + + // Label names can be unsorted, we will be sorting them later. The only implication is cachability if + // consumer provide non-deterministic order of those. + LabelNames []string + LabelValues []string +} + +func (k Key) isValid() error { + if k.FQName == "" { + return errors.New("FQName cannot be empty") + } + if len(k.LabelNames) != len(k.LabelValues) { + return errors.New("new metric: label name has different length than values") + } + + return nil +} + +// hash returns unique hash for this key. +func (k Key) hash() uint64 { + h := xxhash.New() + h.WriteString(k.FQName) + h.Write(separatorByteSlice) + + for i := range k.LabelNames { + h.WriteString(k.LabelNames[i]) + h.Write(separatorByteSlice) + h.WriteString(k.LabelValues[i]) + h.Write(separatorByteSlice) + } + return h.Sum64() +} + +// Insert represents record to set in cache. +type Insert struct { + Key + + Help string + ValueType prometheus.ValueType + Value float64 + + // Timestamp is optional. Pass nil for no explicit timestamp. + Timestamp *time.Time +} + +// Update goes through inserts and deletions and updates current cache in concurrency safe manner. +// If reset is set to true, all inserts and deletions are working on empty cache. In such case +// this implementation tries to reuse memory from existing cached item when possible. +// +// Update reuses insert struct memory, so after use, Insert slice and its elements cannot be reused +// outside of this method. +// TODO(bwplotka): Lack of copying can pose memory safety problems if insert variables are reused. Consider copying if value +// is different. Yet it gives significant allocation gains. +func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) error { + c.mMu.Lock() + defer c.mMu.Unlock() + + currMetrics := c.metrics + currMetricFamilies := c.metricFamilyByName + if reset { + currMetrics = make(map[uint64]*dto.Metric, len(c.metrics)) + currMetricFamilies = make(map[string]*dto.MetricFamily, len(c.metricFamilyByName)) + } + + errs := prometheus.MultiError{} + for i := range inserts { + // TODO(bwplotka): Validate more about this insert? + if err := inserts[i].isValid(); err != nil { + errs.Append(err) + continue + } + + // Update metric family. + mf, ok := c.metricFamilyByName[inserts[i].FQName] + if !ok { + mf = &dto.MetricFamily{} + mf.Name = &inserts[i].FQName + } else if reset { + // Reset metric slice, since we want to start from scratch. + mf.Metric = mf.Metric[:0] + } + mf.Type = inserts[i].ValueType.ToDTO() + mf.Help = &inserts[i].Help + + currMetricFamilies[inserts[i].FQName] = mf + + // Update metric pointer. + hSum := inserts[i].hash() + m, ok := c.metrics[hSum] + if !ok { + m = &dto.Metric{Label: make([]*dto.LabelPair, 0, len(inserts[i].LabelNames))} + for j := range inserts[i].LabelNames { + m.Label = append(m.Label, &dto.LabelPair{ + Name: &inserts[i].LabelNames[j], + Value: &inserts[i].LabelValues[j], + }) + } + sort.Sort(internal.LabelPairSorter(m.Label)) + } + + switch inserts[i].ValueType { + case prometheus.CounterValue: + v := m.Counter + if v == nil { + v = &dto.Counter{} + } + v.Value = &inserts[i].Value + m.Counter = v + m.Gauge = nil + m.Untyped = nil + case prometheus.GaugeValue: + v := m.Gauge + if v == nil { + v = &dto.Gauge{} + } + v.Value = &inserts[i].Value + m.Counter = nil + m.Gauge = v + m.Untyped = nil + case prometheus.UntypedValue: + v := m.Untyped + if v == nil { + v = &dto.Untyped{} + } + v.Value = &inserts[i].Value + m.Counter = nil + m.Gauge = nil + m.Untyped = v + default: + return fmt.Errorf("unsupported value type %v", inserts[i].ValueType) + } + + m.TimestampMs = nil + if inserts[i].Timestamp != nil { + m.TimestampMs = proto.Int64(inserts[i].Timestamp.Unix()*1000 + int64(inserts[i].Timestamp.Nanosecond()/1000000)) + } + currMetrics[hSum] = m + + if !reset && ok { + // If we did update without reset and we found metric in previous + // map, we know metric pointer exists in metric family map, so just continue. + continue + } + + // Will be sorted later anyway, so just append. + mf.Metric = append(mf.Metric, m) + } + + for _, del := range deletions { + if err := del.isValid(); err != nil { + errs.Append(err) + continue + } + + hSum := del.hash() + m, ok := currMetrics[hSum] + if !ok { + continue + } + delete(currMetrics, hSum) + + mf, ok := currMetricFamilies[del.FQName] + if !ok { + // Impossible, but well... + errs.Append(fmt.Errorf("could not remove metric %s(%s) from metric family, metric family does not exists", del.FQName, del.LabelValues)) + continue + } + + toDel := -1 + for i := range mf.Metric { + if mf.Metric[i] == m { + toDel = i + break + } + } + + if toDel == -1 { + errs.Append(fmt.Errorf("could not remove metric %s(%s) from metric family, metric family does not have such metric", del.FQName, del.LabelValues)) + continue + } + + if len(mf.Metric) == 1 { + delete(currMetricFamilies, del.FQName) + continue + } + + mf.Metric = append(mf.Metric[:toDel], mf.Metric[toDel+1:]...) + } + + c.metrics = currMetrics + c.metricFamilyByName = currMetricFamilies + return errs.MaybeUnwrap() +} diff --git a/prometheus/cache/cache_test.go b/prometheus/cache/cache_test.go new file mode 100644 index 0000000..f528117 --- /dev/null +++ b/prometheus/cache/cache_test.go @@ -0,0 +1,275 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "fmt" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +func TestCachedTGatherer(t *testing.T) { + c := NewCachedTGatherer() + mfs, done, err := c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + if got := mfsToString(mfs); got != "" { + t.Error("unexpected metric family", got) + } + + if err := c.Update(false, []Insert{ + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help a", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + { + Key: Key{FQName: "b", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help b", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc2"}}, + Help: "help a2", + ValueType: prometheus.CounterValue, + Value: 2, + }, + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, + Help: "help a2", + ValueType: prometheus.CounterValue, + Value: 2, + }, + }, []Key{ + {FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, // Does not make much sense, but deletion works as expected. + }); err != nil { + t.Error("update:", err) + } + + mfs, done, err = c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + + const expected = "name:\"a\" help:\"help a2\" type:COUNTER metric: label: " + + "gauge: > metric: label: counter: > ,name:\"b\" help:\"help b\" " + + "type:GAUGE metric: label: gauge: > " + if got := mfsToString(mfs); got != expected { + t.Error("unexpected metric family, got", got) + } + + // Update with exactly same insertion should have the same effect. + if err := c.Update(false, []Insert{ + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help a", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + { + Key: Key{FQName: "b", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help b", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc2"}}, + Help: "help a2", + ValueType: prometheus.CounterValue, + Value: 2, + }, + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, + Help: "help a2", + ValueType: prometheus.CounterValue, + Value: 2, + }, + }, []Key{ + {FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, // Does not make much sense, but deletion works as expected. + }); err != nil { + t.Error("update:", err) + } + + mfs, done, err = c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + + if got := mfsToString(mfs); got != expected { + t.Error("unexpected metric family, got", got) + } + + // Update one element. + if err := c.Update(false, []Insert{ + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help a12321", + ValueType: prometheus.CounterValue, + Value: 9999, + }, + }, nil); err != nil { + t.Error("update:", err) + } + + mfs, done, err = c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + + if got := mfsToString(mfs); got != "name:\"a\" help:\"help a12321\" type:COUNTER metric: label:"+ + " counter: > metric: label: counter: > ,name:\"b\" help:\"help b\" "+ + "type:GAUGE metric: label: gauge: > " { + t.Error("unexpected metric family, got", got) + } + + // Rebuild cache and insert only 2 elements. + if err := c.Update(true, []Insert{ + { + Key: Key{FQName: "ax", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help ax", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + { + Key: Key{FQName: "bx", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help bx", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + }, nil); err != nil { + t.Error("update:", err) + } + + mfs, done, err = c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + + if got := mfsToString(mfs); got != "name:\"ax\" help:\"help ax\" type:GAUGE metric: label:"+ + " gauge: > ,name:\"bx\" help:\"help bx\" type:GAUGE metric: label: gauge: > " { + t.Error("unexpected metric family, got", got) + } + + if err := c.Update(true, nil, nil); err != nil { + t.Error("update:", err) + } + + mfs, done, err = c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + if got := mfsToString(mfs); got != "" { + t.Error("unexpected metric family", got) + } +} + +func mfsToString(mfs []*dto.MetricFamily) string { + ret := make([]string, 0, len(mfs)) + for _, m := range mfs { + ret = append(ret, m.String()) + } + return strings.Join(ret, ",") +} + +// export var=v1 && go test -count 5 -benchtime 100x -run '^$' -bench . -memprofile=${var}.mem.pprof -cpuprofile=${var}.cpu.pprof > ${var}.txt +func BenchmarkCachedTGatherer_Update(b *testing.B) { + c := NewCachedTGatherer() + + // Generate larger metric payload. + inserts := make([]Insert, 0, 1e6) + + // 1000 metrics in 1000 families. + for i := 0; i < 1e3; i++ { + for j := 0; j < 1e3; j++ { + inserts = append(inserts, Insert{ + Key: Key{ + FQName: fmt.Sprintf("realistic_longer_name_%d", i), + LabelNames: []string{"realistic_label_name1", "realistic_label_name2", "realistic_label_name3"}, + LabelValues: []string{"realistic_label_value1", "realistic_label_value2", fmt.Sprintf("realistic_label_value3_%d", j)}}, + Help: "help string is usually quite large, so let's make it a bit realistic.", + ValueType: prometheus.GaugeValue, + Value: float64(j), + }) + } + } + + if err := c.Update(false, inserts, nil); err != nil { + b.Error("update:", err) + } + + if len(c.metricFamilyByName) != 1e3 || len(c.metrics) != 1e6 { + // Ensure we did not generate duplicates. + panic("generated data set gave wrong numbers") + } + + b.Run("Update of one element without reset", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if err := c.Update(false, []Insert{ + { + Key: Key{ + FQName: "realistic_longer_name_334", + LabelNames: []string{"realistic_label_name1", "realistic_label_name2", "realistic_label_name3"}, + LabelValues: []string{"realistic_label_value1", "realistic_label_value2", "realistic_label_value3_2345"}}, + Help: "CUSTOM help string is usually quite large, so let's make it a bit realistic.", + ValueType: prometheus.CounterValue, + Value: 1929495, + }, + }, nil); err != nil { + b.Error("update:", err) + } + } + }) + + b.Run("Update of all elements with reset", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if err := c.Update(true, inserts, nil); err != nil { + b.Error("update:", err) + } + } + }) + + b.Run("Gather", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mfs, done, err := c.Gather() + done() + if err != nil { + b.Error("update:", err) + } + testMfs = mfs + } + }) +} + +var testMfs []*dto.MetricFamily diff --git a/prometheus/desc.go b/prometheus/desc.go index 4bb816a..ee81107 100644 --- a/prometheus/desc.go +++ b/prometheus/desc.go @@ -20,6 +20,8 @@ import ( "strings" "github.com/cespare/xxhash/v2" + "github.com/prometheus/client_golang/prometheus/internal" + //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility. "github.com/golang/protobuf/proto" "github.com/prometheus/common/model" @@ -154,7 +156,7 @@ func NewDesc(fqName, help string, variableLabels []string, constLabels Labels) * Value: proto.String(v), }) } - sort.Sort(labelPairSorter(d.constLabelPairs)) + sort.Sort(internal.LabelPairSorter(d.constLabelPairs)) return d } diff --git a/prometheus/internal/metric.go b/prometheus/internal/metric.go index 351c26e..089ab5b 100644 --- a/prometheus/internal/metric.go +++ b/prometheus/internal/metric.go @@ -19,6 +19,22 @@ import ( dto "github.com/prometheus/client_model/go" ) +// LabelPairSorter implements sort.Interface. It is used to sort a slice of +// dto.LabelPair pointers. +type LabelPairSorter []*dto.LabelPair + +func (s LabelPairSorter) Len() int { + return len(s) +} + +func (s LabelPairSorter) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (s LabelPairSorter) Less(i, j int) bool { + return s[i].GetName() < s[j].GetName() +} + // metricSorter is a sortable slice of *dto.Metric. type metricSorter []*dto.Metric diff --git a/prometheus/metric.go b/prometheus/metric.go index dc12191..118a54e 100644 --- a/prometheus/metric.go +++ b/prometheus/metric.go @@ -115,22 +115,6 @@ func BuildFQName(namespace, subsystem, name string) string { return name } -// labelPairSorter implements sort.Interface. It is used to sort a slice of -// dto.LabelPair pointers. -type labelPairSorter []*dto.LabelPair - -func (s labelPairSorter) Len() int { - return len(s) -} - -func (s labelPairSorter) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} - -func (s labelPairSorter) Less(i, j int) bool { - return s[i].GetName() < s[j].GetName() -} - type invalidMetric struct { desc *Desc err error diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index d86d0cf..b463a74 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -84,6 +84,10 @@ func Handler() http.Handler { // instrumentation. Use the InstrumentMetricHandler function to apply the same // kind of instrumentation as it is used by the Handler function. func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { + return HandlerForTransactional(prometheus.ToTransactionalGatherer(reg), opts) +} + +func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerOpts) http.Handler { var ( inFlightSem chan struct{} errCnt = prometheus.NewCounterVec( @@ -113,6 +117,7 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) { if inFlightSem != nil { + // TODO(bwplotka): Implement single-flight which is essential for blocking TransactionalGatherer. select { case inFlightSem <- struct{}{}: // All good, carry on. defer func() { <-inFlightSem }() @@ -123,7 +128,8 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { return } } - mfs, err := reg.Gather() + mfs, done, err := reg.Gather() + defer done() if err != nil { if opts.ErrorLog != nil { opts.ErrorLog.Println("error gathering metrics:", err) diff --git a/prometheus/registry.go b/prometheus/registry.go index 383a7f5..5046f7e 100644 --- a/prometheus/registry.go +++ b/prometheus/registry.go @@ -407,6 +407,14 @@ func (r *Registry) MustRegister(cs ...Collector) { // Gather implements Gatherer. func (r *Registry) Gather() ([]*dto.MetricFamily, error) { + r.mtx.RLock() + + if len(r.collectorsByID) == 0 && len(r.uncheckedCollectors) == 0 { + // Fast path. + r.mtx.RUnlock() + return nil, nil + } + var ( checkedMetricChan = make(chan Metric, capMetricChan) uncheckedMetricChan = make(chan Metric, capMetricChan) @@ -416,7 +424,6 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) { registeredDescIDs map[uint64]struct{} // Only used for pedantic checks ) - r.mtx.RLock() goroutineBudget := len(r.collectorsByID) + len(r.uncheckedCollectors) metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName)) checkedCollectors := make(chan Collector, len(r.collectorsByID)) @@ -884,11 +891,11 @@ func checkMetricConsistency( h.Write(separatorByteSlice) // Make sure label pairs are sorted. We depend on it for the consistency // check. - if !sort.IsSorted(labelPairSorter(dtoMetric.Label)) { + if !sort.IsSorted(internal.LabelPairSorter(dtoMetric.Label)) { // We cannot sort dtoMetric.Label in place as it is immutable by contract. copiedLabels := make([]*dto.LabelPair, len(dtoMetric.Label)) copy(copiedLabels, dtoMetric.Label) - sort.Sort(labelPairSorter(copiedLabels)) + sort.Sort(internal.LabelPairSorter(copiedLabels)) dtoMetric.Label = copiedLabels } for _, lp := range dtoMetric.Label { @@ -935,7 +942,7 @@ func checkDescConsistency( metricFamily.GetName(), dtoMetric, desc, ) } - sort.Sort(labelPairSorter(lpsFromDesc)) + sort.Sort(internal.LabelPairSorter(lpsFromDesc)) for i, lpFromDesc := range lpsFromDesc { lpFromMetric := dtoMetric.Label[i] if lpFromDesc.GetName() != lpFromMetric.GetName() || @@ -948,3 +955,89 @@ func checkDescConsistency( } return nil } + +var _ TransactionalGatherer = &MultiTRegistry{} + +// MultiTRegistry is a TransactionalGatherer that joins gathered metrics from multiple +// transactional gatherers. +// +// It is caller responsibility to ensure two registries have mutually exclusive metric families, +// no deduplication will happen. +type MultiTRegistry struct { + tGatherers []TransactionalGatherer +} + +// NewMultiTRegistry creates MultiTRegistry. +func NewMultiTRegistry(tGatherers ...TransactionalGatherer) *MultiTRegistry { + return &MultiTRegistry{ + tGatherers: tGatherers, + } +} + +// Gather implements TransactionalGatherer interface. +func (r *MultiTRegistry) Gather() (mfs []*dto.MetricFamily, done func(), err error) { + errs := MultiError{} + + dFns := make([]func(), 0, len(r.tGatherers)) + // TODO(bwplotka): Implement concurrency for those? + for _, g := range r.tGatherers { + // TODO(bwplotka): Check for duplicates? + m, d, err := g.Gather() + errs.Append(err) + + mfs = append(mfs, m...) + dFns = append(dFns, d) + } + + // TODO(bwplotka): Consider sort in place, given metric family in gather is sorted already. + sort.Slice(mfs, func(i, j int) bool { + return *mfs[i].Name < *mfs[j].Name + }) + return mfs, func() { + for _, d := range dFns { + d() + } + }, errs.MaybeUnwrap() +} + +// TransactionalGatherer represents transactional gatherer that can be triggered to notify gatherer that memory +// used by metric family is no longer used by a caller. This allows implementations with cache. +type TransactionalGatherer interface { + // Gather returns metrics in a lexicographically sorted slice + // of uniquely named MetricFamily protobufs. Gather ensures that the + // returned slice is valid and self-consistent so that it can be used + // for valid exposition. As an exception to the strict consistency + // requirements described for metric.Desc, Gather will tolerate + // different sets of label names for metrics of the same metric family. + // + // Even if an error occurs, Gather attempts to gather as many metrics as + // possible. Hence, if a non-nil error is returned, the returned + // MetricFamily slice could be nil (in case of a fatal error that + // prevented any meaningful metric collection) or contain a number of + // MetricFamily protobufs, some of which might be incomplete, and some + // might be missing altogether. The returned error (which might be a + // MultiError) explains the details. Note that this is mostly useful for + // debugging purposes. If the gathered protobufs are to be used for + // exposition in actual monitoring, it is almost always better to not + // expose an incomplete result and instead disregard the returned + // MetricFamily protobufs in case the returned error is non-nil. + // + // Important: done is expected to be triggered (even if the error occurs!) + // once caller does not need returned slice of dto.MetricFamily. + Gather() (_ []*dto.MetricFamily, done func(), err error) +} + +// ToTransactionalGatherer transforms Gatherer to transactional one with noop as done function. +func ToTransactionalGatherer(g Gatherer) TransactionalGatherer { + return &noTransactionGatherer{g: g} +} + +type noTransactionGatherer struct { + g Gatherer +} + +// Gather implements TransactionalGatherer interface. +func (g *noTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { + mfs, err := g.g.Gather() + return mfs, func() {}, err +} diff --git a/prometheus/registry_test.go b/prometheus/registry_test.go index 0ff7a64..7a959da 100644 --- a/prometheus/registry_test.go +++ b/prometheus/registry_test.go @@ -21,6 +21,7 @@ package prometheus_test import ( "bytes" + "errors" "fmt" "io/ioutil" "math/rand" @@ -1175,3 +1176,82 @@ func TestAlreadyRegisteredCollision(t *testing.T) { } } } + +type tGatherer struct { + done bool + err error +} + +func (g *tGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { + name := "g1" + val := 1.0 + return []*dto.MetricFamily{ + {Name: &name, Metric: []*dto.Metric{{Gauge: &dto.Gauge{Value: &val}}}}, + }, func() { g.done = true }, g.err +} + +func TestNewMultiTRegistry(t *testing.T) { + treg := &tGatherer{} + + t.Run("one registry", func(t *testing.T) { + m := prometheus.NewMultiTRegistry(treg) + ret, done, err := m.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + if len(ret) != 1 { + t.Error("unexpected number of metric families, expected 1, got", ret) + } + if !treg.done { + t.Error("inner transactional registry not marked as done") + } + }) + + reg := prometheus.NewRegistry() + if err := reg.Register(prometheus.NewCounter(prometheus.CounterOpts{Name: "c1", Help: "help c1"})); err != nil { + t.Error("registration failed:", err) + } + + // Note on purpose two registries will have exactly same metric family name (but with different string). + // This behaviour is undefined at the moment. + if err := reg.Register(prometheus.NewGauge(prometheus.GaugeOpts{Name: "g1", Help: "help g1"})); err != nil { + t.Error("registration failed:", err) + } + treg.done = false + + t.Run("two registries", func(t *testing.T) { + m := prometheus.NewMultiTRegistry(prometheus.ToTransactionalGatherer(reg), treg) + ret, done, err := m.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + if len(ret) != 3 { + t.Error("unexpected number of metric families, expected 3, got", ret) + } + if !treg.done { + t.Error("inner transactional registry not marked as done") + } + }) + + treg.done = false + // Inject error. + treg.err = errors.New("test err") + + t.Run("two registries, one with error", func(t *testing.T) { + m := prometheus.NewMultiTRegistry(prometheus.ToTransactionalGatherer(reg), treg) + ret, done, err := m.Gather() + if err != treg.err { + t.Error("unexpected error:", err) + } + done() + if len(ret) != 3 { + t.Error("unexpected number of metric families, expected 3, got", ret) + } + // Still on error, we expect done to be triggered. + if !treg.done { + t.Error("inner transactional registry not marked as done") + } + }) +} diff --git a/prometheus/testutil/testutil.go b/prometheus/testutil/testutil.go index 9af60ce..bf95bea 100644 --- a/prometheus/testutil/testutil.go +++ b/prometheus/testutil/testutil.go @@ -167,7 +167,16 @@ func CollectAndCompare(c prometheus.Collector, expected io.Reader, metricNames . // exposition format. If any metricNames are provided, only metrics with those // names are compared. func GatherAndCompare(g prometheus.Gatherer, expected io.Reader, metricNames ...string) error { - got, err := g.Gather() + return TransactionalGatherAndCompare(prometheus.ToTransactionalGatherer(g), expected, metricNames...) +} + +// TransactionalGatherAndCompare gathers all metrics from the provided Gatherer and compares +// it to an expected output read from the provided Reader in the Prometheus text +// exposition format. If any metricNames are provided, only metrics with those +// names are compared. +func TransactionalGatherAndCompare(g prometheus.TransactionalGatherer, expected io.Reader, metricNames ...string) error { + got, done, err := g.Gather() + defer done() if err != nil { return fmt.Errorf("gathering metrics failed: %s", err) } diff --git a/prometheus/value.go b/prometheus/value.go index b4e0ae1..9f10695 100644 --- a/prometheus/value.go +++ b/prometheus/value.go @@ -21,6 +21,7 @@ import ( //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility. "github.com/golang/protobuf/proto" + "github.com/prometheus/client_golang/prometheus/internal" "google.golang.org/protobuf/types/known/timestamppb" dto "github.com/prometheus/client_model/go" @@ -38,6 +39,23 @@ const ( UntypedValue ) +var ( + CounterMetricTypePtr = func() *dto.MetricType { d := dto.MetricType_COUNTER; return &d }() + GaugeMetricTypePtr = func() *dto.MetricType { d := dto.MetricType_GAUGE; return &d }() + UntypedMetricTypePtr = func() *dto.MetricType { d := dto.MetricType_UNTYPED; return &d }() +) + +func (v ValueType) ToDTO() *dto.MetricType { + switch v { + case CounterValue: + return CounterMetricTypePtr + case GaugeValue: + return GaugeMetricTypePtr + default: + return UntypedMetricTypePtr + } +} + // valueFunc is a generic metric for simple values retrieved on collect time // from a function. It implements Metric and Collector. Its effective type is // determined by ValueType. This is a low-level building block used by the @@ -91,11 +109,15 @@ func NewConstMetric(desc *Desc, valueType ValueType, value float64, labelValues if err := validateLabelValues(labelValues, len(desc.variableLabels)); err != nil { return nil, err } + + metric := &dto.Metric{} + if err := populateMetric(valueType, value, MakeLabelPairs(desc, labelValues), nil, metric); err != nil { + return nil, err + } + return &constMetric{ - desc: desc, - valType: valueType, - val: value, - labelPairs: MakeLabelPairs(desc, labelValues), + desc: desc, + metric: metric, }, nil } @@ -110,10 +132,8 @@ func MustNewConstMetric(desc *Desc, valueType ValueType, value float64, labelVal } type constMetric struct { - desc *Desc - valType ValueType - val float64 - labelPairs []*dto.LabelPair + desc *Desc + metric *dto.Metric } func (m *constMetric) Desc() *Desc { @@ -121,7 +141,11 @@ func (m *constMetric) Desc() *Desc { } func (m *constMetric) Write(out *dto.Metric) error { - return populateMetric(m.valType, m.val, m.labelPairs, nil, out) + out.Label = m.metric.Label + out.Counter = m.metric.Counter + out.Gauge = m.metric.Gauge + out.Untyped = m.metric.Untyped + return nil } func populateMetric( @@ -170,7 +194,7 @@ func MakeLabelPairs(desc *Desc, labelValues []string) []*dto.LabelPair { }) } labelPairs = append(labelPairs, desc.constLabelPairs...) - sort.Sort(labelPairSorter(labelPairs)) + sort.Sort(internal.LabelPairSorter(labelPairs)) return labelPairs } diff --git a/prometheus/wrap.go b/prometheus/wrap.go index 74ee932..c29f94b 100644 --- a/prometheus/wrap.go +++ b/prometheus/wrap.go @@ -20,6 +20,7 @@ import ( //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility. "github.com/golang/protobuf/proto" + "github.com/prometheus/client_golang/prometheus/internal" dto "github.com/prometheus/client_model/go" ) @@ -182,7 +183,7 @@ func (m *wrappingMetric) Write(out *dto.Metric) error { Value: proto.String(lv), }) } - sort.Sort(labelPairSorter(out.Label)) + sort.Sort(internal.LabelPairSorter(out.Label)) return nil }