forked from mirror/client_golang
Added cached collector.
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> update. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> Attempt 2 Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> Added blocking registry, with raw collector and transactional handler. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> Added fast path to normal (empty) registry to save 8 allocs and 3K5B per Gather. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> Simplified API, added tests. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> Fix. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> Simplified implementation. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> Added benchmark. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> Optimized. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
parent
f63e219e6b
commit
a1c9be45cf
|
@ -0,0 +1,261 @@
|
||||||
|
// Copyright 2022 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/cespare/xxhash/v2"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
|
//nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/internal"
|
||||||
|
dto "github.com/prometheus/client_model/go"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ prometheus.TransactionalGatherer = &CachedTGatherer{}
|
||||||
|
|
||||||
|
var separatorByteSlice = []byte{model.SeparatorByte} // For convenient use with xxhash.
|
||||||
|
|
||||||
|
// CachedTGatherer is a transactional gatherer that allows maintaining a set of metrics which
|
||||||
|
// change less frequently than scrape time, yet label values and values change over time.
|
||||||
|
//
|
||||||
|
// If you happen to use NewDesc, NewConstMetric or MustNewConstMetric inside Collector.Collect routine, consider
|
||||||
|
// using CachedTGatherer instead.
|
||||||
|
//
|
||||||
|
// Use CachedTGatherer with classic Registry using NewMultiTRegistry and ToTransactionalGatherer helpers.
|
||||||
|
// NOTE(bwplotka): Experimental, API and behaviour can change.
|
||||||
|
type CachedTGatherer struct {
|
||||||
|
metrics map[uint64]*dto.Metric
|
||||||
|
metricFamilyByName map[string]*dto.MetricFamily
|
||||||
|
mMu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCachedTGatherer() *CachedTGatherer {
|
||||||
|
return &CachedTGatherer{
|
||||||
|
metrics: make(map[uint64]*dto.Metric),
|
||||||
|
metricFamilyByName: map[string]*dto.MetricFamily{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gather implements TransactionalGatherer interface.
|
||||||
|
func (c *CachedTGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
|
||||||
|
c.mMu.RLock()
|
||||||
|
|
||||||
|
// BenchmarkCachedTGatherer_Update shows, even for 1 million metrics with 1000 families
|
||||||
|
// this is efficient enough (~300µs and ~50 kB per op), no need to cache it for now.
|
||||||
|
return internal.NormalizeMetricFamilies(c.metricFamilyByName), c.mMu.RUnlock, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type Key struct {
|
||||||
|
FQName string // __name__
|
||||||
|
|
||||||
|
// Label names can be unsorted, we will be sorting them later. The only implication is cachability if
|
||||||
|
// consumer provide non-deterministic order of those.
|
||||||
|
LabelNames []string
|
||||||
|
LabelValues []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k Key) isValid() error {
|
||||||
|
if k.FQName == "" {
|
||||||
|
return errors.New("FQName cannot be empty")
|
||||||
|
}
|
||||||
|
if len(k.LabelNames) != len(k.LabelValues) {
|
||||||
|
return errors.New("new metric: label name has different length than values")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// hash returns unique hash for this key.
|
||||||
|
func (k Key) hash() uint64 {
|
||||||
|
h := xxhash.New()
|
||||||
|
h.WriteString(k.FQName)
|
||||||
|
h.Write(separatorByteSlice)
|
||||||
|
|
||||||
|
for i := range k.LabelNames {
|
||||||
|
h.WriteString(k.LabelNames[i])
|
||||||
|
h.Write(separatorByteSlice)
|
||||||
|
h.WriteString(k.LabelValues[i])
|
||||||
|
h.Write(separatorByteSlice)
|
||||||
|
}
|
||||||
|
return h.Sum64()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert represents record to set in cache.
|
||||||
|
type Insert struct {
|
||||||
|
Key
|
||||||
|
|
||||||
|
Help string
|
||||||
|
ValueType prometheus.ValueType
|
||||||
|
Value float64
|
||||||
|
|
||||||
|
// Timestamp is optional. Pass nil for no explicit timestamp.
|
||||||
|
Timestamp *time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update goes through inserts and deletions and updates current cache in concurrency safe manner.
|
||||||
|
// If reset is set to true, all inserts and deletions are working on empty cache. In such case
|
||||||
|
// this implementation tries to reuse memory from existing cached item when possible.
|
||||||
|
//
|
||||||
|
// Update reuses insert struct memory, so after use, Insert slice and its elements cannot be reused
|
||||||
|
// outside of this method.
|
||||||
|
// TODO(bwplotka): Lack of copying can pose memory safety problems if insert variables are reused. Consider copying if value
|
||||||
|
// is different. Yet it gives significant allocation gains.
|
||||||
|
func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) error {
|
||||||
|
c.mMu.Lock()
|
||||||
|
defer c.mMu.Unlock()
|
||||||
|
|
||||||
|
currMetrics := c.metrics
|
||||||
|
currMetricFamilies := c.metricFamilyByName
|
||||||
|
if reset {
|
||||||
|
currMetrics = make(map[uint64]*dto.Metric, len(c.metrics))
|
||||||
|
currMetricFamilies = make(map[string]*dto.MetricFamily, len(c.metricFamilyByName))
|
||||||
|
}
|
||||||
|
|
||||||
|
errs := prometheus.MultiError{}
|
||||||
|
for i := range inserts {
|
||||||
|
// TODO(bwplotka): Validate more about this insert?
|
||||||
|
if err := inserts[i].isValid(); err != nil {
|
||||||
|
errs.Append(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update metric family.
|
||||||
|
mf, ok := c.metricFamilyByName[inserts[i].FQName]
|
||||||
|
if !ok {
|
||||||
|
mf = &dto.MetricFamily{}
|
||||||
|
mf.Name = &inserts[i].FQName
|
||||||
|
} else if reset {
|
||||||
|
// Reset metric slice, since we want to start from scratch.
|
||||||
|
mf.Metric = mf.Metric[:0]
|
||||||
|
}
|
||||||
|
mf.Type = inserts[i].ValueType.ToDTO()
|
||||||
|
mf.Help = &inserts[i].Help
|
||||||
|
|
||||||
|
currMetricFamilies[inserts[i].FQName] = mf
|
||||||
|
|
||||||
|
// Update metric pointer.
|
||||||
|
hSum := inserts[i].hash()
|
||||||
|
m, ok := c.metrics[hSum]
|
||||||
|
if !ok {
|
||||||
|
m = &dto.Metric{Label: make([]*dto.LabelPair, 0, len(inserts[i].LabelNames))}
|
||||||
|
for j := range inserts[i].LabelNames {
|
||||||
|
m.Label = append(m.Label, &dto.LabelPair{
|
||||||
|
Name: &inserts[i].LabelNames[j],
|
||||||
|
Value: &inserts[i].LabelValues[j],
|
||||||
|
})
|
||||||
|
}
|
||||||
|
sort.Sort(internal.LabelPairSorter(m.Label))
|
||||||
|
}
|
||||||
|
|
||||||
|
switch inserts[i].ValueType {
|
||||||
|
case prometheus.CounterValue:
|
||||||
|
v := m.Counter
|
||||||
|
if v == nil {
|
||||||
|
v = &dto.Counter{}
|
||||||
|
}
|
||||||
|
v.Value = &inserts[i].Value
|
||||||
|
m.Counter = v
|
||||||
|
m.Gauge = nil
|
||||||
|
m.Untyped = nil
|
||||||
|
case prometheus.GaugeValue:
|
||||||
|
v := m.Gauge
|
||||||
|
if v == nil {
|
||||||
|
v = &dto.Gauge{}
|
||||||
|
}
|
||||||
|
v.Value = &inserts[i].Value
|
||||||
|
m.Counter = nil
|
||||||
|
m.Gauge = v
|
||||||
|
m.Untyped = nil
|
||||||
|
case prometheus.UntypedValue:
|
||||||
|
v := m.Untyped
|
||||||
|
if v == nil {
|
||||||
|
v = &dto.Untyped{}
|
||||||
|
}
|
||||||
|
v.Value = &inserts[i].Value
|
||||||
|
m.Counter = nil
|
||||||
|
m.Gauge = nil
|
||||||
|
m.Untyped = v
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unsupported value type %v", inserts[i].ValueType)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.TimestampMs = nil
|
||||||
|
if inserts[i].Timestamp != nil {
|
||||||
|
m.TimestampMs = proto.Int64(inserts[i].Timestamp.Unix()*1000 + int64(inserts[i].Timestamp.Nanosecond()/1000000))
|
||||||
|
}
|
||||||
|
currMetrics[hSum] = m
|
||||||
|
|
||||||
|
if !reset && ok {
|
||||||
|
// If we did update without reset and we found metric in previous
|
||||||
|
// map, we know metric pointer exists in metric family map, so just continue.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Will be sorted later anyway, so just append.
|
||||||
|
mf.Metric = append(mf.Metric, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, del := range deletions {
|
||||||
|
if err := del.isValid(); err != nil {
|
||||||
|
errs.Append(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
hSum := del.hash()
|
||||||
|
m, ok := currMetrics[hSum]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
delete(currMetrics, hSum)
|
||||||
|
|
||||||
|
mf, ok := currMetricFamilies[del.FQName]
|
||||||
|
if !ok {
|
||||||
|
// Impossible, but well...
|
||||||
|
errs.Append(fmt.Errorf("could not remove metric %s(%s) from metric family, metric family does not exists", del.FQName, del.LabelValues))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
toDel := -1
|
||||||
|
for i := range mf.Metric {
|
||||||
|
if mf.Metric[i] == m {
|
||||||
|
toDel = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if toDel == -1 {
|
||||||
|
errs.Append(fmt.Errorf("could not remove metric %s(%s) from metric family, metric family does not have such metric", del.FQName, del.LabelValues))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(mf.Metric) == 1 {
|
||||||
|
delete(currMetricFamilies, del.FQName)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
mf.Metric = append(mf.Metric[:toDel], mf.Metric[toDel+1:]...)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.metrics = currMetrics
|
||||||
|
c.metricFamilyByName = currMetricFamilies
|
||||||
|
return errs.MaybeUnwrap()
|
||||||
|
}
|
|
@ -0,0 +1,275 @@
|
||||||
|
// Copyright 2022 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
dto "github.com/prometheus/client_model/go"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCachedTGatherer(t *testing.T) {
|
||||||
|
c := NewCachedTGatherer()
|
||||||
|
mfs, done, err := c.Gather()
|
||||||
|
if err != nil {
|
||||||
|
t.Error("gather failed:", err)
|
||||||
|
}
|
||||||
|
done()
|
||||||
|
if got := mfsToString(mfs); got != "" {
|
||||||
|
t.Error("unexpected metric family", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.Update(false, []Insert{
|
||||||
|
{
|
||||||
|
Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}},
|
||||||
|
Help: "help a",
|
||||||
|
ValueType: prometheus.GaugeValue,
|
||||||
|
Value: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: Key{FQName: "b", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}},
|
||||||
|
Help: "help b",
|
||||||
|
ValueType: prometheus.GaugeValue,
|
||||||
|
Value: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc2"}},
|
||||||
|
Help: "help a2",
|
||||||
|
ValueType: prometheus.CounterValue,
|
||||||
|
Value: 2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}},
|
||||||
|
Help: "help a2",
|
||||||
|
ValueType: prometheus.CounterValue,
|
||||||
|
Value: 2,
|
||||||
|
},
|
||||||
|
}, []Key{
|
||||||
|
{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, // Does not make much sense, but deletion works as expected.
|
||||||
|
}); err != nil {
|
||||||
|
t.Error("update:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mfs, done, err = c.Gather()
|
||||||
|
if err != nil {
|
||||||
|
t.Error("gather failed:", err)
|
||||||
|
}
|
||||||
|
done()
|
||||||
|
|
||||||
|
const expected = "name:\"a\" help:\"help a2\" type:COUNTER metric:<label:<name:\"b\" value:\"valb\" > label:<name:\"c\" value:\"valc\" > " +
|
||||||
|
"gauge:<value:1 > > metric:<label:<name:\"b\" value:\"valb\" > label:<name:\"c\" value:\"valc2\" > counter:<value:2 > > ,name:\"b\" help:\"help b\" " +
|
||||||
|
"type:GAUGE metric:<label:<name:\"b\" value:\"valb\" > label:<name:\"c\" value:\"valc\" > gauge:<value:1 > > "
|
||||||
|
if got := mfsToString(mfs); got != expected {
|
||||||
|
t.Error("unexpected metric family, got", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update with exactly same insertion should have the same effect.
|
||||||
|
if err := c.Update(false, []Insert{
|
||||||
|
{
|
||||||
|
Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}},
|
||||||
|
Help: "help a",
|
||||||
|
ValueType: prometheus.GaugeValue,
|
||||||
|
Value: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: Key{FQName: "b", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}},
|
||||||
|
Help: "help b",
|
||||||
|
ValueType: prometheus.GaugeValue,
|
||||||
|
Value: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc2"}},
|
||||||
|
Help: "help a2",
|
||||||
|
ValueType: prometheus.CounterValue,
|
||||||
|
Value: 2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}},
|
||||||
|
Help: "help a2",
|
||||||
|
ValueType: prometheus.CounterValue,
|
||||||
|
Value: 2,
|
||||||
|
},
|
||||||
|
}, []Key{
|
||||||
|
{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, // Does not make much sense, but deletion works as expected.
|
||||||
|
}); err != nil {
|
||||||
|
t.Error("update:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mfs, done, err = c.Gather()
|
||||||
|
if err != nil {
|
||||||
|
t.Error("gather failed:", err)
|
||||||
|
}
|
||||||
|
done()
|
||||||
|
|
||||||
|
if got := mfsToString(mfs); got != expected {
|
||||||
|
t.Error("unexpected metric family, got", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update one element.
|
||||||
|
if err := c.Update(false, []Insert{
|
||||||
|
{
|
||||||
|
Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}},
|
||||||
|
Help: "help a12321",
|
||||||
|
ValueType: prometheus.CounterValue,
|
||||||
|
Value: 9999,
|
||||||
|
},
|
||||||
|
}, nil); err != nil {
|
||||||
|
t.Error("update:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mfs, done, err = c.Gather()
|
||||||
|
if err != nil {
|
||||||
|
t.Error("gather failed:", err)
|
||||||
|
}
|
||||||
|
done()
|
||||||
|
|
||||||
|
if got := mfsToString(mfs); got != "name:\"a\" help:\"help a12321\" type:COUNTER metric:<label:<name:\"b\" value:\"valb\" > label:<name:\"c\" value:\"valc\" >"+
|
||||||
|
" counter:<value:9999 > > metric:<label:<name:\"b\" value:\"valb\" > label:<name:\"c\" value:\"valc2\" > counter:<value:2 > > ,name:\"b\" help:\"help b\" "+
|
||||||
|
"type:GAUGE metric:<label:<name:\"b\" value:\"valb\" > label:<name:\"c\" value:\"valc\" > gauge:<value:1 > > " {
|
||||||
|
t.Error("unexpected metric family, got", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rebuild cache and insert only 2 elements.
|
||||||
|
if err := c.Update(true, []Insert{
|
||||||
|
{
|
||||||
|
Key: Key{FQName: "ax", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}},
|
||||||
|
Help: "help ax",
|
||||||
|
ValueType: prometheus.GaugeValue,
|
||||||
|
Value: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: Key{FQName: "bx", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}},
|
||||||
|
Help: "help bx",
|
||||||
|
ValueType: prometheus.GaugeValue,
|
||||||
|
Value: 1,
|
||||||
|
},
|
||||||
|
}, nil); err != nil {
|
||||||
|
t.Error("update:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mfs, done, err = c.Gather()
|
||||||
|
if err != nil {
|
||||||
|
t.Error("gather failed:", err)
|
||||||
|
}
|
||||||
|
done()
|
||||||
|
|
||||||
|
if got := mfsToString(mfs); got != "name:\"ax\" help:\"help ax\" type:GAUGE metric:<label:<name:\"b\" value:\"valb\" > label:<name:\"c\" value:\"valc\" >"+
|
||||||
|
" gauge:<value:1 > > ,name:\"bx\" help:\"help bx\" type:GAUGE metric:<label:<name:\"b\" value:\"valb\" > label:<name:\"c\" value:\"valc\" > gauge:<value:1 > > " {
|
||||||
|
t.Error("unexpected metric family, got", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.Update(true, nil, nil); err != nil {
|
||||||
|
t.Error("update:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mfs, done, err = c.Gather()
|
||||||
|
if err != nil {
|
||||||
|
t.Error("gather failed:", err)
|
||||||
|
}
|
||||||
|
done()
|
||||||
|
if got := mfsToString(mfs); got != "" {
|
||||||
|
t.Error("unexpected metric family", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mfsToString(mfs []*dto.MetricFamily) string {
|
||||||
|
ret := make([]string, 0, len(mfs))
|
||||||
|
for _, m := range mfs {
|
||||||
|
ret = append(ret, m.String())
|
||||||
|
}
|
||||||
|
return strings.Join(ret, ",")
|
||||||
|
}
|
||||||
|
|
||||||
|
// export var=v1 && go test -count 5 -benchtime 100x -run '^$' -bench . -memprofile=${var}.mem.pprof -cpuprofile=${var}.cpu.pprof > ${var}.txt
|
||||||
|
func BenchmarkCachedTGatherer_Update(b *testing.B) {
|
||||||
|
c := NewCachedTGatherer()
|
||||||
|
|
||||||
|
// Generate larger metric payload.
|
||||||
|
inserts := make([]Insert, 0, 1e6)
|
||||||
|
|
||||||
|
// 1000 metrics in 1000 families.
|
||||||
|
for i := 0; i < 1e3; i++ {
|
||||||
|
for j := 0; j < 1e3; j++ {
|
||||||
|
inserts = append(inserts, Insert{
|
||||||
|
Key: Key{
|
||||||
|
FQName: fmt.Sprintf("realistic_longer_name_%d", i),
|
||||||
|
LabelNames: []string{"realistic_label_name1", "realistic_label_name2", "realistic_label_name3"},
|
||||||
|
LabelValues: []string{"realistic_label_value1", "realistic_label_value2", fmt.Sprintf("realistic_label_value3_%d", j)}},
|
||||||
|
Help: "help string is usually quite large, so let's make it a bit realistic.",
|
||||||
|
ValueType: prometheus.GaugeValue,
|
||||||
|
Value: float64(j),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.Update(false, inserts, nil); err != nil {
|
||||||
|
b.Error("update:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(c.metricFamilyByName) != 1e3 || len(c.metrics) != 1e6 {
|
||||||
|
// Ensure we did not generate duplicates.
|
||||||
|
panic("generated data set gave wrong numbers")
|
||||||
|
}
|
||||||
|
|
||||||
|
b.Run("Update of one element without reset", func(b *testing.B) {
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
if err := c.Update(false, []Insert{
|
||||||
|
{
|
||||||
|
Key: Key{
|
||||||
|
FQName: "realistic_longer_name_334",
|
||||||
|
LabelNames: []string{"realistic_label_name1", "realistic_label_name2", "realistic_label_name3"},
|
||||||
|
LabelValues: []string{"realistic_label_value1", "realistic_label_value2", "realistic_label_value3_2345"}},
|
||||||
|
Help: "CUSTOM help string is usually quite large, so let's make it a bit realistic.",
|
||||||
|
ValueType: prometheus.CounterValue,
|
||||||
|
Value: 1929495,
|
||||||
|
},
|
||||||
|
}, nil); err != nil {
|
||||||
|
b.Error("update:", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
b.Run("Update of all elements with reset", func(b *testing.B) {
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
if err := c.Update(true, inserts, nil); err != nil {
|
||||||
|
b.Error("update:", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
b.Run("Gather", func(b *testing.B) {
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
mfs, done, err := c.Gather()
|
||||||
|
done()
|
||||||
|
if err != nil {
|
||||||
|
b.Error("update:", err)
|
||||||
|
}
|
||||||
|
testMfs = mfs
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
var testMfs []*dto.MetricFamily
|
|
@ -20,6 +20,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/cespare/xxhash/v2"
|
"github.com/cespare/xxhash/v2"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/internal"
|
||||||
|
|
||||||
//nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
|
//nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
@ -154,7 +156,7 @@ func NewDesc(fqName, help string, variableLabels []string, constLabels Labels) *
|
||||||
Value: proto.String(v),
|
Value: proto.String(v),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
sort.Sort(labelPairSorter(d.constLabelPairs))
|
sort.Sort(internal.LabelPairSorter(d.constLabelPairs))
|
||||||
return d
|
return d
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,22 @@ import (
|
||||||
dto "github.com/prometheus/client_model/go"
|
dto "github.com/prometheus/client_model/go"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// LabelPairSorter implements sort.Interface. It is used to sort a slice of
|
||||||
|
// dto.LabelPair pointers.
|
||||||
|
type LabelPairSorter []*dto.LabelPair
|
||||||
|
|
||||||
|
func (s LabelPairSorter) Len() int {
|
||||||
|
return len(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s LabelPairSorter) Swap(i, j int) {
|
||||||
|
s[i], s[j] = s[j], s[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s LabelPairSorter) Less(i, j int) bool {
|
||||||
|
return s[i].GetName() < s[j].GetName()
|
||||||
|
}
|
||||||
|
|
||||||
// metricSorter is a sortable slice of *dto.Metric.
|
// metricSorter is a sortable slice of *dto.Metric.
|
||||||
type metricSorter []*dto.Metric
|
type metricSorter []*dto.Metric
|
||||||
|
|
||||||
|
|
|
@ -115,22 +115,6 @@ func BuildFQName(namespace, subsystem, name string) string {
|
||||||
return name
|
return name
|
||||||
}
|
}
|
||||||
|
|
||||||
// labelPairSorter implements sort.Interface. It is used to sort a slice of
|
|
||||||
// dto.LabelPair pointers.
|
|
||||||
type labelPairSorter []*dto.LabelPair
|
|
||||||
|
|
||||||
func (s labelPairSorter) Len() int {
|
|
||||||
return len(s)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s labelPairSorter) Swap(i, j int) {
|
|
||||||
s[i], s[j] = s[j], s[i]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s labelPairSorter) Less(i, j int) bool {
|
|
||||||
return s[i].GetName() < s[j].GetName()
|
|
||||||
}
|
|
||||||
|
|
||||||
type invalidMetric struct {
|
type invalidMetric struct {
|
||||||
desc *Desc
|
desc *Desc
|
||||||
err error
|
err error
|
||||||
|
|
|
@ -84,6 +84,10 @@ func Handler() http.Handler {
|
||||||
// instrumentation. Use the InstrumentMetricHandler function to apply the same
|
// instrumentation. Use the InstrumentMetricHandler function to apply the same
|
||||||
// kind of instrumentation as it is used by the Handler function.
|
// kind of instrumentation as it is used by the Handler function.
|
||||||
func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
|
func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
|
||||||
|
return HandlerForTransactional(prometheus.ToTransactionalGatherer(reg), opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerOpts) http.Handler {
|
||||||
var (
|
var (
|
||||||
inFlightSem chan struct{}
|
inFlightSem chan struct{}
|
||||||
errCnt = prometheus.NewCounterVec(
|
errCnt = prometheus.NewCounterVec(
|
||||||
|
@ -113,6 +117,7 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
|
||||||
|
|
||||||
h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
|
h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
|
||||||
if inFlightSem != nil {
|
if inFlightSem != nil {
|
||||||
|
// TODO(bwplotka): Implement single-flight which is essential for blocking TransactionalGatherer.
|
||||||
select {
|
select {
|
||||||
case inFlightSem <- struct{}{}: // All good, carry on.
|
case inFlightSem <- struct{}{}: // All good, carry on.
|
||||||
defer func() { <-inFlightSem }()
|
defer func() { <-inFlightSem }()
|
||||||
|
@ -123,7 +128,8 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
mfs, err := reg.Gather()
|
mfs, done, err := reg.Gather()
|
||||||
|
defer done()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if opts.ErrorLog != nil {
|
if opts.ErrorLog != nil {
|
||||||
opts.ErrorLog.Println("error gathering metrics:", err)
|
opts.ErrorLog.Println("error gathering metrics:", err)
|
||||||
|
|
|
@ -407,6 +407,14 @@ func (r *Registry) MustRegister(cs ...Collector) {
|
||||||
|
|
||||||
// Gather implements Gatherer.
|
// Gather implements Gatherer.
|
||||||
func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
|
func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
|
||||||
|
r.mtx.RLock()
|
||||||
|
|
||||||
|
if len(r.collectorsByID) == 0 && len(r.uncheckedCollectors) == 0 {
|
||||||
|
// Fast path.
|
||||||
|
r.mtx.RUnlock()
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
checkedMetricChan = make(chan Metric, capMetricChan)
|
checkedMetricChan = make(chan Metric, capMetricChan)
|
||||||
uncheckedMetricChan = make(chan Metric, capMetricChan)
|
uncheckedMetricChan = make(chan Metric, capMetricChan)
|
||||||
|
@ -416,7 +424,6 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
|
||||||
registeredDescIDs map[uint64]struct{} // Only used for pedantic checks
|
registeredDescIDs map[uint64]struct{} // Only used for pedantic checks
|
||||||
)
|
)
|
||||||
|
|
||||||
r.mtx.RLock()
|
|
||||||
goroutineBudget := len(r.collectorsByID) + len(r.uncheckedCollectors)
|
goroutineBudget := len(r.collectorsByID) + len(r.uncheckedCollectors)
|
||||||
metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName))
|
metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName))
|
||||||
checkedCollectors := make(chan Collector, len(r.collectorsByID))
|
checkedCollectors := make(chan Collector, len(r.collectorsByID))
|
||||||
|
@ -884,11 +891,11 @@ func checkMetricConsistency(
|
||||||
h.Write(separatorByteSlice)
|
h.Write(separatorByteSlice)
|
||||||
// Make sure label pairs are sorted. We depend on it for the consistency
|
// Make sure label pairs are sorted. We depend on it for the consistency
|
||||||
// check.
|
// check.
|
||||||
if !sort.IsSorted(labelPairSorter(dtoMetric.Label)) {
|
if !sort.IsSorted(internal.LabelPairSorter(dtoMetric.Label)) {
|
||||||
// We cannot sort dtoMetric.Label in place as it is immutable by contract.
|
// We cannot sort dtoMetric.Label in place as it is immutable by contract.
|
||||||
copiedLabels := make([]*dto.LabelPair, len(dtoMetric.Label))
|
copiedLabels := make([]*dto.LabelPair, len(dtoMetric.Label))
|
||||||
copy(copiedLabels, dtoMetric.Label)
|
copy(copiedLabels, dtoMetric.Label)
|
||||||
sort.Sort(labelPairSorter(copiedLabels))
|
sort.Sort(internal.LabelPairSorter(copiedLabels))
|
||||||
dtoMetric.Label = copiedLabels
|
dtoMetric.Label = copiedLabels
|
||||||
}
|
}
|
||||||
for _, lp := range dtoMetric.Label {
|
for _, lp := range dtoMetric.Label {
|
||||||
|
@ -935,7 +942,7 @@ func checkDescConsistency(
|
||||||
metricFamily.GetName(), dtoMetric, desc,
|
metricFamily.GetName(), dtoMetric, desc,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
sort.Sort(labelPairSorter(lpsFromDesc))
|
sort.Sort(internal.LabelPairSorter(lpsFromDesc))
|
||||||
for i, lpFromDesc := range lpsFromDesc {
|
for i, lpFromDesc := range lpsFromDesc {
|
||||||
lpFromMetric := dtoMetric.Label[i]
|
lpFromMetric := dtoMetric.Label[i]
|
||||||
if lpFromDesc.GetName() != lpFromMetric.GetName() ||
|
if lpFromDesc.GetName() != lpFromMetric.GetName() ||
|
||||||
|
@ -948,3 +955,89 @@ func checkDescConsistency(
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ TransactionalGatherer = &MultiTRegistry{}
|
||||||
|
|
||||||
|
// MultiTRegistry is a TransactionalGatherer that joins gathered metrics from multiple
|
||||||
|
// transactional gatherers.
|
||||||
|
//
|
||||||
|
// It is caller responsibility to ensure two registries have mutually exclusive metric families,
|
||||||
|
// no deduplication will happen.
|
||||||
|
type MultiTRegistry struct {
|
||||||
|
tGatherers []TransactionalGatherer
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMultiTRegistry creates MultiTRegistry.
|
||||||
|
func NewMultiTRegistry(tGatherers ...TransactionalGatherer) *MultiTRegistry {
|
||||||
|
return &MultiTRegistry{
|
||||||
|
tGatherers: tGatherers,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gather implements TransactionalGatherer interface.
|
||||||
|
func (r *MultiTRegistry) Gather() (mfs []*dto.MetricFamily, done func(), err error) {
|
||||||
|
errs := MultiError{}
|
||||||
|
|
||||||
|
dFns := make([]func(), 0, len(r.tGatherers))
|
||||||
|
// TODO(bwplotka): Implement concurrency for those?
|
||||||
|
for _, g := range r.tGatherers {
|
||||||
|
// TODO(bwplotka): Check for duplicates?
|
||||||
|
m, d, err := g.Gather()
|
||||||
|
errs.Append(err)
|
||||||
|
|
||||||
|
mfs = append(mfs, m...)
|
||||||
|
dFns = append(dFns, d)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(bwplotka): Consider sort in place, given metric family in gather is sorted already.
|
||||||
|
sort.Slice(mfs, func(i, j int) bool {
|
||||||
|
return *mfs[i].Name < *mfs[j].Name
|
||||||
|
})
|
||||||
|
return mfs, func() {
|
||||||
|
for _, d := range dFns {
|
||||||
|
d()
|
||||||
|
}
|
||||||
|
}, errs.MaybeUnwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TransactionalGatherer represents transactional gatherer that can be triggered to notify gatherer that memory
|
||||||
|
// used by metric family is no longer used by a caller. This allows implementations with cache.
|
||||||
|
type TransactionalGatherer interface {
|
||||||
|
// Gather returns metrics in a lexicographically sorted slice
|
||||||
|
// of uniquely named MetricFamily protobufs. Gather ensures that the
|
||||||
|
// returned slice is valid and self-consistent so that it can be used
|
||||||
|
// for valid exposition. As an exception to the strict consistency
|
||||||
|
// requirements described for metric.Desc, Gather will tolerate
|
||||||
|
// different sets of label names for metrics of the same metric family.
|
||||||
|
//
|
||||||
|
// Even if an error occurs, Gather attempts to gather as many metrics as
|
||||||
|
// possible. Hence, if a non-nil error is returned, the returned
|
||||||
|
// MetricFamily slice could be nil (in case of a fatal error that
|
||||||
|
// prevented any meaningful metric collection) or contain a number of
|
||||||
|
// MetricFamily protobufs, some of which might be incomplete, and some
|
||||||
|
// might be missing altogether. The returned error (which might be a
|
||||||
|
// MultiError) explains the details. Note that this is mostly useful for
|
||||||
|
// debugging purposes. If the gathered protobufs are to be used for
|
||||||
|
// exposition in actual monitoring, it is almost always better to not
|
||||||
|
// expose an incomplete result and instead disregard the returned
|
||||||
|
// MetricFamily protobufs in case the returned error is non-nil.
|
||||||
|
//
|
||||||
|
// Important: done is expected to be triggered (even if the error occurs!)
|
||||||
|
// once caller does not need returned slice of dto.MetricFamily.
|
||||||
|
Gather() (_ []*dto.MetricFamily, done func(), err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToTransactionalGatherer transforms Gatherer to transactional one with noop as done function.
|
||||||
|
func ToTransactionalGatherer(g Gatherer) TransactionalGatherer {
|
||||||
|
return &noTransactionGatherer{g: g}
|
||||||
|
}
|
||||||
|
|
||||||
|
type noTransactionGatherer struct {
|
||||||
|
g Gatherer
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gather implements TransactionalGatherer interface.
|
||||||
|
func (g *noTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
|
||||||
|
mfs, err := g.g.Gather()
|
||||||
|
return mfs, func() {}, err
|
||||||
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package prometheus_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
@ -1175,3 +1176,82 @@ func TestAlreadyRegisteredCollision(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type tGatherer struct {
|
||||||
|
done bool
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *tGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
|
||||||
|
name := "g1"
|
||||||
|
val := 1.0
|
||||||
|
return []*dto.MetricFamily{
|
||||||
|
{Name: &name, Metric: []*dto.Metric{{Gauge: &dto.Gauge{Value: &val}}}},
|
||||||
|
}, func() { g.done = true }, g.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewMultiTRegistry(t *testing.T) {
|
||||||
|
treg := &tGatherer{}
|
||||||
|
|
||||||
|
t.Run("one registry", func(t *testing.T) {
|
||||||
|
m := prometheus.NewMultiTRegistry(treg)
|
||||||
|
ret, done, err := m.Gather()
|
||||||
|
if err != nil {
|
||||||
|
t.Error("gather failed:", err)
|
||||||
|
}
|
||||||
|
done()
|
||||||
|
if len(ret) != 1 {
|
||||||
|
t.Error("unexpected number of metric families, expected 1, got", ret)
|
||||||
|
}
|
||||||
|
if !treg.done {
|
||||||
|
t.Error("inner transactional registry not marked as done")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
reg := prometheus.NewRegistry()
|
||||||
|
if err := reg.Register(prometheus.NewCounter(prometheus.CounterOpts{Name: "c1", Help: "help c1"})); err != nil {
|
||||||
|
t.Error("registration failed:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note on purpose two registries will have exactly same metric family name (but with different string).
|
||||||
|
// This behaviour is undefined at the moment.
|
||||||
|
if err := reg.Register(prometheus.NewGauge(prometheus.GaugeOpts{Name: "g1", Help: "help g1"})); err != nil {
|
||||||
|
t.Error("registration failed:", err)
|
||||||
|
}
|
||||||
|
treg.done = false
|
||||||
|
|
||||||
|
t.Run("two registries", func(t *testing.T) {
|
||||||
|
m := prometheus.NewMultiTRegistry(prometheus.ToTransactionalGatherer(reg), treg)
|
||||||
|
ret, done, err := m.Gather()
|
||||||
|
if err != nil {
|
||||||
|
t.Error("gather failed:", err)
|
||||||
|
}
|
||||||
|
done()
|
||||||
|
if len(ret) != 3 {
|
||||||
|
t.Error("unexpected number of metric families, expected 3, got", ret)
|
||||||
|
}
|
||||||
|
if !treg.done {
|
||||||
|
t.Error("inner transactional registry not marked as done")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
treg.done = false
|
||||||
|
// Inject error.
|
||||||
|
treg.err = errors.New("test err")
|
||||||
|
|
||||||
|
t.Run("two registries, one with error", func(t *testing.T) {
|
||||||
|
m := prometheus.NewMultiTRegistry(prometheus.ToTransactionalGatherer(reg), treg)
|
||||||
|
ret, done, err := m.Gather()
|
||||||
|
if err != treg.err {
|
||||||
|
t.Error("unexpected error:", err)
|
||||||
|
}
|
||||||
|
done()
|
||||||
|
if len(ret) != 3 {
|
||||||
|
t.Error("unexpected number of metric families, expected 3, got", ret)
|
||||||
|
}
|
||||||
|
// Still on error, we expect done to be triggered.
|
||||||
|
if !treg.done {
|
||||||
|
t.Error("inner transactional registry not marked as done")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -167,7 +167,16 @@ func CollectAndCompare(c prometheus.Collector, expected io.Reader, metricNames .
|
||||||
// exposition format. If any metricNames are provided, only metrics with those
|
// exposition format. If any metricNames are provided, only metrics with those
|
||||||
// names are compared.
|
// names are compared.
|
||||||
func GatherAndCompare(g prometheus.Gatherer, expected io.Reader, metricNames ...string) error {
|
func GatherAndCompare(g prometheus.Gatherer, expected io.Reader, metricNames ...string) error {
|
||||||
got, err := g.Gather()
|
return TransactionalGatherAndCompare(prometheus.ToTransactionalGatherer(g), expected, metricNames...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TransactionalGatherAndCompare gathers all metrics from the provided Gatherer and compares
|
||||||
|
// it to an expected output read from the provided Reader in the Prometheus text
|
||||||
|
// exposition format. If any metricNames are provided, only metrics with those
|
||||||
|
// names are compared.
|
||||||
|
func TransactionalGatherAndCompare(g prometheus.TransactionalGatherer, expected io.Reader, metricNames ...string) error {
|
||||||
|
got, done, err := g.Gather()
|
||||||
|
defer done()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("gathering metrics failed: %s", err)
|
return fmt.Errorf("gathering metrics failed: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
|
|
||||||
//nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
|
//nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/internal"
|
||||||
"google.golang.org/protobuf/types/known/timestamppb"
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
|
|
||||||
dto "github.com/prometheus/client_model/go"
|
dto "github.com/prometheus/client_model/go"
|
||||||
|
@ -38,6 +39,23 @@ const (
|
||||||
UntypedValue
|
UntypedValue
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
CounterMetricTypePtr = func() *dto.MetricType { d := dto.MetricType_COUNTER; return &d }()
|
||||||
|
GaugeMetricTypePtr = func() *dto.MetricType { d := dto.MetricType_GAUGE; return &d }()
|
||||||
|
UntypedMetricTypePtr = func() *dto.MetricType { d := dto.MetricType_UNTYPED; return &d }()
|
||||||
|
)
|
||||||
|
|
||||||
|
func (v ValueType) ToDTO() *dto.MetricType {
|
||||||
|
switch v {
|
||||||
|
case CounterValue:
|
||||||
|
return CounterMetricTypePtr
|
||||||
|
case GaugeValue:
|
||||||
|
return GaugeMetricTypePtr
|
||||||
|
default:
|
||||||
|
return UntypedMetricTypePtr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// valueFunc is a generic metric for simple values retrieved on collect time
|
// valueFunc is a generic metric for simple values retrieved on collect time
|
||||||
// from a function. It implements Metric and Collector. Its effective type is
|
// from a function. It implements Metric and Collector. Its effective type is
|
||||||
// determined by ValueType. This is a low-level building block used by the
|
// determined by ValueType. This is a low-level building block used by the
|
||||||
|
@ -91,11 +109,15 @@ func NewConstMetric(desc *Desc, valueType ValueType, value float64, labelValues
|
||||||
if err := validateLabelValues(labelValues, len(desc.variableLabels)); err != nil {
|
if err := validateLabelValues(labelValues, len(desc.variableLabels)); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metric := &dto.Metric{}
|
||||||
|
if err := populateMetric(valueType, value, MakeLabelPairs(desc, labelValues), nil, metric); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return &constMetric{
|
return &constMetric{
|
||||||
desc: desc,
|
desc: desc,
|
||||||
valType: valueType,
|
metric: metric,
|
||||||
val: value,
|
|
||||||
labelPairs: MakeLabelPairs(desc, labelValues),
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,9 +133,7 @@ func MustNewConstMetric(desc *Desc, valueType ValueType, value float64, labelVal
|
||||||
|
|
||||||
type constMetric struct {
|
type constMetric struct {
|
||||||
desc *Desc
|
desc *Desc
|
||||||
valType ValueType
|
metric *dto.Metric
|
||||||
val float64
|
|
||||||
labelPairs []*dto.LabelPair
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *constMetric) Desc() *Desc {
|
func (m *constMetric) Desc() *Desc {
|
||||||
|
@ -121,7 +141,11 @@ func (m *constMetric) Desc() *Desc {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *constMetric) Write(out *dto.Metric) error {
|
func (m *constMetric) Write(out *dto.Metric) error {
|
||||||
return populateMetric(m.valType, m.val, m.labelPairs, nil, out)
|
out.Label = m.metric.Label
|
||||||
|
out.Counter = m.metric.Counter
|
||||||
|
out.Gauge = m.metric.Gauge
|
||||||
|
out.Untyped = m.metric.Untyped
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func populateMetric(
|
func populateMetric(
|
||||||
|
@ -170,7 +194,7 @@ func MakeLabelPairs(desc *Desc, labelValues []string) []*dto.LabelPair {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
labelPairs = append(labelPairs, desc.constLabelPairs...)
|
labelPairs = append(labelPairs, desc.constLabelPairs...)
|
||||||
sort.Sort(labelPairSorter(labelPairs))
|
sort.Sort(internal.LabelPairSorter(labelPairs))
|
||||||
return labelPairs
|
return labelPairs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
//nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
|
//nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus/internal"
|
||||||
dto "github.com/prometheus/client_model/go"
|
dto "github.com/prometheus/client_model/go"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -182,7 +183,7 @@ func (m *wrappingMetric) Write(out *dto.Metric) error {
|
||||||
Value: proto.String(lv),
|
Value: proto.String(lv),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
sort.Sort(labelPairSorter(out.Label))
|
sort.Sort(internal.LabelPairSorter(out.Label))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue