From 6f2f8f28e8af1edbbfe8c35086381b375577cb23 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Wed, 11 Feb 2015 17:00:56 +0000 Subject: [PATCH] Add support for histograms to parsers, extraction and creation. This does not include a histogram metric usable from the go client. See https://docs.google.com/document/d/1uSenXRDjDaJLV3qnSD09GqgPdEEDPjER0mVsnGaCYF0/edit# --- extraction/metricfamilyprocessor.go | 66 ++++++++++++++ extraction/metricfamilyprocessor_test.go | 42 +++++++++ text/create.go | 33 +++++++ text/create_test.go | 48 ++++++++++ text/parse.go | 110 +++++++++++++++++++---- text/parse_test.go | 61 ++++++++++++- 6 files changed, 344 insertions(+), 16 deletions(-) diff --git a/extraction/metricfamilyprocessor.go b/extraction/metricfamilyprocessor.go index af2bea6..28f6598 100644 --- a/extraction/metricfamilyprocessor.go +++ b/extraction/metricfamilyprocessor.go @@ -69,6 +69,10 @@ func extractMetricFamily(out Ingester, o *ProcessOptions, family *dto.MetricFami if err := extractUntyped(out, o, family); err != nil { return err } + case dto.MetricType_HISTOGRAM: + if err := extractHistogram(out, o, family); err != nil { + return err + } } return nil } @@ -227,3 +231,65 @@ func extractUntyped(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error return out.Ingest(samples) } + +func extractHistogram(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error { + samples := make(model.Samples, 0, len(f.Metric)) + + for _, m := range f.Metric { + if m.Histogram == nil { + continue + } + + timestamp := o.Timestamp + if m.TimestampMs != nil { + timestamp = model.TimestampFromUnixNano(*m.TimestampMs * 1000000) + } + + for _, q := range m.Histogram.Bucket { + sample := new(model.Sample) + samples = append(samples, sample) + + sample.Timestamp = timestamp + sample.Metric = model.Metric{} + metric := sample.Metric + + for _, p := range m.Label { + metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) + } + metric[model.LabelName("le")] = model.LabelValue(fmt.Sprint(q.GetUpperBound())) + + metric[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket") + + sample.Value = model.SampleValue(q.GetCumulativeCount()) + } + // TODO: If +Inf bucket is missing, add it. + + if m.Histogram.SampleSum != nil { + sum := new(model.Sample) + sum.Timestamp = timestamp + metric := model.Metric{} + for _, p := range m.Label { + metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) + } + metric[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum") + sum.Metric = metric + sum.Value = model.SampleValue(m.Histogram.GetSampleSum()) + samples = append(samples, sum) + } + + if m.Histogram.SampleCount != nil { + count := new(model.Sample) + count.Timestamp = timestamp + metric := model.Metric{} + for _, p := range m.Label { + metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) + } + metric[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count") + count.Metric = metric + count.Value = model.SampleValue(m.Histogram.GetSampleCount()) + samples = append(samples, count) + } + } + + return out.Ingest(samples) +} diff --git a/extraction/metricfamilyprocessor_test.go b/extraction/metricfamilyprocessor_test.go index c4ffe90..8e8d9d7 100644 --- a/extraction/metricfamilyprocessor_test.go +++ b/extraction/metricfamilyprocessor_test.go @@ -103,6 +103,48 @@ func TestMetricFamilyProcessor(t *testing.T) { }, }, }, + { + in: "\x8d\x01\n\x1drequest_duration_microseconds\x12\x15The response latency.\x18\x04\"S:Q\b\x85\x15\x11\xcd\xcc\xccL\x8f\xcb:A\x1a\v\b{\x11\x00\x00\x00\x00\x00\x00Y@\x1a\f\b\x9c\x03\x11\x00\x00\x00\x00\x00\x00^@\x1a\f\b\xd0\x04\x11\x00\x00\x00\x00\x00\x00b@\x1a\f\b\xf4\v\x11\x9a\x99\x99\x99\x99\x99e@\x1a\f\b\x85\x15\x11\x00\x00\x00\x00\x00\x00\xf0\u007f", + expected: []model.Samples{ + model.Samples{ + &model.Sample{ + Metric: model.Metric{model.MetricNameLabel: "request_duration_microseconds_bucket", "le": "100"}, + Value: 123, + Timestamp: testTime, + }, + &model.Sample{ + Metric: model.Metric{model.MetricNameLabel: "request_duration_microseconds_bucket", "le": "120"}, + Value: 412, + Timestamp: testTime, + }, + &model.Sample{ + Metric: model.Metric{model.MetricNameLabel: "request_duration_microseconds_bucket", "le": "144"}, + Value: 592, + Timestamp: testTime, + }, + &model.Sample{ + Metric: model.Metric{model.MetricNameLabel: "request_duration_microseconds_bucket", "le": "172.8"}, + Value: 1524, + Timestamp: testTime, + }, + &model.Sample{ + Metric: model.Metric{model.MetricNameLabel: "request_duration_microseconds_bucket", "le": "+Inf"}, + Value: 2693, + Timestamp: testTime, + }, + &model.Sample{ + Metric: model.Metric{model.MetricNameLabel: "request_duration_microseconds_sum"}, + Value: 1756047.3, + Timestamp: testTime, + }, + &model.Sample{ + Metric: model.Metric{model.MetricNameLabel: "request_duration_microseconds_count"}, + Value: 2693, + Timestamp: testTime, + }, + }, + }, + }, } for i, scenario := range scenarios { diff --git a/text/create.go b/text/create.go index 072619f..e3ac36f 100644 --- a/text/create.go +++ b/text/create.go @@ -139,6 +139,39 @@ func MetricFamilyToText(out io.Writer, in *dto.MetricFamily) (int, error) { float64(metric.Summary.GetSampleCount()), out, ) + case dto.MetricType_HISTOGRAM: + if metric.Histogram == nil { + return written, fmt.Errorf( + "expected summary in metric %s", metric, + ) + } + for _, q := range metric.Histogram.Bucket { + n, err = writeSample( + name+"_bucket", metric, + "le", fmt.Sprint(q.GetUpperBound()), + float64(q.GetCumulativeCount()), + out, + ) + written += n + if err != nil { + return written, err + } + // TODO: Add +inf bucket if it's missing. + } + n, err = writeSample( + name+"_sum", metric, "", "", + metric.Histogram.GetSampleSum(), + out, + ) + if err != nil { + return written, err + } + written += n + n, err = writeSample( + name+"_count", metric, "", "", + float64(metric.Histogram.GetSampleCount()), + out, + ) default: return written, fmt.Errorf( "unexpected type in metric %s", metric, diff --git a/text/create_test.go b/text/create_test.go index f74492e..d18fb98 100644 --- a/text/create_test.go +++ b/text/create_test.go @@ -219,6 +219,54 @@ summary_name{name_1="value 1",name_2="value 2",quantile="0.9"} 2 summary_name{name_1="value 1",name_2="value 2",quantile="0.99"} 3 summary_name_sum{name_1="value 1",name_2="value 2"} 2010.1971 summary_name_count{name_1="value 1",name_2="value 2"} 4711 +`, + }, + // 4: Histogram + { + in: &dto.MetricFamily{ + Name: proto.String("request_duration_microseconds"), + Help: proto.String("The response latency."), + Type: dto.MetricType_HISTOGRAM.Enum(), + Metric: []*dto.Metric{ + &dto.Metric{ + Histogram: &dto.Histogram{ + SampleCount: proto.Uint64(2693), + SampleSum: proto.Float64(1756047.3), + Bucket: []*dto.Bucket{ + &dto.Bucket{ + UpperBound: proto.Float64(100), + CumulativeCount: proto.Uint64(123), + }, + &dto.Bucket{ + UpperBound: proto.Float64(120), + CumulativeCount: proto.Uint64(412), + }, + &dto.Bucket{ + UpperBound: proto.Float64(144), + CumulativeCount: proto.Uint64(592), + }, + &dto.Bucket{ + UpperBound: proto.Float64(172.8), + CumulativeCount: proto.Uint64(1524), + }, + &dto.Bucket{ + UpperBound: proto.Float64(math.Inf(+1)), + CumulativeCount: proto.Uint64(2693), + }, + }, + }, + }, + }, + }, + out: `# HELP request_duration_microseconds The response latency. +# TYPE request_duration_microseconds histogram +request_duration_microseconds_bucket{le="100"} 123 +request_duration_microseconds_bucket{le="120"} 412 +request_duration_microseconds_bucket{le="144"} 592 +request_duration_microseconds_bucket{le="172.8"} 1524 +request_duration_microseconds_bucket{le="+Inf"} 2693 +request_duration_microseconds_sum 1.7560473e+06 +request_duration_microseconds_count 2693 `, }, } diff --git a/text/parse.go b/text/parse.go index 3e7fb30..59ba3e2 100644 --- a/text/parse.go +++ b/text/parse.go @@ -59,14 +59,19 @@ type Parser struct { currentMetric *dto.Metric currentLabelPair *dto.LabelPair - // The remaining member variables are only used for summaries. + // The remaining member variables are only used for summaries/histograms. + currentLabels map[string]string // All labels including '__name__' but excluding 'quantile'/'le' + // Summary specific. summaries map[uint64]*dto.Metric // Key is created with LabelsToSignature. - currentLabels map[string]string // All labels including '__name__' but excluding 'quantile'. currentQuantile float64 + // Histogram specific. + histograms map[uint64]*dto.Metric // Key is created with LabelsToSignature. + currentBucket float64 // These tell us if the currently processed line ends on '_count' or - // '_sum' respectively and belong to a summary, representing the sample - // count and sum of that summary. - currentIsSummaryCount, currentIsSummarySum bool + // '_sum' respectively and belong to a summary/histogram, representing the sample + // count and sum of that summary/histogram. + currentIsSummaryCount, currentIsSummarySum bool + currentIsHistogramCount, currentIsHistogramSum bool } // TextToMetricFamilies reads 'in' as the simple and flat text-based exchange @@ -111,7 +116,11 @@ func (p *Parser) reset(in io.Reader) { if p.summaries == nil || len(p.summaries) > 0 { p.summaries = map[uint64]*dto.Metric{} } + if p.histograms == nil || len(p.histograms) > 0 { + p.histograms = map[uint64]*dto.Metric{} + } p.currentQuantile = math.NaN() + p.currentBucket = math.NaN() } // startOfLine represents the state where the next byte read from p.buf is the @@ -224,13 +233,14 @@ func (p *Parser) readingMetricName() stateFn { // p.currentByte) is either the first byte of the label set (i.e. a '{'), or the // first byte of the value (otherwise). func (p *Parser) readingLabels() stateFn { - // Alas, summaries are really special... We have to reset the - // currentLabels map and the currentQuantile before starting to + // Summaries/histograms are special. We have to reset the + // currentLabels map, currentQuantile and currentBucket before starting to // read labels. - if p.currentMF.GetType() == dto.MetricType_SUMMARY { + if p.currentMF.GetType() == dto.MetricType_SUMMARY || p.currentMF.GetType() == dto.MetricType_HISTOGRAM { p.currentLabels = map[string]string{} p.currentLabels[string(model.MetricNameLabel)] = p.currentMF.GetName() p.currentQuantile = math.NaN() + p.currentBucket = math.NaN() } if p.currentByte != '{' { return p.readingValue @@ -262,10 +272,10 @@ func (p *Parser) startLabelName() stateFn { p.parseError(fmt.Sprintf("label name %q is reserved", model.MetricNameLabel)) return nil } - // Once more, special summary treatment... Don't add 'quantile' + // Special summary/histogram treatment. Don't add 'quantile' and 'le' // labels to 'real' labels. - if p.currentMF.GetType() != dto.MetricType_SUMMARY || - p.currentLabelPair.GetName() != "quantile" { + if !(p.currentMF.GetType() == dto.MetricType_SUMMARY && p.currentLabelPair.GetName() == "quantile") && + !(p.currentMF.GetType() == dto.MetricType_HISTOGRAM && p.currentLabelPair.GetName() == "le") { p.currentMetric.Label = append(p.currentMetric.Label, p.currentLabelPair) } if p.skipBlankTabIfCurrentBlankTab(); p.err != nil { @@ -292,14 +302,26 @@ func (p *Parser) startLabelValue() stateFn { return nil } p.currentLabelPair.Value = proto.String(p.currentToken.String()) - // Once more, special treatment of summaries: + // Special treatment of summaries: // - Quantile labels are special, will result in dto.Quantile later. // - Other labels have to be added to currentLabels for signature calculation. if p.currentMF.GetType() == dto.MetricType_SUMMARY { if p.currentLabelPair.GetName() == "quantile" { if p.currentQuantile, p.err = strconv.ParseFloat(p.currentLabelPair.GetValue(), 64); p.err != nil { // Create a more helpful error message. - p.parseError(fmt.Sprintf("expected float as value for quantile label, got %q", p.currentLabelPair.GetValue())) + p.parseError(fmt.Sprintf("expected float as value for 'quantile' label, got %q", p.currentLabelPair.GetValue())) + return nil + } + } else { + p.currentLabels[p.currentLabelPair.GetName()] = p.currentLabelPair.GetValue() + } + } + // Similar special treatment of histograms. + if p.currentMF.GetType() == dto.MetricType_HISTOGRAM { + if p.currentLabelPair.GetName() == "le" { + if p.currentBucket, p.err = strconv.ParseFloat(p.currentLabelPair.GetValue(), 64); p.err != nil { + // Create a more helpful error message. + p.parseError(fmt.Sprintf("expected float as value for 'le' label, got %q", p.currentLabelPair.GetValue())) return nil } } else { @@ -328,7 +350,7 @@ func (p *Parser) startLabelValue() stateFn { // p.currentByte) is the first byte of the sample value (i.e. a float). func (p *Parser) readingValue() stateFn { // When we are here, we have read all the labels, so for the - // infamous special case of a summary, we can finally find out + // special case of a summary/histogram, we can finally find out // if the metric already exists. if p.currentMF.GetType() == dto.MetricType_SUMMARY { signature := model.LabelsToSignature(p.currentLabels) @@ -338,6 +360,14 @@ func (p *Parser) readingValue() stateFn { p.summaries[signature] = p.currentMetric p.currentMF.Metric = append(p.currentMF.Metric, p.currentMetric) } + } else if p.currentMF.GetType() == dto.MetricType_HISTOGRAM { + signature := model.LabelsToSignature(p.currentLabels) + if histogram := p.histograms[signature]; histogram != nil { + p.currentMetric = histogram + } else { + p.histograms[signature] = p.currentMetric + p.currentMF.Metric = append(p.currentMF.Metric, p.currentMetric) + } } else { p.currentMF.Metric = append(p.currentMF.Metric, p.currentMetric) } @@ -376,6 +406,25 @@ func (p *Parser) readingValue() stateFn { }, ) } + case dto.MetricType_HISTOGRAM: + // *sigh* + if p.currentMetric.Histogram == nil { + p.currentMetric.Histogram = &dto.Histogram{} + } + switch { + case p.currentIsHistogramCount: + p.currentMetric.Histogram.SampleCount = proto.Uint64(uint64(value)) + case p.currentIsHistogramSum: + p.currentMetric.Histogram.SampleSum = proto.Float64(value) + case !math.IsNaN(p.currentBucket): + p.currentMetric.Histogram.Bucket = append( + p.currentMetric.Histogram.Bucket, + &dto.Bucket{ + UpperBound: proto.Float64(p.currentBucket), + CumulativeCount: proto.Uint64(uint64(value)), + }, + ) + } default: p.err = fmt.Errorf("unexpected type for metric name %q", p.currentMF.GetName()) } @@ -598,11 +647,13 @@ func (p *Parser) readTokenAsLabelValue() { func (p *Parser) setOrCreateCurrentMF() { p.currentIsSummaryCount = false p.currentIsSummarySum = false + p.currentIsHistogramCount = false + p.currentIsHistogramSum = false name := p.currentToken.String() if p.currentMF = p.metricFamiliesByName[name]; p.currentMF != nil { return } - // Try out if this is a _sum or _count for a summary. + // Try out if this is a _sum or _count for a summary/histogram. summaryName := summaryMetricName(name) if p.currentMF = p.metricFamiliesByName[summaryName]; p.currentMF != nil { if p.currentMF.GetType() == dto.MetricType_SUMMARY { @@ -615,6 +666,18 @@ func (p *Parser) setOrCreateCurrentMF() { return } } + histogramName := histogramMetricName(name) + if p.currentMF = p.metricFamiliesByName[histogramName]; p.currentMF != nil { + if p.currentMF.GetType() == dto.MetricType_HISTOGRAM { + if isCount(name) { + p.currentIsHistogramCount = true + } + if isSum(name) { + p.currentIsHistogramSum = true + } + return + } + } p.currentMF = &dto.MetricFamily{Name: proto.String(name)} p.metricFamiliesByName[name] = p.currentMF } @@ -647,6 +710,10 @@ func isSum(name string) bool { return len(name) > 4 && name[len(name)-4:] == "_sum" } +func isBucket(name string) bool { + return len(name) > 7 && name[len(name)-7:] == "_bucket" +} + func summaryMetricName(name string) string { switch { case isCount(name): @@ -657,3 +724,16 @@ func summaryMetricName(name string) string { return name } } + +func histogramMetricName(name string) string { + switch { + case isCount(name): + return name[:len(name)-6] + case isSum(name): + return name[:len(name)-4] + case isBucket(name): + return name[:len(name)-7] + default: + return name + } +} diff --git a/text/parse_test.go b/text/parse_test.go index 4ac0dfd..8fbfc3b 100644 --- a/text/parse_test.go +++ b/text/parse_test.go @@ -335,6 +335,57 @@ my_summary{n1="val3", quantile="0.2"} 4711 }, }, }, + // 4: The histogram. + { + in: ` +# HELP request_duration_microseconds The response latency. +# TYPE request_duration_microseconds histogram +request_duration_microseconds_bucket{le="100"} 123 +request_duration_microseconds_bucket{le="120"} 412 +request_duration_microseconds_bucket{le="144"} 592 +request_duration_microseconds_bucket{le="172.8"} 1524 +request_duration_microseconds_bucket{le="+Inf"} 2693 +request_duration_microseconds_sum 1.7560473e+06 +request_duration_microseconds_count 2693 +`, + out: []*dto.MetricFamily{ + { + Name: proto.String("request_duration_microseconds"), + Help: proto.String("The response latency."), + Type: dto.MetricType_HISTOGRAM.Enum(), + Metric: []*dto.Metric{ + &dto.Metric{ + Histogram: &dto.Histogram{ + SampleCount: proto.Uint64(2693), + SampleSum: proto.Float64(1756047.3), + Bucket: []*dto.Bucket{ + &dto.Bucket{ + UpperBound: proto.Float64(100), + CumulativeCount: proto.Uint64(123), + }, + &dto.Bucket{ + UpperBound: proto.Float64(120), + CumulativeCount: proto.Uint64(412), + }, + &dto.Bucket{ + UpperBound: proto.Float64(144), + CumulativeCount: proto.Uint64(592), + }, + &dto.Bucket{ + UpperBound: proto.Float64(172.8), + CumulativeCount: proto.Uint64(1524), + }, + &dto.Bucket{ + UpperBound: proto.Float64(math.Inf(+1)), + CumulativeCount: proto.Uint64(2693), + }, + }, + }, + }, + }, + }, + }, + }, } for i, scenario := range scenarios { @@ -427,7 +478,7 @@ line"} 3.14 # TYPE metric summary metric{quantile="bla"} 3.14 `, - err: "text format parsing error in line 3: expected float as value for quantile label", + err: "text format parsing error in line 3: expected float as value for 'quantile' label", }, // 8: { @@ -500,6 +551,14 @@ metric 4.12 in: `{label="bla"} 3.14 2`, err: "text format parsing error in line 1: invalid metric name", }, + // 18: + { + in: ` +# TYPE metric histogram +metric_bucket{le="bla"} 3.14 +`, + err: "text format parsing error in line 3: expected float as value for 'le' label", + }, } for i, scenario := range scenarios {