Allow the metric family injection hook to merge with existing metric families.

If a metric family returned by the injection hook already exists (with
the same name), then its metrics are simply merged into that metric
family. With enabled collect-time checks, even uniqueness is checked,
but in general, things stay the same that the caller is responsible to
ensure metric consistency.

This fixes https://github.com/prometheus/pushgateway/issues/27 .
This commit is contained in:
beorn7 2015-03-15 15:47:56 +01:00
parent f5ccf204b7
commit a762e0612e
2 changed files with 159 additions and 73 deletions

View File

@ -158,14 +158,19 @@ func Unregister(c Collector) bool {
// SetMetricFamilyInjectionHook sets a function that is called whenever metrics // SetMetricFamilyInjectionHook sets a function that is called whenever metrics
// are collected. The hook function must be set before metrics collection begins // are collected. The hook function must be set before metrics collection begins
// (i.e. call SetMetricFamilyInjectionHook before setting the HTTP handler.) The // (i.e. call SetMetricFamilyInjectionHook before setting the HTTP handler.) The
// MetricFamily protobufs returned by the hook function are added to the // MetricFamily protobufs returned by the hook function are merged with the
// delivered metrics. Each returned MetricFamily must have a unique name (also // metrics collected in the usual way.
// taking into account the MetricFamilies created in the regular way).
// //
// This is a way to directly inject MetricFamily protobufs managed and owned by // This is a way to directly inject MetricFamily protobufs managed and owned by
// the caller. The caller has full responsibility. No sanity checks are // the caller. The caller has full responsibility. As no registration of the
// performed on the returned protobufs (besides the name checks described // injected metrics has happened, there is no descriptor to check against, and
// above). The function must be callable at any time and concurrently. // there are no registration-time checks. If collect-time checks are disabled
// (see function EnableCollectChecks), no sanity checks are performed on the
// returned protobufs at all. If collect-checks are enabled, type and uniqueness
// checks are performed, but no further consistency checks (which would require
// knowledge of a metric descriptor).
//
// The function must be callable at any time and concurrently.
func SetMetricFamilyInjectionHook(hook func() []*dto.MetricFamily) { func SetMetricFamilyInjectionHook(hook func() []*dto.MetricFamily) {
defRegistry.metricFamilyInjectionHook = hook defRegistry.metricFamilyInjectionHook = hook
} }
@ -479,10 +484,26 @@ func (r *registry) writePB(w io.Writer, writeEncoded encoder) (int, error) {
if r.metricFamilyInjectionHook != nil { if r.metricFamilyInjectionHook != nil {
for _, mf := range r.metricFamilyInjectionHook() { for _, mf := range r.metricFamilyInjectionHook() {
if _, exists := metricFamiliesByName[mf.GetName()]; exists { existingMF, exists := metricFamiliesByName[mf.GetName()]
return 0, fmt.Errorf("metric family with duplicate name injected: %s", mf) if !exists {
metricFamiliesByName[mf.GetName()] = mf
if r.collectChecksEnabled {
for _, m := range mf.Metric {
if err := r.checkConsistency(mf, m, nil, metricHashes); err != nil {
return 0, err
}
}
}
continue
}
for _, m := range mf.Metric {
if r.collectChecksEnabled {
if err := r.checkConsistency(existingMF, m, nil, metricHashes); err != nil {
return 0, err
}
}
existingMF.Metric = append(existingMF.Metric, m)
} }
metricFamiliesByName[mf.GetName()] = mf
} }
} }
@ -523,11 +544,42 @@ func (r *registry) checkConsistency(metricFamily *dto.MetricFamily, dtoMetric *d
) )
} }
// Is the metric unique (i.e. no other metric with the same name and the same label values)?
h := fnv.New64a()
var buf bytes.Buffer
buf.WriteString(metricFamily.GetName())
buf.WriteByte(model.SeparatorByte)
h.Write(buf.Bytes())
for _, lp := range dtoMetric.Label {
buf.Reset()
buf.WriteString(lp.GetValue())
buf.WriteByte(model.SeparatorByte)
h.Write(buf.Bytes())
}
metricHash := h.Sum64()
if _, exists := metricHashes[metricHash]; exists {
return fmt.Errorf(
"collected metric %q was collected before with the same name and label values",
dtoMetric,
)
}
metricHashes[metricHash] = struct{}{}
if desc == nil {
return nil // Nothing left to check if we have no desc.
}
// Desc consistency with metric family. // Desc consistency with metric family.
if metricFamily.GetName() != desc.fqName {
return fmt.Errorf(
"collected metric %q has name %q but should have %q",
dtoMetric, metricFamily.GetName(), desc.fqName,
)
}
if metricFamily.GetHelp() != desc.help { if metricFamily.GetHelp() != desc.help {
return fmt.Errorf( return fmt.Errorf(
"collected metric %q has help %q but should have %q", "collected metric %q has help %q but should have %q",
dtoMetric, desc.help, metricFamily.GetHelp(), dtoMetric, metricFamily.GetHelp(), desc.help,
) )
} }
@ -557,27 +609,6 @@ func (r *registry) checkConsistency(metricFamily *dto.MetricFamily, dtoMetric *d
} }
} }
// Is the metric unique (i.e. no other metric with the same name and the same label values)?
h := fnv.New64a()
var buf bytes.Buffer
buf.WriteString(desc.fqName)
buf.WriteByte(model.SeparatorByte)
h.Write(buf.Bytes())
for _, lp := range dtoMetric.Label {
buf.Reset()
buf.WriteString(lp.GetValue())
buf.WriteByte(model.SeparatorByte)
h.Write(buf.Bytes())
}
metricHash := h.Sum64()
if _, exists := metricHashes[metricHash]; exists {
return fmt.Errorf(
"collected metric %q was collected before with the same name and label values",
dtoMetric,
)
}
metricHashes[metricHash] = struct{}{}
r.mtx.RLock() // Remaining checks need the read lock. r.mtx.RLock() // Remaining checks need the read lock.
defer r.mtx.RUnlock() defer r.mtx.RUnlock()
@ -712,6 +743,15 @@ func (s metricSorter) Swap(i, j int) {
} }
func (s metricSorter) Less(i, j int) bool { func (s metricSorter) Less(i, j int) bool {
if len(s[i].Label) != len(s[j].Label) {
// This should not happen. The metrics are
// inconsistent. However, we have to deal with the fact, as
// people might use custom collectors or metric family injection
// to create inconsistent metrics. So let's simply compare the
// number of labels in this case. That will still yield
// reproducible sorting.
return len(s[i].Label) < len(s[j].Label)
}
for n, lp := range s[i].Label { for n, lp := range s[i].Label {
vi := lp.GetValue() vi := lp.GetValue()
vj := s[j].Label[n].GetValue() vj := s[j].Label[n].GetValue()

View File

@ -61,31 +61,29 @@ func testHandler(t testing.TB) {
varintBuf := make([]byte, binary.MaxVarintLen32) varintBuf := make([]byte, binary.MaxVarintLen32)
externalMetricFamily := []*dto.MetricFamily{ externalMetricFamily := &dto.MetricFamily{
{ Name: proto.String("externalname"),
Name: proto.String("externalname"), Help: proto.String("externaldocstring"),
Help: proto.String("externaldocstring"), Type: dto.MetricType_COUNTER.Enum(),
Type: dto.MetricType_COUNTER.Enum(), Metric: []*dto.Metric{
Metric: []*dto.Metric{ {
{ Label: []*dto.LabelPair{
Label: []*dto.LabelPair{ {
{ Name: proto.String("externallabelname"),
Name: proto.String("externallabelname"), Value: proto.String("externalval1"),
Value: proto.String("externalval1"),
},
{
Name: proto.String("externalconstname"),
Value: proto.String("externalconstvalue"),
},
}, },
Counter: &dto.Counter{ {
Value: proto.Float64(1), Name: proto.String("externalconstname"),
Value: proto.String("externalconstvalue"),
}, },
}, },
Counter: &dto.Counter{
Value: proto.Float64(1),
},
}, },
}, },
} }
marshaledExternalMetricFamily, err := proto.Marshal(externalMetricFamily[0]) marshaledExternalMetricFamily, err := proto.Marshal(externalMetricFamily)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -216,16 +214,42 @@ metric: <
expectedMetricFamilyAsProtoCompactText := []byte(`name:"name" help:"docstring" type:COUNTER metric:<label:<name:"constname" value:"constvalue" > label:<name:"labelname" value:"val1" > counter:<value:1 > > metric:<label:<name:"constname" value:"constvalue" > label:<name:"labelname" value:"val2" > counter:<value:1 > > expectedMetricFamilyAsProtoCompactText := []byte(`name:"name" help:"docstring" type:COUNTER metric:<label:<name:"constname" value:"constvalue" > label:<name:"labelname" value:"val1" > counter:<value:1 > > metric:<label:<name:"constname" value:"constvalue" > label:<name:"labelname" value:"val2" > counter:<value:1 > >
`) `)
externalMetricFamilyWithSameName := &dto.MetricFamily{
Name: proto.String("name"),
Help: proto.String("inconsistent help string does not matter here"),
Type: dto.MetricType_COUNTER.Enum(),
Metric: []*dto.Metric{
{
Label: []*dto.LabelPair{
{
Name: proto.String("constname"),
Value: proto.String("constvalue"),
},
{
Name: proto.String("labelname"),
Value: proto.String("different_val"),
},
},
Counter: &dto.Counter{
Value: proto.Float64(42),
},
},
},
}
expectedMetricFamilyMergedWithExternalAsProtoCompactText := []byte(`name:"name" help:"docstring" type:COUNTER metric:<label:<name:"constname" value:"constvalue" > label:<name:"labelname" value:"different_val" > counter:<value:42 > > metric:<label:<name:"constname" value:"constvalue" > label:<name:"labelname" value:"val1" > counter:<value:1 > > metric:<label:<name:"constname" value:"constvalue" > label:<name:"labelname" value:"val2" > counter:<value:1 > >
`)
type output struct { type output struct {
headers map[string]string headers map[string]string
body []byte body []byte
} }
var scenarios = []struct { var scenarios = []struct {
headers map[string]string headers map[string]string
out output out output
withCounter bool collector Collector
withExternalMF bool externalMF []*dto.MetricFamily
}{ }{
{ // 0 { // 0
headers: map[string]string{ headers: map[string]string{
@ -281,7 +305,7 @@ metric: <
}, },
body: expectedMetricFamilyAsText, body: expectedMetricFamilyAsText,
}, },
withCounter: true, collector: metricVec,
}, },
{ // 5 { // 5
headers: map[string]string{ headers: map[string]string{
@ -293,7 +317,7 @@ metric: <
}, },
body: expectedMetricFamilyAsBytes, body: expectedMetricFamilyAsBytes,
}, },
withCounter: true, collector: metricVec,
}, },
{ // 6 { // 6
headers: map[string]string{ headers: map[string]string{
@ -305,7 +329,7 @@ metric: <
}, },
body: externalMetricFamilyAsText, body: externalMetricFamilyAsText,
}, },
withExternalMF: true, externalMF: []*dto.MetricFamily{externalMetricFamily},
}, },
{ // 7 { // 7
headers: map[string]string{ headers: map[string]string{
@ -317,7 +341,7 @@ metric: <
}, },
body: externalMetricFamilyAsBytes, body: externalMetricFamilyAsBytes,
}, },
withExternalMF: true, externalMF: []*dto.MetricFamily{externalMetricFamily},
}, },
{ // 8 { // 8
headers: map[string]string{ headers: map[string]string{
@ -335,8 +359,8 @@ metric: <
[]byte{}, []byte{},
), ),
}, },
withCounter: true, collector: metricVec,
withExternalMF: true, externalMF: []*dto.MetricFamily{externalMetricFamily},
}, },
{ // 9 { // 9
headers: map[string]string{ headers: map[string]string{
@ -359,7 +383,7 @@ metric: <
}, },
body: expectedMetricFamilyAsText, body: expectedMetricFamilyAsText,
}, },
withCounter: true, collector: metricVec,
}, },
{ // 11 { // 11
headers: map[string]string{ headers: map[string]string{
@ -377,8 +401,8 @@ metric: <
[]byte{}, []byte{},
), ),
}, },
withCounter: true, collector: metricVec,
withExternalMF: true, externalMF: []*dto.MetricFamily{externalMetricFamily},
}, },
{ // 12 { // 12
headers: map[string]string{ headers: map[string]string{
@ -396,8 +420,8 @@ metric: <
[]byte{}, []byte{},
), ),
}, },
withCounter: true, collector: metricVec,
withExternalMF: true, externalMF: []*dto.MetricFamily{externalMetricFamily},
}, },
{ // 13 { // 13
headers: map[string]string{ headers: map[string]string{
@ -415,8 +439,8 @@ metric: <
[]byte{}, []byte{},
), ),
}, },
withCounter: true, collector: metricVec,
withExternalMF: true, externalMF: []*dto.MetricFamily{externalMetricFamily},
}, },
{ // 14 { // 14
headers: map[string]string{ headers: map[string]string{
@ -434,20 +458,42 @@ metric: <
[]byte{}, []byte{},
), ),
}, },
withCounter: true, collector: metricVec,
withExternalMF: true, externalMF: []*dto.MetricFamily{externalMetricFamily},
},
{ // 15
headers: map[string]string{
"Accept": "application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=compact-text",
},
out: output{
headers: map[string]string{
"Content-Type": `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=compact-text`,
},
body: bytes.Join(
[][]byte{
externalMetricFamilyAsProtoCompactText,
expectedMetricFamilyMergedWithExternalAsProtoCompactText,
},
[]byte{},
),
},
collector: metricVec,
externalMF: []*dto.MetricFamily{
externalMetricFamily,
externalMetricFamilyWithSameName,
},
}, },
} }
for i, scenario := range scenarios { for i, scenario := range scenarios {
registry := newRegistry() registry := newRegistry()
registry.collectChecksEnabled = true registry.collectChecksEnabled = true
if scenario.withCounter { if scenario.collector != nil {
registry.Register(metricVec) registry.Register(scenario.collector)
} }
if scenario.withExternalMF { if scenario.externalMF != nil {
registry.metricFamilyInjectionHook = func() []*dto.MetricFamily { registry.metricFamilyInjectionHook = func() []*dto.MetricFamily {
return externalMetricFamily return scenario.externalMF
} }
} }
writer := &fakeResponseWriter{ writer := &fakeResponseWriter{