From 1f81b3e9130fe21c55b7db82ecb2d61477358d61 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 23 Feb 2022 12:22:52 +0100 Subject: [PATCH] Added Transactional Gatherer allowed cached solutions (#989) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Added cached collector. Signed-off-by: Bartlomiej Plotka update. Signed-off-by: Bartlomiej Plotka Attempt 2 Signed-off-by: Bartlomiej Plotka Added blocking registry, with raw collector and transactional handler. Signed-off-by: Bartlomiej Plotka Added fast path to normal (empty) registry to save 8 allocs and 3K5B per Gather. Signed-off-by: Bartlomiej Plotka Simplified API, added tests. Signed-off-by: Bartlomiej Plotka Fix. Signed-off-by: Bartlomiej Plotka Simplified implementation. Signed-off-by: Bartlomiej Plotka Added benchmark. Signed-off-by: Bartlomiej Plotka Optimized. Signed-off-by: Bartlomiej Plotka * Optimization attempt. Signed-off-by: Bartlomiej Plotka * Revert "Optimization attempt." This reverts commit 2fcaf51be9a12b4b95413b6b3e0c13fabfaaf73f. Optimization was not worth it: benchstat v1.txt v2.txt name old time/op new time/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 2.64µs ± 0% 4.05µs ± 0% ~ (p=1.000 n=1+1) CachedTGatherer_Update/Update_of_all_elements_with_reset-12 701ms ± 0% 358ms ± 0% ~ (p=1.000 n=1+1) CachedTGatherer_Update/Gather-12 535µs ± 0% 703934µs ± 0% ~ (p=1.000 n=1+1) name old alloc/op new alloc/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 208B ± 0% 208B ± 0% ~ (all equal) CachedTGatherer_Update/Update_of_all_elements_with_reset-12 40.2MB ± 0% 41.1MB ± 0% ~ (p=1.000 n=1+1) CachedTGatherer_Update/Gather-12 48.6kB ± 0% 84.3kB ± 0% ~ (p=1.000 n=1+1) name old allocs/op new allocs/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 3.00 ± 0% 3.00 ± 0% ~ (all equal) CachedTGatherer_Update/Update_of_all_elements_with_reset-12 6.00 ± 0% 4003.00 ± 0% ~ (p=1.000 n=1+1) CachedTGatherer_Update/Gather-12 1.00k ± 0% 2.01k ± 0% ~ (p=1.000 n=1+1) * nit. Signed-off-by: Bartlomiej Plotka * Another optimization attempt. Signed-off-by: Bartlomiej Plotka * rename and further optimization. Signed-off-by: Bartlomiej Plotka * Hopefully final optimization. benchstat -delta-test=none v6.txt v9.txt name old time/op new time/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 13.1ms ± 0% 0.0ms ± 0% -99.81% CachedTGatherer_Update/Update_of_all_elements_with_reset-12 309ms ± 0% 282ms ± 0% -8.77% CachedTGatherer_Update/Gather-12 422ms ± 0% 0ms ± 0% -99.95% name old alloc/op new alloc/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 208B ± 0% 208B ± 0% 0.00% CachedTGatherer_Update/Update_of_all_elements_with_reset-12 2.47kB ± 0% 1.67kB ± 0% -32.56% CachedTGatherer_Update/Gather-12 52.8kB ± 0% 24.6kB ± 0% -53.34% name old allocs/op new allocs/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 3.00 ± 0% 3.00 ± 0% 0.00% CachedTGatherer_Update/Update_of_all_elements_with_reset-12 0.00 0.00 0.00% CachedTGatherer_Update/Gather-12 1.00k ± 0% 0.00k ± 0% -99.60% Signed-off-by: Bartlomiej Plotka * Removed obsolete comment Signed-off-by: Bartlomiej Plotka * Fixed tests. Signed-off-by: Bartlomiej Plotka * Removed cache. Signed-off-by: Bartlomiej Plotka * Fixed tests. Signed-off-by: Bartlomiej Plotka * Re-add cache. Signed-off-by: Bartlomiej Plotka * Removed cache. Signed-off-by: Bartlomiej Plotka --- prometheus/desc.go | 4 +- prometheus/internal/metric.go | 28 ++++++-- prometheus/metric.go | 16 ----- prometheus/promhttp/http.go | 10 ++- prometheus/promhttp/http_test.go | 107 ++++++++++++++++++++++++------- prometheus/registry.go | 101 +++++++++++++++++++++++++++-- prometheus/registry_test.go | 80 +++++++++++++++++++++++ prometheus/testutil/testutil.go | 11 +++- prometheus/value.go | 44 ++++++++++--- prometheus/wrap.go | 3 +- 10 files changed, 340 insertions(+), 64 deletions(-) diff --git a/prometheus/desc.go b/prometheus/desc.go index 4bb816a..ee81107 100644 --- a/prometheus/desc.go +++ b/prometheus/desc.go @@ -20,6 +20,8 @@ import ( "strings" "github.com/cespare/xxhash/v2" + "github.com/prometheus/client_golang/prometheus/internal" + //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility. "github.com/golang/protobuf/proto" "github.com/prometheus/common/model" @@ -154,7 +156,7 @@ func NewDesc(fqName, help string, variableLabels []string, constLabels Labels) * Value: proto.String(v), }) } - sort.Sort(labelPairSorter(d.constLabelPairs)) + sort.Sort(internal.LabelPairSorter(d.constLabelPairs)) return d } diff --git a/prometheus/internal/metric.go b/prometheus/internal/metric.go index 351c26e..6515c11 100644 --- a/prometheus/internal/metric.go +++ b/prometheus/internal/metric.go @@ -19,18 +19,34 @@ import ( dto "github.com/prometheus/client_model/go" ) -// metricSorter is a sortable slice of *dto.Metric. -type metricSorter []*dto.Metric +// LabelPairSorter implements sort.Interface. It is used to sort a slice of +// dto.LabelPair pointers. +type LabelPairSorter []*dto.LabelPair -func (s metricSorter) Len() int { +func (s LabelPairSorter) Len() int { return len(s) } -func (s metricSorter) Swap(i, j int) { +func (s LabelPairSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s metricSorter) Less(i, j int) bool { +func (s LabelPairSorter) Less(i, j int) bool { + return s[i].GetName() < s[j].GetName() +} + +// MetricSorter is a sortable slice of *dto.Metric. +type MetricSorter []*dto.Metric + +func (s MetricSorter) Len() int { + return len(s) +} + +func (s MetricSorter) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (s MetricSorter) Less(i, j int) bool { if len(s[i].Label) != len(s[j].Label) { // This should not happen. The metrics are // inconsistent. However, we have to deal with the fact, as @@ -68,7 +84,7 @@ func (s metricSorter) Less(i, j int) bool { // the slice, with the contained Metrics sorted within each MetricFamily. func NormalizeMetricFamilies(metricFamiliesByName map[string]*dto.MetricFamily) []*dto.MetricFamily { for _, mf := range metricFamiliesByName { - sort.Sort(metricSorter(mf.Metric)) + sort.Sort(MetricSorter(mf.Metric)) } names := make([]string, 0, len(metricFamiliesByName)) for name, mf := range metricFamiliesByName { diff --git a/prometheus/metric.go b/prometheus/metric.go index dc12191..118a54e 100644 --- a/prometheus/metric.go +++ b/prometheus/metric.go @@ -115,22 +115,6 @@ func BuildFQName(namespace, subsystem, name string) string { return name } -// labelPairSorter implements sort.Interface. It is used to sort a slice of -// dto.LabelPair pointers. -type labelPairSorter []*dto.LabelPair - -func (s labelPairSorter) Len() int { - return len(s) -} - -func (s labelPairSorter) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} - -func (s labelPairSorter) Less(i, j int) bool { - return s[i].GetName() < s[j].GetName() -} - type invalidMetric struct { desc *Desc err error diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index d86d0cf..a6e4f85 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -84,6 +84,13 @@ func Handler() http.Handler { // instrumentation. Use the InstrumentMetricHandler function to apply the same // kind of instrumentation as it is used by the Handler function. func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { + return HandlerForTransactional(prometheus.ToTransactionalGatherer(reg), opts) +} + +// HandlerForTransactional is like HandlerFor, but it uses transactional gather, which +// can safely change in-place returned *dto.MetricFamily before call to `Gather` and after +// call to `done` of that `Gather`. +func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerOpts) http.Handler { var ( inFlightSem chan struct{} errCnt = prometheus.NewCounterVec( @@ -123,7 +130,8 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { return } } - mfs, err := reg.Gather() + mfs, done, err := reg.Gather() + defer done() if err != nil { if opts.ErrorLog != nil { opts.ErrorLog.Println("error gathering metrics:", err) diff --git a/prometheus/promhttp/http_test.go b/prometheus/promhttp/http_test.go index 781ea8f..53204c5 100644 --- a/prometheus/promhttp/http_test.go +++ b/prometheus/promhttp/http_test.go @@ -16,6 +16,7 @@ package promhttp import ( "bytes" "errors" + "fmt" "log" "net/http" "net/http/httptest" @@ -24,6 +25,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" ) type errorCollector struct{} @@ -56,8 +58,19 @@ func (b blockingCollector) Collect(ch chan<- prometheus.Metric) { <-b.Block } -func TestHandlerErrorHandling(t *testing.T) { +type mockTransactionGatherer struct { + g prometheus.Gatherer + gatherInvoked int + doneInvoked int +} +func (g *mockTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { + g.gatherInvoked++ + mfs, err := g.g.Gather() + return mfs, func() { g.doneInvoked++ }, err +} + +func TestHandlerErrorHandling(t *testing.T) { // Create a registry that collects a MetricFamily with two elements, // another with one, and reports an error. Further down, we'll use the // same registry in the HandlerOpts. @@ -90,21 +103,30 @@ func TestHandlerErrorHandling(t *testing.T) { request, _ := http.NewRequest("GET", "/", nil) request.Header.Add("Accept", "test/plain") - errorHandler := HandlerFor(reg, HandlerOpts{ + mReg := &mockTransactionGatherer{g: reg} + errorHandler := HandlerForTransactional(mReg, HandlerOpts{ ErrorLog: logger, ErrorHandling: HTTPErrorOnError, Registry: reg, }) - continueHandler := HandlerFor(reg, HandlerOpts{ + continueHandler := HandlerForTransactional(mReg, HandlerOpts{ ErrorLog: logger, ErrorHandling: ContinueOnError, Registry: reg, }) - panicHandler := HandlerFor(reg, HandlerOpts{ + panicHandler := HandlerForTransactional(mReg, HandlerOpts{ ErrorLog: logger, ErrorHandling: PanicOnError, Registry: reg, }) + // Expect gatherer not touched. + if got := mReg.gatherInvoked; got != 0 { + t.Fatalf("unexpected number of gather invokes, want 0, got %d", got) + } + if got := mReg.doneInvoked; got != 0 { + t.Fatalf("unexpected number of done invokes, want 0, got %d", got) + } + wantMsg := `error gathering metrics: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error ` wantErrorBody := `An error has occurred while serving metrics: @@ -140,25 +162,39 @@ the_count 0 ` errorHandler.ServeHTTP(writer, request) + if got := mReg.gatherInvoked; got != 1 { + t.Fatalf("unexpected number of gather invokes, want 1, got %d", got) + } + if got := mReg.doneInvoked; got != 1 { + t.Fatalf("unexpected number of done invokes, want 1, got %d", got) + } if got, want := writer.Code, http.StatusInternalServerError; got != want { t.Errorf("got HTTP status code %d, want %d", got, want) } - if got := logBuf.String(); got != wantMsg { - t.Errorf("got log message:\n%s\nwant log message:\n%s\n", got, wantMsg) + if got, want := logBuf.String(), wantMsg; got != want { + t.Errorf("got log buf %q, want %q", got, want) } - if got := writer.Body.String(); got != wantErrorBody { - t.Errorf("got body:\n%s\nwant body:\n%s\n", got, wantErrorBody) + if got, want := writer.Body.String(), wantErrorBody; got != want { + t.Errorf("got body %q, want %q", got, want) } + logBuf.Reset() writer.Body.Reset() writer.Code = http.StatusOK continueHandler.ServeHTTP(writer, request) + + if got := mReg.gatherInvoked; got != 2 { + t.Fatalf("unexpected number of gather invokes, want 2, got %d", got) + } + if got := mReg.doneInvoked; got != 2 { + t.Fatalf("unexpected number of done invokes, want 2, got %d", got) + } if got, want := writer.Code, http.StatusOK; got != want { t.Errorf("got HTTP status code %d, want %d", got, want) } - if got := logBuf.String(); got != wantMsg { - t.Errorf("got log message %q, want %q", got, wantMsg) + if got, want := logBuf.String(), wantMsg; got != want { + t.Errorf("got log buf %q, want %q", got, want) } if got := writer.Body.String(); got != wantOKBody1 && got != wantOKBody2 { t.Errorf("got body %q, want either %q or %q", got, wantOKBody1, wantOKBody2) @@ -168,20 +204,34 @@ the_count 0 if err := recover(); err == nil { t.Error("expected panic from panicHandler") } + if got := mReg.gatherInvoked; got != 3 { + t.Fatalf("unexpected number of gather invokes, want 3, got %d", got) + } + if got := mReg.doneInvoked; got != 3 { + t.Fatalf("unexpected number of done invokes, want 3, got %d", got) + } }() panicHandler.ServeHTTP(writer, request) } func TestInstrumentMetricHandler(t *testing.T) { reg := prometheus.NewRegistry() - handler := InstrumentMetricHandler(reg, HandlerFor(reg, HandlerOpts{})) + mReg := &mockTransactionGatherer{g: reg} + handler := InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{})) // Do it again to test idempotency. - InstrumentMetricHandler(reg, HandlerFor(reg, HandlerOpts{})) + InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{})) writer := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/", nil) request.Header.Add("Accept", "test/plain") handler.ServeHTTP(writer, request) + if got := mReg.gatherInvoked; got != 1 { + t.Fatalf("unexpected number of gather invokes, want 1, got %d", got) + } + if got := mReg.doneInvoked; got != 1 { + t.Fatalf("unexpected number of done invokes, want 1, got %d", got) + } + if got, want := writer.Code, http.StatusOK; got != want { t.Errorf("got HTTP status code %d, want %d", got, want) } @@ -195,19 +245,28 @@ func TestInstrumentMetricHandler(t *testing.T) { t.Errorf("got body %q, does not contain %q", got, want) } - writer.Body.Reset() - handler.ServeHTTP(writer, request) - if got, want := writer.Code, http.StatusOK; got != want { - t.Errorf("got HTTP status code %d, want %d", got, want) - } + for i := 0; i < 100; i++ { + writer.Body.Reset() + handler.ServeHTTP(writer, request) - want = "promhttp_metric_handler_requests_in_flight 1\n" - if got := writer.Body.String(); !strings.Contains(got, want) { - t.Errorf("got body %q, does not contain %q", got, want) - } - want = "promhttp_metric_handler_requests_total{code=\"200\"} 1\n" - if got := writer.Body.String(); !strings.Contains(got, want) { - t.Errorf("got body %q, does not contain %q", got, want) + if got, want := mReg.gatherInvoked, i+2; got != want { + t.Fatalf("unexpected number of gather invokes, want %d, got %d", want, got) + } + if got, want := mReg.doneInvoked, i+2; got != want { + t.Fatalf("unexpected number of done invokes, want %d, got %d", want, got) + } + if got, want := writer.Code, http.StatusOK; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } + + want := "promhttp_metric_handler_requests_in_flight 1\n" + if got := writer.Body.String(); !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q", got, want) + } + want = fmt.Sprintf("promhttp_metric_handler_requests_total{code=\"200\"} %d\n", i+1) + if got := writer.Body.String(); !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q", got, want) + } } } diff --git a/prometheus/registry.go b/prometheus/registry.go index 383a7f5..5046f7e 100644 --- a/prometheus/registry.go +++ b/prometheus/registry.go @@ -407,6 +407,14 @@ func (r *Registry) MustRegister(cs ...Collector) { // Gather implements Gatherer. func (r *Registry) Gather() ([]*dto.MetricFamily, error) { + r.mtx.RLock() + + if len(r.collectorsByID) == 0 && len(r.uncheckedCollectors) == 0 { + // Fast path. + r.mtx.RUnlock() + return nil, nil + } + var ( checkedMetricChan = make(chan Metric, capMetricChan) uncheckedMetricChan = make(chan Metric, capMetricChan) @@ -416,7 +424,6 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) { registeredDescIDs map[uint64]struct{} // Only used for pedantic checks ) - r.mtx.RLock() goroutineBudget := len(r.collectorsByID) + len(r.uncheckedCollectors) metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName)) checkedCollectors := make(chan Collector, len(r.collectorsByID)) @@ -884,11 +891,11 @@ func checkMetricConsistency( h.Write(separatorByteSlice) // Make sure label pairs are sorted. We depend on it for the consistency // check. - if !sort.IsSorted(labelPairSorter(dtoMetric.Label)) { + if !sort.IsSorted(internal.LabelPairSorter(dtoMetric.Label)) { // We cannot sort dtoMetric.Label in place as it is immutable by contract. copiedLabels := make([]*dto.LabelPair, len(dtoMetric.Label)) copy(copiedLabels, dtoMetric.Label) - sort.Sort(labelPairSorter(copiedLabels)) + sort.Sort(internal.LabelPairSorter(copiedLabels)) dtoMetric.Label = copiedLabels } for _, lp := range dtoMetric.Label { @@ -935,7 +942,7 @@ func checkDescConsistency( metricFamily.GetName(), dtoMetric, desc, ) } - sort.Sort(labelPairSorter(lpsFromDesc)) + sort.Sort(internal.LabelPairSorter(lpsFromDesc)) for i, lpFromDesc := range lpsFromDesc { lpFromMetric := dtoMetric.Label[i] if lpFromDesc.GetName() != lpFromMetric.GetName() || @@ -948,3 +955,89 @@ func checkDescConsistency( } return nil } + +var _ TransactionalGatherer = &MultiTRegistry{} + +// MultiTRegistry is a TransactionalGatherer that joins gathered metrics from multiple +// transactional gatherers. +// +// It is caller responsibility to ensure two registries have mutually exclusive metric families, +// no deduplication will happen. +type MultiTRegistry struct { + tGatherers []TransactionalGatherer +} + +// NewMultiTRegistry creates MultiTRegistry. +func NewMultiTRegistry(tGatherers ...TransactionalGatherer) *MultiTRegistry { + return &MultiTRegistry{ + tGatherers: tGatherers, + } +} + +// Gather implements TransactionalGatherer interface. +func (r *MultiTRegistry) Gather() (mfs []*dto.MetricFamily, done func(), err error) { + errs := MultiError{} + + dFns := make([]func(), 0, len(r.tGatherers)) + // TODO(bwplotka): Implement concurrency for those? + for _, g := range r.tGatherers { + // TODO(bwplotka): Check for duplicates? + m, d, err := g.Gather() + errs.Append(err) + + mfs = append(mfs, m...) + dFns = append(dFns, d) + } + + // TODO(bwplotka): Consider sort in place, given metric family in gather is sorted already. + sort.Slice(mfs, func(i, j int) bool { + return *mfs[i].Name < *mfs[j].Name + }) + return mfs, func() { + for _, d := range dFns { + d() + } + }, errs.MaybeUnwrap() +} + +// TransactionalGatherer represents transactional gatherer that can be triggered to notify gatherer that memory +// used by metric family is no longer used by a caller. This allows implementations with cache. +type TransactionalGatherer interface { + // Gather returns metrics in a lexicographically sorted slice + // of uniquely named MetricFamily protobufs. Gather ensures that the + // returned slice is valid and self-consistent so that it can be used + // for valid exposition. As an exception to the strict consistency + // requirements described for metric.Desc, Gather will tolerate + // different sets of label names for metrics of the same metric family. + // + // Even if an error occurs, Gather attempts to gather as many metrics as + // possible. Hence, if a non-nil error is returned, the returned + // MetricFamily slice could be nil (in case of a fatal error that + // prevented any meaningful metric collection) or contain a number of + // MetricFamily protobufs, some of which might be incomplete, and some + // might be missing altogether. The returned error (which might be a + // MultiError) explains the details. Note that this is mostly useful for + // debugging purposes. If the gathered protobufs are to be used for + // exposition in actual monitoring, it is almost always better to not + // expose an incomplete result and instead disregard the returned + // MetricFamily protobufs in case the returned error is non-nil. + // + // Important: done is expected to be triggered (even if the error occurs!) + // once caller does not need returned slice of dto.MetricFamily. + Gather() (_ []*dto.MetricFamily, done func(), err error) +} + +// ToTransactionalGatherer transforms Gatherer to transactional one with noop as done function. +func ToTransactionalGatherer(g Gatherer) TransactionalGatherer { + return &noTransactionGatherer{g: g} +} + +type noTransactionGatherer struct { + g Gatherer +} + +// Gather implements TransactionalGatherer interface. +func (g *noTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { + mfs, err := g.g.Gather() + return mfs, func() {}, err +} diff --git a/prometheus/registry_test.go b/prometheus/registry_test.go index 0ff7a64..7a959da 100644 --- a/prometheus/registry_test.go +++ b/prometheus/registry_test.go @@ -21,6 +21,7 @@ package prometheus_test import ( "bytes" + "errors" "fmt" "io/ioutil" "math/rand" @@ -1175,3 +1176,82 @@ func TestAlreadyRegisteredCollision(t *testing.T) { } } } + +type tGatherer struct { + done bool + err error +} + +func (g *tGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { + name := "g1" + val := 1.0 + return []*dto.MetricFamily{ + {Name: &name, Metric: []*dto.Metric{{Gauge: &dto.Gauge{Value: &val}}}}, + }, func() { g.done = true }, g.err +} + +func TestNewMultiTRegistry(t *testing.T) { + treg := &tGatherer{} + + t.Run("one registry", func(t *testing.T) { + m := prometheus.NewMultiTRegistry(treg) + ret, done, err := m.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + if len(ret) != 1 { + t.Error("unexpected number of metric families, expected 1, got", ret) + } + if !treg.done { + t.Error("inner transactional registry not marked as done") + } + }) + + reg := prometheus.NewRegistry() + if err := reg.Register(prometheus.NewCounter(prometheus.CounterOpts{Name: "c1", Help: "help c1"})); err != nil { + t.Error("registration failed:", err) + } + + // Note on purpose two registries will have exactly same metric family name (but with different string). + // This behaviour is undefined at the moment. + if err := reg.Register(prometheus.NewGauge(prometheus.GaugeOpts{Name: "g1", Help: "help g1"})); err != nil { + t.Error("registration failed:", err) + } + treg.done = false + + t.Run("two registries", func(t *testing.T) { + m := prometheus.NewMultiTRegistry(prometheus.ToTransactionalGatherer(reg), treg) + ret, done, err := m.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + if len(ret) != 3 { + t.Error("unexpected number of metric families, expected 3, got", ret) + } + if !treg.done { + t.Error("inner transactional registry not marked as done") + } + }) + + treg.done = false + // Inject error. + treg.err = errors.New("test err") + + t.Run("two registries, one with error", func(t *testing.T) { + m := prometheus.NewMultiTRegistry(prometheus.ToTransactionalGatherer(reg), treg) + ret, done, err := m.Gather() + if err != treg.err { + t.Error("unexpected error:", err) + } + done() + if len(ret) != 3 { + t.Error("unexpected number of metric families, expected 3, got", ret) + } + // Still on error, we expect done to be triggered. + if !treg.done { + t.Error("inner transactional registry not marked as done") + } + }) +} diff --git a/prometheus/testutil/testutil.go b/prometheus/testutil/testutil.go index 9af60ce..bf95bea 100644 --- a/prometheus/testutil/testutil.go +++ b/prometheus/testutil/testutil.go @@ -167,7 +167,16 @@ func CollectAndCompare(c prometheus.Collector, expected io.Reader, metricNames . // exposition format. If any metricNames are provided, only metrics with those // names are compared. func GatherAndCompare(g prometheus.Gatherer, expected io.Reader, metricNames ...string) error { - got, err := g.Gather() + return TransactionalGatherAndCompare(prometheus.ToTransactionalGatherer(g), expected, metricNames...) +} + +// TransactionalGatherAndCompare gathers all metrics from the provided Gatherer and compares +// it to an expected output read from the provided Reader in the Prometheus text +// exposition format. If any metricNames are provided, only metrics with those +// names are compared. +func TransactionalGatherAndCompare(g prometheus.TransactionalGatherer, expected io.Reader, metricNames ...string) error { + got, done, err := g.Gather() + defer done() if err != nil { return fmt.Errorf("gathering metrics failed: %s", err) } diff --git a/prometheus/value.go b/prometheus/value.go index b4e0ae1..9f10695 100644 --- a/prometheus/value.go +++ b/prometheus/value.go @@ -21,6 +21,7 @@ import ( //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility. "github.com/golang/protobuf/proto" + "github.com/prometheus/client_golang/prometheus/internal" "google.golang.org/protobuf/types/known/timestamppb" dto "github.com/prometheus/client_model/go" @@ -38,6 +39,23 @@ const ( UntypedValue ) +var ( + CounterMetricTypePtr = func() *dto.MetricType { d := dto.MetricType_COUNTER; return &d }() + GaugeMetricTypePtr = func() *dto.MetricType { d := dto.MetricType_GAUGE; return &d }() + UntypedMetricTypePtr = func() *dto.MetricType { d := dto.MetricType_UNTYPED; return &d }() +) + +func (v ValueType) ToDTO() *dto.MetricType { + switch v { + case CounterValue: + return CounterMetricTypePtr + case GaugeValue: + return GaugeMetricTypePtr + default: + return UntypedMetricTypePtr + } +} + // valueFunc is a generic metric for simple values retrieved on collect time // from a function. It implements Metric and Collector. Its effective type is // determined by ValueType. This is a low-level building block used by the @@ -91,11 +109,15 @@ func NewConstMetric(desc *Desc, valueType ValueType, value float64, labelValues if err := validateLabelValues(labelValues, len(desc.variableLabels)); err != nil { return nil, err } + + metric := &dto.Metric{} + if err := populateMetric(valueType, value, MakeLabelPairs(desc, labelValues), nil, metric); err != nil { + return nil, err + } + return &constMetric{ - desc: desc, - valType: valueType, - val: value, - labelPairs: MakeLabelPairs(desc, labelValues), + desc: desc, + metric: metric, }, nil } @@ -110,10 +132,8 @@ func MustNewConstMetric(desc *Desc, valueType ValueType, value float64, labelVal } type constMetric struct { - desc *Desc - valType ValueType - val float64 - labelPairs []*dto.LabelPair + desc *Desc + metric *dto.Metric } func (m *constMetric) Desc() *Desc { @@ -121,7 +141,11 @@ func (m *constMetric) Desc() *Desc { } func (m *constMetric) Write(out *dto.Metric) error { - return populateMetric(m.valType, m.val, m.labelPairs, nil, out) + out.Label = m.metric.Label + out.Counter = m.metric.Counter + out.Gauge = m.metric.Gauge + out.Untyped = m.metric.Untyped + return nil } func populateMetric( @@ -170,7 +194,7 @@ func MakeLabelPairs(desc *Desc, labelValues []string) []*dto.LabelPair { }) } labelPairs = append(labelPairs, desc.constLabelPairs...) - sort.Sort(labelPairSorter(labelPairs)) + sort.Sort(internal.LabelPairSorter(labelPairs)) return labelPairs } diff --git a/prometheus/wrap.go b/prometheus/wrap.go index 74ee932..c29f94b 100644 --- a/prometheus/wrap.go +++ b/prometheus/wrap.go @@ -20,6 +20,7 @@ import ( //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility. "github.com/golang/protobuf/proto" + "github.com/prometheus/client_golang/prometheus/internal" dto "github.com/prometheus/client_model/go" ) @@ -182,7 +183,7 @@ func (m *wrappingMetric) Write(out *dto.Metric) error { Value: proto.String(lv), }) } - sort.Sort(labelPairSorter(out.Label)) + sort.Sort(internal.LabelPairSorter(out.Label)) return nil }