From 35125cdac0eda4cca5a17d2f921ce6d223962a95 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Wed, 26 Jun 2013 17:38:31 +0200 Subject: [PATCH] Implement Protocol Buffer Stream Decoding This commit introduces protocol buffer telemetric stream decoding. --- extraction/discriminator.go | 1 + extraction/metricfamilyprocessor.go | 133 ++++++++++++++++++++++- extraction/metricfamilyprocessor_test.go | 132 ++++++++++++++++++++++ 3 files changed, 264 insertions(+), 2 deletions(-) create mode 100644 extraction/metricfamilyprocessor_test.go diff --git a/extraction/discriminator.go b/extraction/discriminator.go index e0964d8..c0c9bdf 100644 --- a/extraction/discriminator.go +++ b/extraction/discriminator.go @@ -33,6 +33,7 @@ func ProcessorForRequestHeader(header http.Header) (Processor, error) { } switch mediatype { case "application/vnd.google.protobuf": + // BUG(matt): Version? if params["proto"] != "io.prometheus.client.MetricFamily" { return nil, fmt.Errorf("Unrecognized Protocol Message %s", params["proto"]) } diff --git a/extraction/metricfamilyprocessor.go b/extraction/metricfamilyprocessor.go index ec54d5a..15e5186 100644 --- a/extraction/metricfamilyprocessor.go +++ b/extraction/metricfamilyprocessor.go @@ -14,7 +14,14 @@ package extraction import ( + "fmt" "io" + + dto "github.com/prometheus/client_model/go" + + "github.com/matttproud/golang_protobuf_extensions/ext" + + "github.com/prometheus/client_golang/model" ) type metricFamilyProcessor struct{} @@ -26,6 +33,128 @@ type metricFamilyProcessor struct{} // more details. var MetricFamilyProcessor = new(metricFamilyProcessor) -func (m *metricFamilyProcessor) ProcessSingle(io.Reader, chan<- *Result, *ProcessOptions) error { - panic("not implemented") +func (m *metricFamilyProcessor) ProcessSingle(i io.Reader, r chan<- *Result, o *ProcessOptions) error { + family := new(dto.MetricFamily) + + for { + family.Reset() + + if _, err := ext.ReadDelimited(i, family); err != nil { + if err == io.EOF { + return nil + } + + return err + } + + switch *family.Type { + case dto.MetricType_COUNTER: + extractCounter(r, o, family) + case dto.MetricType_GAUGE: + extractGauge(r, o, family) + case dto.MetricType_SUMMARY: + extractSummary(r, o, family) + } + } +} + +func extractCounter(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) { + samples := make(model.Samples, 0, len(f.Metric)) + + for _, m := range f.Metric { + if m.Counter == nil { + continue + } + + sample := new(model.Sample) + samples = append(samples, sample) + + sample.Timestamp = o.Timestamp + sample.Metric = model.Metric{} + metric := sample.Metric + + for l, v := range o.BaseLabels { + metric[l] = v + } + for _, p := range m.Label { + metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) + } + + metric[model.MetricNameLabel] = model.LabelValue(f.GetName()) + + sample.Value = model.SampleValue(m.Counter.GetValue()) + } + + r <- &Result{ + Samples: samples, + } +} + +func extractGauge(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) { + samples := make(model.Samples, 0, len(f.Metric)) + + for _, m := range f.Metric { + if m.Gauge == nil { + continue + } + + sample := new(model.Sample) + samples = append(samples, sample) + + sample.Timestamp = o.Timestamp + sample.Metric = model.Metric{} + metric := sample.Metric + + for l, v := range o.BaseLabels { + metric[l] = v + } + for _, p := range m.Label { + metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) + } + + metric[model.MetricNameLabel] = model.LabelValue(f.GetName()) + + sample.Value = model.SampleValue(m.Gauge.GetValue()) + } + + r <- &Result{ + Samples: samples, + } +} + +func extractSummary(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) { + // BUG(matt): Lack of dumping of sum or count. + samples := make(model.Samples, 0, len(f.Metric)) + + for _, m := range f.Metric { + if m.Summary == nil { + continue + } + + for _, q := range m.Summary.Quantile { + sample := new(model.Sample) + samples = append(samples, sample) + + sample.Timestamp = o.Timestamp + sample.Metric = model.Metric{} + metric := sample.Metric + + for l, v := range o.BaseLabels { + metric[l] = v + } + for _, p := range m.Label { + metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) + } + // BUG(matt): Update other names to "quantile". + metric[model.LabelName("quantile")] = model.LabelValue(fmt.Sprint(q.GetQuantile())) + + metric[model.MetricNameLabel] = model.LabelValue(f.GetName()) + + sample.Value = model.SampleValue(q.GetValue()) + } + } + + r <- &Result{ + Samples: samples, + } } diff --git a/extraction/metricfamilyprocessor_test.go b/extraction/metricfamilyprocessor_test.go new file mode 100644 index 0000000..df23f4e --- /dev/null +++ b/extraction/metricfamilyprocessor_test.go @@ -0,0 +1,132 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package extraction + +import ( + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/model" +) + +var testTime = time.Now() + +type metricFamilyProcessorScenario struct { + in string + out []*Result +} + +func (s *metricFamilyProcessorScenario) test(t *testing.T, set int) { + i := strings.NewReader(s.in) + chanSize := 1 + if len(s.out) > 0 { + chanSize = len(s.out) * 3 + } + r := make(chan *Result, chanSize) + + o := &ProcessOptions{ + Timestamp: testTime, + BaseLabels: model.LabelSet{"base": "label"}, + } + + err := MetricFamilyProcessor.ProcessSingle(i, r, o) + if err != nil { + t.Fatalf("%d. got error: %s", set, err) + } + close(r) + + actual := []*Result{} + + for e := range r { + actual = append(actual, e) + } + + if len(actual) != len(s.out) { + t.Fatalf("%d. expected length %d, got %d", set, len(s.out), len(actual)) + } + + for i, expected := range s.out { + if expected.Err != actual[i].Err { + t.Fatalf("%d. expected err of %s, got %s", set, expected.Err, actual[i].Err) + } + + if len(expected.Samples) != len(actual[i].Samples) { + t.Fatalf("%d.%d expected %d samples, got %d", set, i, len(expected.Samples), len(actual[i].Samples)) + } + + for j := 0; j < len(expected.Samples); j++ { + e := expected.Samples[j] + a := actual[i].Samples[j] + if !a.Equal(e) { + t.Fatalf("%d.%d.%d expected %s sample, got %s", set, i, j, e, a) + } + } + } +} + +func TestMetricFamilyProcessor(t *testing.T) { + scenarios := []metricFamilyProcessorScenario{ + { + in: "", + }, + { + in: "\x8f\x01\n\rrequest_count\x12\x12Number of requests\x18\x00\"0\n#\n\x0fsome_label_name\x12\x10some_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00E\xc0\"6\n)\n\x12another_label_name\x12\x13another_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00U@", + out: []*Result{ + { + Samples: model.Samples{ + &model.Sample{ + Metric: model.Metric{"base": "label", "name": "request_count", "some_label_name": "some_label_value"}, + Value: -42, + Timestamp: testTime, + }, + &model.Sample{ + Metric: model.Metric{"base": "label", "name": "request_count", "another_label_name": "another_label_value"}, + Value: 84, + Timestamp: testTime, + }, + }, + }, + }, + }, + { + in: "\xb9\x01\n\rrequest_count\x12\x12Number of requests\x18\x02\"O\n#\n\x0fsome_label_name\x12\x10some_label_value\"(\x1a\x12\t\xaeG\xe1z\x14\xae\xef?\x11\x00\x00\x00\x00\x00\x00E\xc0\x1a\x12\t+\x87\x16\xd9\xce\xf7\xef?\x11\x00\x00\x00\x00\x00\x00U\xc0\"A\n)\n\x12another_label_name\x12\x13another_label_value\"\x14\x1a\x12\t\x00\x00\x00\x00\x00\x00\xe0?\x11\x00\x00\x00\x00\x00\x00$@", + out: []*Result{ + { + Samples: model.Samples{ + &model.Sample{ + Metric: model.Metric{"base": "label", "name": "request_count", "some_label_name": "some_label_value", "quantile": "0.99"}, + Value: -42, + Timestamp: testTime, + }, + &model.Sample{ + Metric: model.Metric{"base": "label", "name": "request_count", "some_label_name": "some_label_value", "quantile": "0.999"}, + Value: -84, + Timestamp: testTime, + }, + &model.Sample{ + Metric: model.Metric{"base": "label", "name": "request_count", "another_label_name": "another_label_value", "quantile": "0.5"}, + Value: 10, + Timestamp: testTime, + }, + }, + }, + }, + }, + } + + for i, scenario := range scenarios { + scenario.test(t, i) + } +}