From 03369306e0224073b78b6bcf061b49669bb2742e Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Wed, 26 Jun 2013 13:25:10 +0200 Subject: [PATCH 1/2] Provide Discriminator For Protocol Buf. Streams This commit introduces an incomplete processor that decodes varint record length-delimited streams of io.prometheus.client.MetricFamily Protocol Buffer messages. The Go client presently does not support generation of said messages, but this will unblock a number of Java changes. --- extraction/discriminator.go | 46 ++++++++++++++++++----------- extraction/discriminator_test.go | 15 ++++++++++ extraction/metricfamilyprocessor.go | 31 +++++++++++++++++++ extraction/processor0_0_2.go | 2 +- 4 files changed, 75 insertions(+), 19 deletions(-) create mode 100644 extraction/metricfamilyprocessor.go diff --git a/extraction/discriminator.go b/extraction/discriminator.go index 458d150..e0964d8 100644 --- a/extraction/discriminator.go +++ b/extraction/discriminator.go @@ -31,24 +31,34 @@ func ProcessorForRequestHeader(header http.Header) (Processor, error) { if err != nil { return nil, fmt.Errorf("Invalid Content-Type header %q: %s", header.Get("Content-Type"), err) } - if mediatype != "application/json" { + switch mediatype { + case "application/vnd.google.protobuf": + if params["proto"] != "io.prometheus.client.MetricFamily" { + return nil, fmt.Errorf("Unrecognized Protocol Message %s", params["proto"]) + } + if params["encoding"] != "varint record length-delimited" { + return nil, fmt.Errorf("Unsupported Encoding %s", params["encoding"]) + } + return MetricFamilyProcessor, nil + + case "application/json": + var prometheusApiVersion string + + if params["schema"] == "prometheus/telemetry" && params["version"] != "" { + prometheusApiVersion = params["version"] + } else { + prometheusApiVersion = header.Get("X-Prometheus-API-Version") + } + + switch prometheusApiVersion { + case "0.0.2": + return Processor002, nil + case "0.0.1": + return Processor001, nil + default: + return nil, fmt.Errorf("Unrecognized API version %s", prometheusApiVersion) + } + default: return nil, fmt.Errorf("Unsupported media type %q, expected %q", mediatype, "application/json") } - - var prometheusApiVersion string - - if params["schema"] == "prometheus/telemetry" && params["version"] != "" { - prometheusApiVersion = params["version"] - } else { - prometheusApiVersion = header.Get("X-Prometheus-API-Version") - } - - switch prometheusApiVersion { - case "0.0.2": - return Processor002, nil - case "0.0.1": - return Processor001, nil - default: - return nil, fmt.Errorf("Unrecognized API version %s", prometheusApiVersion) - } } diff --git a/extraction/discriminator_test.go b/extraction/discriminator_test.go index c7a0936..3e2e1a2 100644 --- a/extraction/discriminator_test.go +++ b/extraction/discriminator_test.go @@ -56,6 +56,21 @@ func testDiscriminatorHttpHeader(t test.Tester) { output: Processor002, err: nil, }, + { + input: map[string]string{"Content-Type": `application/vnd.google.protobuf; proto="io.prometheus.client.MetricFamily"; encoding="varint record length-delimited"`}, + output: MetricFamilyProcessor, + err: nil, + }, + { + input: map[string]string{"Content-Type": `application/vnd.google.protobuf; proto="illegal"; encoding="varint record length-delimited"`}, + output: nil, + err: fmt.Errorf("Unrecognized Protocol Message illegal"), + }, + { + input: map[string]string{"Content-Type": `application/vnd.google.protobuf; proto="io.prometheus.client.MetricFamily"; encoding="illegal"`}, + output: nil, + err: fmt.Errorf("Unsupported Encoding illegal"), + }, } for i, scenario := range scenarios { diff --git a/extraction/metricfamilyprocessor.go b/extraction/metricfamilyprocessor.go new file mode 100644 index 0000000..ec54d5a --- /dev/null +++ b/extraction/metricfamilyprocessor.go @@ -0,0 +1,31 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package extraction + +import ( + "io" +) + +type metricFamilyProcessor struct{} + +// MetricFamilyProcessor decodes varint encoded record length-delimited streams +// of io.prometheus.client.MetricFamily. +// +// See http://godoc.org/github.com/matttproud/golang_protobuf_extensions/ext for +// more details. +var MetricFamilyProcessor = new(metricFamilyProcessor) + +func (m *metricFamilyProcessor) ProcessSingle(io.Reader, chan<- *Result, *ProcessOptions) error { + panic("not implemented") +} diff --git a/extraction/processor0_0_2.go b/extraction/processor0_0_2.go index 42a94e7..2c7b566 100644 --- a/extraction/processor0_0_2.go +++ b/extraction/processor0_0_2.go @@ -23,7 +23,7 @@ import ( // Processor002 is responsible for decoding payloads from protocol version // 0.0.2. -var Processor002 = &processor002{} +var Processor002 = new(processor002) type histogram002 struct { Labels map[string]string `json:"labels"` From 35125cdac0eda4cca5a17d2f921ce6d223962a95 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Wed, 26 Jun 2013 17:38:31 +0200 Subject: [PATCH 2/2] Implement Protocol Buffer Stream Decoding This commit introduces protocol buffer telemetric stream decoding. --- extraction/discriminator.go | 1 + extraction/metricfamilyprocessor.go | 133 ++++++++++++++++++++++- extraction/metricfamilyprocessor_test.go | 132 ++++++++++++++++++++++ 3 files changed, 264 insertions(+), 2 deletions(-) create mode 100644 extraction/metricfamilyprocessor_test.go diff --git a/extraction/discriminator.go b/extraction/discriminator.go index e0964d8..c0c9bdf 100644 --- a/extraction/discriminator.go +++ b/extraction/discriminator.go @@ -33,6 +33,7 @@ func ProcessorForRequestHeader(header http.Header) (Processor, error) { } switch mediatype { case "application/vnd.google.protobuf": + // BUG(matt): Version? if params["proto"] != "io.prometheus.client.MetricFamily" { return nil, fmt.Errorf("Unrecognized Protocol Message %s", params["proto"]) } diff --git a/extraction/metricfamilyprocessor.go b/extraction/metricfamilyprocessor.go index ec54d5a..15e5186 100644 --- a/extraction/metricfamilyprocessor.go +++ b/extraction/metricfamilyprocessor.go @@ -14,7 +14,14 @@ package extraction import ( + "fmt" "io" + + dto "github.com/prometheus/client_model/go" + + "github.com/matttproud/golang_protobuf_extensions/ext" + + "github.com/prometheus/client_golang/model" ) type metricFamilyProcessor struct{} @@ -26,6 +33,128 @@ type metricFamilyProcessor struct{} // more details. var MetricFamilyProcessor = new(metricFamilyProcessor) -func (m *metricFamilyProcessor) ProcessSingle(io.Reader, chan<- *Result, *ProcessOptions) error { - panic("not implemented") +func (m *metricFamilyProcessor) ProcessSingle(i io.Reader, r chan<- *Result, o *ProcessOptions) error { + family := new(dto.MetricFamily) + + for { + family.Reset() + + if _, err := ext.ReadDelimited(i, family); err != nil { + if err == io.EOF { + return nil + } + + return err + } + + switch *family.Type { + case dto.MetricType_COUNTER: + extractCounter(r, o, family) + case dto.MetricType_GAUGE: + extractGauge(r, o, family) + case dto.MetricType_SUMMARY: + extractSummary(r, o, family) + } + } +} + +func extractCounter(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) { + samples := make(model.Samples, 0, len(f.Metric)) + + for _, m := range f.Metric { + if m.Counter == nil { + continue + } + + sample := new(model.Sample) + samples = append(samples, sample) + + sample.Timestamp = o.Timestamp + sample.Metric = model.Metric{} + metric := sample.Metric + + for l, v := range o.BaseLabels { + metric[l] = v + } + for _, p := range m.Label { + metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) + } + + metric[model.MetricNameLabel] = model.LabelValue(f.GetName()) + + sample.Value = model.SampleValue(m.Counter.GetValue()) + } + + r <- &Result{ + Samples: samples, + } +} + +func extractGauge(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) { + samples := make(model.Samples, 0, len(f.Metric)) + + for _, m := range f.Metric { + if m.Gauge == nil { + continue + } + + sample := new(model.Sample) + samples = append(samples, sample) + + sample.Timestamp = o.Timestamp + sample.Metric = model.Metric{} + metric := sample.Metric + + for l, v := range o.BaseLabels { + metric[l] = v + } + for _, p := range m.Label { + metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) + } + + metric[model.MetricNameLabel] = model.LabelValue(f.GetName()) + + sample.Value = model.SampleValue(m.Gauge.GetValue()) + } + + r <- &Result{ + Samples: samples, + } +} + +func extractSummary(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) { + // BUG(matt): Lack of dumping of sum or count. + samples := make(model.Samples, 0, len(f.Metric)) + + for _, m := range f.Metric { + if m.Summary == nil { + continue + } + + for _, q := range m.Summary.Quantile { + sample := new(model.Sample) + samples = append(samples, sample) + + sample.Timestamp = o.Timestamp + sample.Metric = model.Metric{} + metric := sample.Metric + + for l, v := range o.BaseLabels { + metric[l] = v + } + for _, p := range m.Label { + metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) + } + // BUG(matt): Update other names to "quantile". + metric[model.LabelName("quantile")] = model.LabelValue(fmt.Sprint(q.GetQuantile())) + + metric[model.MetricNameLabel] = model.LabelValue(f.GetName()) + + sample.Value = model.SampleValue(q.GetValue()) + } + } + + r <- &Result{ + Samples: samples, + } } diff --git a/extraction/metricfamilyprocessor_test.go b/extraction/metricfamilyprocessor_test.go new file mode 100644 index 0000000..df23f4e --- /dev/null +++ b/extraction/metricfamilyprocessor_test.go @@ -0,0 +1,132 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package extraction + +import ( + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/model" +) + +var testTime = time.Now() + +type metricFamilyProcessorScenario struct { + in string + out []*Result +} + +func (s *metricFamilyProcessorScenario) test(t *testing.T, set int) { + i := strings.NewReader(s.in) + chanSize := 1 + if len(s.out) > 0 { + chanSize = len(s.out) * 3 + } + r := make(chan *Result, chanSize) + + o := &ProcessOptions{ + Timestamp: testTime, + BaseLabels: model.LabelSet{"base": "label"}, + } + + err := MetricFamilyProcessor.ProcessSingle(i, r, o) + if err != nil { + t.Fatalf("%d. got error: %s", set, err) + } + close(r) + + actual := []*Result{} + + for e := range r { + actual = append(actual, e) + } + + if len(actual) != len(s.out) { + t.Fatalf("%d. expected length %d, got %d", set, len(s.out), len(actual)) + } + + for i, expected := range s.out { + if expected.Err != actual[i].Err { + t.Fatalf("%d. expected err of %s, got %s", set, expected.Err, actual[i].Err) + } + + if len(expected.Samples) != len(actual[i].Samples) { + t.Fatalf("%d.%d expected %d samples, got %d", set, i, len(expected.Samples), len(actual[i].Samples)) + } + + for j := 0; j < len(expected.Samples); j++ { + e := expected.Samples[j] + a := actual[i].Samples[j] + if !a.Equal(e) { + t.Fatalf("%d.%d.%d expected %s sample, got %s", set, i, j, e, a) + } + } + } +} + +func TestMetricFamilyProcessor(t *testing.T) { + scenarios := []metricFamilyProcessorScenario{ + { + in: "", + }, + { + in: "\x8f\x01\n\rrequest_count\x12\x12Number of requests\x18\x00\"0\n#\n\x0fsome_label_name\x12\x10some_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00E\xc0\"6\n)\n\x12another_label_name\x12\x13another_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00U@", + out: []*Result{ + { + Samples: model.Samples{ + &model.Sample{ + Metric: model.Metric{"base": "label", "name": "request_count", "some_label_name": "some_label_value"}, + Value: -42, + Timestamp: testTime, + }, + &model.Sample{ + Metric: model.Metric{"base": "label", "name": "request_count", "another_label_name": "another_label_value"}, + Value: 84, + Timestamp: testTime, + }, + }, + }, + }, + }, + { + in: "\xb9\x01\n\rrequest_count\x12\x12Number of requests\x18\x02\"O\n#\n\x0fsome_label_name\x12\x10some_label_value\"(\x1a\x12\t\xaeG\xe1z\x14\xae\xef?\x11\x00\x00\x00\x00\x00\x00E\xc0\x1a\x12\t+\x87\x16\xd9\xce\xf7\xef?\x11\x00\x00\x00\x00\x00\x00U\xc0\"A\n)\n\x12another_label_name\x12\x13another_label_value\"\x14\x1a\x12\t\x00\x00\x00\x00\x00\x00\xe0?\x11\x00\x00\x00\x00\x00\x00$@", + out: []*Result{ + { + Samples: model.Samples{ + &model.Sample{ + Metric: model.Metric{"base": "label", "name": "request_count", "some_label_name": "some_label_value", "quantile": "0.99"}, + Value: -42, + Timestamp: testTime, + }, + &model.Sample{ + Metric: model.Metric{"base": "label", "name": "request_count", "some_label_name": "some_label_value", "quantile": "0.999"}, + Value: -84, + Timestamp: testTime, + }, + &model.Sample{ + Metric: model.Metric{"base": "label", "name": "request_count", "another_label_name": "another_label_value", "quantile": "0.5"}, + Value: 10, + Timestamp: testTime, + }, + }, + }, + }, + }, + } + + for i, scenario := range scenarios { + scenario.test(t, i) + } +}