Compare commits

...

3 Commits

Author SHA1 Message Date
Owen Williams 6068cf022d
Merge 6514f6eb91 into 0c73c1c554 2024-11-05 10:33:52 -05:00
Owen Williams 6514f6eb91 move func to gatherer
Signed-off-by: Owen Williams <owen.williams@grafana.com>
2024-10-17 13:55:29 -04:00
Owen Williams 42825b62f4 Return an http error during scraping if metrics collide when escaped to underscores
Signed-off-by: Owen Williams <owen.williams@grafana.com>
2024-10-17 11:20:00 -04:00
5 changed files with 366 additions and 15 deletions

View File

@ -57,6 +57,9 @@ type Desc struct {
// must be unique among all registered descriptors and can therefore be // must be unique among all registered descriptors and can therefore be
// used as an identifier of the descriptor. // used as an identifier of the descriptor.
id uint64 id uint64
// escapedID is similar to id, but with the metric and label names escaped
// with underscores.
escapedID uint64
// dimHash is a hash of the label names (preset and variable) and the // dimHash is a hash of the label names (preset and variable) and the
// Help string. Each Desc with the same fqName must have the same // Help string. Each Desc with the same fqName must have the same
// dimHash. // dimHash.
@ -142,11 +145,18 @@ func (v2) NewDesc(fqName, help string, variableLabels ConstrainableLabels, const
} }
xxh := xxhash.New() xxh := xxhash.New()
for _, val := range labelValues { escapedXXH := xxhash.New()
for i, val := range labelValues {
xxh.WriteString(val) xxh.WriteString(val)
xxh.Write(separatorByteSlice) xxh.Write(separatorByteSlice)
if i == 0 {
val = model.EscapeName(val, model.UnderscoreEscaping)
}
escapedXXH.WriteString(val)
escapedXXH.Write(separatorByteSlice)
} }
d.id = xxh.Sum64() d.id = xxh.Sum64()
d.escapedID = escapedXXH.Sum64()
// Sort labelNames so that order doesn't matter for the hash. // Sort labelNames so that order doesn't matter for the hash.
sort.Strings(labelNames) sort.Strings(labelNames)
// Now hash together (in this order) the help string and the sorted // Now hash together (in this order) the help string and the sorted

View File

@ -43,6 +43,7 @@ import (
"github.com/klauspost/compress/zstd" "github.com/klauspost/compress/zstd"
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/client_golang/internal/github.com/golang/gddo/httputil" "github.com/prometheus/client_golang/internal/github.com/golang/gddo/httputil"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -121,6 +122,7 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO
if opts.MaxRequestsInFlight > 0 { if opts.MaxRequestsInFlight > 0 {
inFlightSem = make(chan struct{}, opts.MaxRequestsInFlight) inFlightSem = make(chan struct{}, opts.MaxRequestsInFlight)
} }
var hasEscapedCollisions bool
if opts.Registry != nil { if opts.Registry != nil {
// Initialize all possibilities that can occur below. // Initialize all possibilities that can occur below.
errCnt.WithLabelValues("gathering") errCnt.WithLabelValues("gathering")
@ -134,6 +136,7 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO
} }
} }
} }
hasEscapedCollisions = reg.HasEscapedCollision()
// Select compression formats to offer based on default or user choice. // Select compression formats to offer based on default or user choice.
var compressions []string var compressions []string
@ -190,6 +193,19 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO
} else { } else {
contentType = expfmt.Negotiate(req.Header) contentType = expfmt.Negotiate(req.Header)
} }
if hasEscapedCollisions {
switch contentType.ToEscapingScheme() {
case model.UnderscoreEscaping, model.DotsEscaping:
if opts.ErrorLog != nil {
opts.ErrorLog.Println("error: one or more metrics collide when escaped")
}
httpError(rsp, fmt.Errorf("one or more metrics collide when escaped"))
return
default:
}
}
rsp.Header().Set(contentTypeHeader, string(contentType)) rsp.Header().Set(contentTypeHeader, string(contentType))
w, encodingHeader, closeWriter, err := negotiateEncodingWriter(req, rsp, compressions) w, encodingHeader, closeWriter, err := negotiateEncodingWriter(req, rsp, compressions)

View File

@ -28,6 +28,8 @@ import (
"github.com/klauspost/compress/zstd" "github.com/klauspost/compress/zstd"
dto "github.com/prometheus/client_model/go" dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@ -79,6 +81,10 @@ func (g *mockTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(),
return mfs, func() { g.doneInvoked++ }, err return mfs, func() { g.doneInvoked++ }, err
} }
func (g *mockTransactionGatherer) HasEscapedCollision() bool {
return g.g.HasEscapedCollision()
}
func readCompressedBody(r io.Reader, comp Compression) (string, error) { func readCompressedBody(r io.Reader, comp Compression) (string, error) {
switch comp { switch comp {
case Gzip: case Gzip:
@ -548,6 +554,50 @@ func TestNegotiateEncodingWriter(t *testing.T) {
} }
} }
func TestEscapedCollisions(t *testing.T) {
oldScheme := model.NameValidationScheme
defer func() {
model.NameValidationScheme = oldScheme
}()
model.NameValidationScheme = model.UTF8Validation
reg := prometheus.NewRegistry()
reg.MustRegister(prometheus.NewCounter(prometheus.CounterOpts{
Name: "test_metric",
Help: "A test metric with underscores",
}))
reg.MustRegister(prometheus.NewCounter(prometheus.CounterOpts{
Name: "test.metric",
Help: "A test metric with dots",
}))
handler := HandlerFor(reg, HandlerOpts{})
t.Run("fail case", func(t *testing.T) {
writer := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/metrics", nil)
request.Header.Add(acceptHeader, string(expfmt.NewFormat(expfmt.TypeTextPlain)))
handler.ServeHTTP(writer, request)
if writer.Code != 500 {
t.Errorf("wanted error 500, got %d", writer.Code)
}
expectErr := "An error has occurred while serving metrics:\n\none or more metrics collide when escaped\n"
if writer.Body.String() != expectErr {
t.Error("incorrect body returned, want " + expectErr + " got " + writer.Body.String())
}
})
t.Run("success case", func(t *testing.T) {
writer := httptest.NewRecorder()
request, _ := http.NewRequest("GET", "/metrics", nil)
request.Header.Add(acceptHeader, string(expfmt.NewFormat(expfmt.TypeTextPlain).WithEscapingScheme(model.NoEscaping)))
handler.ServeHTTP(writer, request)
if writer.Code != 200 {
t.Errorf("wanted 200 OK, got %d", writer.Code)
}
})
}
func BenchmarkCompression(b *testing.B) { func BenchmarkCompression(b *testing.B) {
benchmarks := []struct { benchmarks := []struct {
name string name string

View File

@ -67,11 +67,17 @@ func init() {
func NewRegistry() *Registry { func NewRegistry() *Registry {
return &Registry{ return &Registry{
collectorsByID: map[uint64]Collector{}, collectorsByID: map[uint64]Collector{},
collectorsByEscapedID: map[uint64]Collector{},
descIDs: map[uint64]struct{}{}, descIDs: map[uint64]struct{}{},
escapedDescIDs: map[uint64]struct{}{},
dimHashesByName: map[string]uint64{}, dimHashesByName: map[string]uint64{},
} }
} }
func (r *Registry) HasEscapedCollision() bool {
return r.hasEscapedCollision
}
// NewPedanticRegistry returns a registry that checks during collection if each // NewPedanticRegistry returns a registry that checks during collection if each
// collected Metric is consistent with its reported Desc, and if the Desc has // collected Metric is consistent with its reported Desc, and if the Desc has
// actually been registered with the registry. Unchecked Collectors (those whose // actually been registered with the registry. Unchecked Collectors (those whose
@ -158,6 +164,11 @@ type Gatherer interface {
// expose an incomplete result and instead disregard the returned // expose an incomplete result and instead disregard the returned
// MetricFamily protobufs in case the returned error is non-nil. // MetricFamily protobufs in case the returned error is non-nil.
Gather() ([]*dto.MetricFamily, error) Gather() ([]*dto.MetricFamily, error)
// HasEscapedCollision returns true if any two of the registered metrics would
// be the same when escaped to underscores. This is needed to prevent
// duplicate metric issues when being scraped by a legacy system.
HasEscapedCollision() bool
} }
// Register registers the provided Collector with the DefaultRegisterer. // Register registers the provided Collector with the DefaultRegisterer.
@ -194,6 +205,10 @@ func (gf GathererFunc) Gather() ([]*dto.MetricFamily, error) {
return gf() return gf()
} }
func (gf GathererFunc) HasEscapedCollision() bool {
return false
}
// AlreadyRegisteredError is returned by the Register method if the Collector to // AlreadyRegisteredError is returned by the Register method if the Collector to
// be registered has already been registered before, or a different Collector // be registered has already been registered before, or a different Collector
// that collects the same metrics has been registered before. Registration fails // that collects the same metrics has been registered before. Registration fails
@ -260,10 +275,21 @@ func (errs MultiError) MaybeUnwrap() error {
type Registry struct { type Registry struct {
mtx sync.RWMutex mtx sync.RWMutex
collectorsByID map[uint64]Collector // ID is a hash of the descIDs. collectorsByID map[uint64]Collector // ID is a hash of the descIDs.
// collectorsByEscapedID stores colletors by escapedID, only if escaped id is
// different (otherwise we can just do the lookup in the regular map).
collectorsByEscapedID map[uint64]Collector
descIDs map[uint64]struct{} descIDs map[uint64]struct{}
// escapedDescIDs records desc ids of the escaped version of the metric, only
// if different from the regular name.
escapedDescIDs map[uint64]struct{}
dimHashesByName map[string]uint64 dimHashesByName map[string]uint64
uncheckedCollectors []Collector uncheckedCollectors []Collector
pedanticChecksEnabled bool pedanticChecksEnabled bool
// hasEscapedCollision is set to true if any two metrics that were not
// identical under UTF-8 would collide if scraped by a system that requires
// names to be escaped to legacy underscore replacement.
hasEscapedCollision bool
} }
// Register implements Registerer. // Register implements Registerer.
@ -271,9 +297,12 @@ func (r *Registry) Register(c Collector) error {
var ( var (
descChan = make(chan *Desc, capDescChan) descChan = make(chan *Desc, capDescChan)
newDescIDs = map[uint64]struct{}{} newDescIDs = map[uint64]struct{}{}
newEscapedIDs = map[uint64]struct{}{}
newDimHashesByName = map[string]uint64{} newDimHashesByName = map[string]uint64{}
collectorID uint64 // All desc IDs XOR'd together. collectorID uint64 // All desc IDs XOR'd together.
escapedID uint64
duplicateDescErr error duplicateDescErr error
duplicateEscapedDesc bool
) )
go func() { go func() {
c.Describe(descChan) c.Describe(descChan)
@ -307,6 +336,22 @@ func (r *Registry) Register(c Collector) error {
collectorID ^= desc.id collectorID ^= desc.id
} }
// Also check to see if the descID is unique when all the names are escaped
// to underscores. First check the primary map, then check the secondary
// map. We only officially log a collision later.
if _, exists := r.descIDs[desc.escapedID]; exists {
duplicateEscapedDesc = true
}
if _, exists := r.escapedDescIDs[desc.escapedID]; exists {
duplicateEscapedDesc = true
}
if _, exists := newEscapedIDs[desc.escapedID]; !exists {
if desc.escapedID != desc.id {
newEscapedIDs[desc.escapedID] = struct{}{}
}
escapedID ^= desc.escapedID
}
// Are all the label names and the help string consistent with // Are all the label names and the help string consistent with
// previous descriptors of the same name? // previous descriptors of the same name?
// First check existing descriptors... // First check existing descriptors...
@ -331,7 +376,17 @@ func (r *Registry) Register(c Collector) error {
r.uncheckedCollectors = append(r.uncheckedCollectors, c) r.uncheckedCollectors = append(r.uncheckedCollectors, c)
return nil return nil
} }
if existing, exists := r.collectorsByID[collectorID]; exists {
existing, collision := r.collectorsByID[collectorID]
// Also check whether the underscore-escaped versions of the IDs match.
if !collision {
_, escapedCollision := r.collectorsByID[escapedID]
r.hasEscapedCollision = r.hasEscapedCollision || escapedCollision
_, escapedCollision = r.collectorsByEscapedID[escapedID]
r.hasEscapedCollision = r.hasEscapedCollision || escapedCollision
}
if collision {
switch e := existing.(type) { switch e := existing.(type) {
case *wrappingCollector: case *wrappingCollector:
return AlreadyRegisteredError{ return AlreadyRegisteredError{
@ -351,14 +406,25 @@ func (r *Registry) Register(c Collector) error {
return duplicateDescErr return duplicateDescErr
} }
if duplicateEscapedDesc {
r.hasEscapedCollision = true
}
// Only after all tests have passed, actually register. // Only after all tests have passed, actually register.
r.collectorsByID[collectorID] = c r.collectorsByID[collectorID] = c
// We only need to store the escapedID if it doesn't match the unescaped one.
if escapedID != collectorID {
r.collectorsByEscapedID[escapedID] = c
}
for hash := range newDescIDs { for hash := range newDescIDs {
r.descIDs[hash] = struct{}{} r.descIDs[hash] = struct{}{}
} }
for name, dimHash := range newDimHashesByName { for name, dimHash := range newDimHashesByName {
r.dimHashesByName[name] = dimHash r.dimHashesByName[name] = dimHash
} }
for hash := range newEscapedIDs {
r.escapedDescIDs[hash] = struct{}{}
}
return nil return nil
} }
@ -367,7 +433,9 @@ func (r *Registry) Unregister(c Collector) bool {
var ( var (
descChan = make(chan *Desc, capDescChan) descChan = make(chan *Desc, capDescChan)
descIDs = map[uint64]struct{}{} descIDs = map[uint64]struct{}{}
escapedDescIDs = map[uint64]struct{}{}
collectorID uint64 // All desc IDs XOR'd together. collectorID uint64 // All desc IDs XOR'd together.
collectorEscapedID uint64
) )
go func() { go func() {
c.Describe(descChan) c.Describe(descChan)
@ -377,6 +445,8 @@ func (r *Registry) Unregister(c Collector) bool {
if _, exists := descIDs[desc.id]; !exists { if _, exists := descIDs[desc.id]; !exists {
collectorID ^= desc.id collectorID ^= desc.id
descIDs[desc.id] = struct{}{} descIDs[desc.id] = struct{}{}
collectorEscapedID ^= desc.escapedID
escapedDescIDs[desc.escapedID] = struct{}{}
} }
} }
@ -391,9 +461,13 @@ func (r *Registry) Unregister(c Collector) bool {
defer r.mtx.Unlock() defer r.mtx.Unlock()
delete(r.collectorsByID, collectorID) delete(r.collectorsByID, collectorID)
delete(r.collectorsByEscapedID, collectorEscapedID)
for id := range descIDs { for id := range descIDs {
delete(r.descIDs, id) delete(r.descIDs, id)
} }
for id := range escapedDescIDs {
delete(r.escapedDescIDs, id)
}
// dimHashesByName is left untouched as those must be consistent // dimHashesByName is left untouched as those must be consistent
// throughout the lifetime of a program. // throughout the lifetime of a program.
return true return true
@ -802,6 +876,15 @@ func (gs Gatherers) Gather() ([]*dto.MetricFamily, error) {
return internal.NormalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap() return internal.NormalizeMetricFamilies(metricFamiliesByName), errs.MaybeUnwrap()
} }
func (gs Gatherers) HasEscapedCollision() bool {
for _, g := range gs {
if g.HasEscapedCollision() {
return true
}
}
return false
}
// checkSuffixCollisions checks for collisions with the “magic” suffixes the // checkSuffixCollisions checks for collisions with the “magic” suffixes the
// Prometheus text format and the internal metric representation of the // Prometheus text format and the internal metric representation of the
// Prometheus server add while flattening Summaries and Histograms. // Prometheus server add while flattening Summaries and Histograms.
@ -1033,6 +1116,15 @@ func (r *MultiTRegistry) Gather() (mfs []*dto.MetricFamily, done func(), err err
}, errs.MaybeUnwrap() }, errs.MaybeUnwrap()
} }
func (r *MultiTRegistry) HasEscapedCollision() bool {
for _, g := range r.tGatherers {
if g.HasEscapedCollision() {
return true
}
}
return false
}
// TransactionalGatherer represents transactional gatherer that can be triggered to notify gatherer that memory // TransactionalGatherer represents transactional gatherer that can be triggered to notify gatherer that memory
// used by metric family is no longer used by a caller. This allows implementations with cache. // used by metric family is no longer used by a caller. This allows implementations with cache.
type TransactionalGatherer interface { type TransactionalGatherer interface {
@ -1058,6 +1150,11 @@ type TransactionalGatherer interface {
// Important: done is expected to be triggered (even if the error occurs!) // Important: done is expected to be triggered (even if the error occurs!)
// once caller does not need returned slice of dto.MetricFamily. // once caller does not need returned slice of dto.MetricFamily.
Gather() (_ []*dto.MetricFamily, done func(), err error) Gather() (_ []*dto.MetricFamily, done func(), err error)
// HasEscapedCollision returns true if any two of the registered metrics would
// be the same when escaped to underscores. This is needed to prevent
// duplicate metric issues when being scraped by a legacy system.
HasEscapedCollision() bool
} }
// ToTransactionalGatherer transforms Gatherer to transactional one with noop as done function. // ToTransactionalGatherer transforms Gatherer to transactional one with noop as done function.
@ -1074,3 +1171,7 @@ func (g *noTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), er
mfs, err := g.g.Gather() mfs, err := g.g.Gather()
return mfs, func() {}, err return mfs, func() {}, err
} }
func (g *noTransactionGatherer) HasEscapedCollision() bool {
return g.g.HasEscapedCollision()
}

View File

@ -36,6 +36,7 @@ import (
dto "github.com/prometheus/client_model/go" dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
) )
@ -1181,6 +1182,175 @@ func TestAlreadyRegisteredCollision(t *testing.T) {
} }
} }
func TestAlreadyRegisteredEscapingCollision(t *testing.T) {
oldValidation := model.NameValidationScheme
model.NameValidationScheme = model.UTF8Validation
defer func() {
model.NameValidationScheme = oldValidation
}()
tests := []struct {
name string
// These are functions because hashes that determine collision are created
// at metric creation time.
counterA func() prometheus.Counter
counterB func() prometheus.Counter
expectErr bool
expectLegacyCollision bool
}{
{
name: "no metric name collision",
counterA: func() prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: "my_counter_a",
ConstLabels: prometheus.Labels{
"name": "label",
"type": "test",
},
})
},
counterB: func() prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: "myAcounterAa",
ConstLabels: prometheus.Labels{
"name": "label",
"type": "test",
},
})
},
},
{
name: "compatibility metric name collision",
counterA: func() prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: "my_counter_a",
ConstLabels: prometheus.Labels{
"name": "label",
"type": "test",
},
})
},
counterB: func() prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: "my.counter.a",
ConstLabels: prometheus.Labels{
"name": "label",
"type": "test",
},
})
},
expectLegacyCollision: true,
},
{
// This is a regression test to make sure we are not accidentally
// reporting collisions when label values are different.
name: "no label value collision",
counterA: func() prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: "my_counter_a",
ConstLabels: prometheus.Labels{
"name": "label.value",
"type": "test",
},
})
},
counterB: func() prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: "my_counter_a",
ConstLabels: prometheus.Labels{
"name": "label_value",
"type": "test",
},
})
},
},
{
name: "compatibility label name collision",
counterA: func() prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: "my_counter_a",
ConstLabels: prometheus.Labels{
"label.name": "name",
"type": "test",
},
})
},
counterB: func() prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: "my_counter_a",
ConstLabels: prometheus.Labels{
"label_name": "name",
"type": "test",
},
})
},
expectErr: true,
expectLegacyCollision: false,
},
{
name: "no utf8 metric name collision",
counterA: func() prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: "my_counter_a",
ConstLabels: prometheus.Labels{
"name": "label",
"type": "test",
},
})
},
counterB: func() prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: "my.counter.a",
ConstLabels: prometheus.Labels{
"name": "label",
"type": "test",
},
})
},
expectLegacyCollision: true,
},
{
name: "post init flag flip, should collide",
counterA: func() prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: "my.counter.a",
ConstLabels: prometheus.Labels{
"name": "label",
"type": "test",
},
})
},
counterB: func() prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: "my.counter.a",
ConstLabels: prometheus.Labels{
"name": "label",
"type": "test",
},
})
},
expectErr: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
reg := prometheus.NewRegistry()
err := reg.Register(tc.counterA())
if err != nil {
t.Errorf("required no error, got %v", err)
}
err = reg.Register(tc.counterB())
if tc.expectErr != (err != nil) {
t.Errorf("required error state %v, got %v", tc.expectErr, err)
}
if tc.expectLegacyCollision != reg.HasEscapedCollision() {
t.Errorf("legacy collision mismatch, want %v got %v", tc.expectLegacyCollision, reg.HasEscapedCollision())
}
})
}
}
type tGatherer struct { type tGatherer struct {
done bool done bool
err error err error
@ -1194,6 +1364,10 @@ func (g *tGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
}, func() { g.done = true }, g.err }, func() { g.done = true }, g.err
} }
func (g *tGatherer) HasEscapedCollision() bool {
return false
}
func TestNewMultiTRegistry(t *testing.T) { func TestNewMultiTRegistry(t *testing.T) {
treg := &tGatherer{} treg := &tGatherer{}