From 85899b3f4a9e5f089b41ac9a5ecf19c80c622fed Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Sat, 8 Jun 2013 11:48:16 +0200 Subject: [PATCH] Extract core Prometheus value decoders. Bernerd had suggested extracting the value decoders and bundling them into the client library. After some reflection, I tend to agree with this, since we can start breaking the onion of Prometheus itself and localize the protocol management into its own scope. A couple of major changes since moving: - Protocol 0.0.2 has moved to a struct{} so that our tests can perform value matching, which cannot be done against function literals. - Processing now acquires options to dictate behavioral changes of metrics bodies. - Processing no longer closes the stream, thusly returning this to the hands of the caller. - Process() has been renamed to ProcessSingle to better convey that it works on complete message bodies. This paves the way for better streaming payload support that the next API version will offer. --- .travis.yml | 3 + prometheus/decoding/decoding.go | 15 + prometheus/decoding/discriminator.go | 54 ++++ prometheus/decoding/discriminator_test.go | 96 ++++++ prometheus/decoding/fixtures/empty.json | 0 .../decoding/fixtures/test0_0_1-0_0_2.json | 79 +++++ prometheus/decoding/helpers_test.go | 38 +++ prometheus/decoding/model.go | 287 ++++++++++++++++++ prometheus/decoding/model_test.go | 104 +++++++ prometheus/decoding/processor.go | 99 ++++++ prometheus/decoding/processor0_0_1.go | 134 ++++++++ prometheus/decoding/processor0_0_1_test.go | 216 +++++++++++++ prometheus/decoding/processor0_0_2.go | 114 +++++++ prometheus/decoding/processor0_0_2_test.go | 231 ++++++++++++++ prometheus/documentation.go | 3 +- 15 files changed, 1472 insertions(+), 1 deletion(-) create mode 100644 prometheus/decoding/decoding.go create mode 100644 prometheus/decoding/discriminator.go create mode 100644 prometheus/decoding/discriminator_test.go create mode 100644 prometheus/decoding/fixtures/empty.json create mode 100644 prometheus/decoding/fixtures/test0_0_1-0_0_2.json create mode 100644 prometheus/decoding/helpers_test.go create mode 100644 prometheus/decoding/model.go create mode 100644 prometheus/decoding/model_test.go create mode 100644 prometheus/decoding/processor.go create mode 100644 prometheus/decoding/processor0_0_1.go create mode 100644 prometheus/decoding/processor0_0_1_test.go create mode 100644 prometheus/decoding/processor0_0_2.go create mode 100644 prometheus/decoding/processor0_0_2_test.go diff --git a/.travis.yml b/.travis.yml index 60e95ad..2c7ccd3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,7 @@ language: go +go: + - 1.1 + script: - make -f Makefile.TRAVIS diff --git a/prometheus/decoding/decoding.go b/prometheus/decoding/decoding.go new file mode 100644 index 0000000..ef3f9c4 --- /dev/null +++ b/prometheus/decoding/decoding.go @@ -0,0 +1,15 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package decoding decodes Prometheus clients' data streams for consumers. +package decoding diff --git a/prometheus/decoding/discriminator.go b/prometheus/decoding/discriminator.go new file mode 100644 index 0000000..ad2e178 --- /dev/null +++ b/prometheus/decoding/discriminator.go @@ -0,0 +1,54 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "fmt" + "mime" + "net/http" +) + +// ProcessorForRequestHeader interprets a HTTP request header to determine +// what Processor should be used for the given input. If no acceptable +// Processor can be found, an error is returned. +func ProcessorForRequestHeader(header http.Header) (Processor, error) { + if header == nil { + return nil, fmt.Errorf("Received illegal and nil header.") + } + + mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type")) + if err != nil { + return nil, fmt.Errorf("Invalid Content-Type header %q: %s", header.Get("Content-Type"), err) + } + if mediatype != "application/json" { + return nil, fmt.Errorf("Unsupported media type %q, expected %q", mediatype, "application/json") + } + + var prometheusApiVersion string + + if params["schema"] == "prometheus/telemetry" && params["version"] != "" { + prometheusApiVersion = params["version"] + } else { + prometheusApiVersion = header.Get("X-Prometheus-API-Version") + } + + switch prometheusApiVersion { + case "0.0.2": + return Processor002, nil + case "0.0.1": + return Processor001, nil + default: + return nil, fmt.Errorf("Unrecognized API version %s", prometheusApiVersion) + } +} diff --git a/prometheus/decoding/discriminator_test.go b/prometheus/decoding/discriminator_test.go new file mode 100644 index 0000000..38e43ee --- /dev/null +++ b/prometheus/decoding/discriminator_test.go @@ -0,0 +1,96 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "fmt" + "net/http" + "testing" +) + +func testDiscriminatorHttpHeader(t tester) { + var scenarios = []struct { + input map[string]string + output Processor + err error + }{ + { + output: nil, + err: fmt.Errorf("Received illegal and nil header."), + }, + { + input: map[string]string{"Content-Type": "application/json", "X-Prometheus-API-Version": "0.0.0"}, + output: nil, + err: fmt.Errorf("Unrecognized API version 0.0.0"), + }, + { + input: map[string]string{"Content-Type": "application/json", "X-Prometheus-API-Version": "0.0.1"}, + output: Processor001, + err: nil, + }, + { + input: map[string]string{"Content-Type": `application/json; schema="prometheus/telemetry"; version=0.0.0`}, + output: nil, + err: fmt.Errorf("Unrecognized API version 0.0.0"), + }, + { + input: map[string]string{"Content-Type": `application/json; schema="prometheus/telemetry"; version=0.0.1`}, + output: Processor001, + err: nil, + }, + { + input: map[string]string{"Content-Type": `application/json; schema="prometheus/telemetry"; version=0.0.2`}, + output: Processor002, + err: nil, + }, + } + + for i, scenario := range scenarios { + var header http.Header + + if len(scenario.input) > 0 { + header = http.Header{} + } + + for key, value := range scenario.input { + header.Add(key, value) + } + + actual, err := ProcessorForRequestHeader(header) + + if scenario.err != err { + if scenario.err != nil && err != nil { + if scenario.err.Error() != err.Error() { + t.Errorf("%d. expected %s, got %s", i, scenario.err, err) + } + } else if scenario.err != nil || err != nil { + t.Errorf("%d. expected %s, got %s", i, scenario.err, err) + } + } + + if scenario.output != actual { + t.Errorf("%d. expected %s, got %s", i, scenario.output, actual) + } + } +} + +func TestDiscriminatorHttpHeader(t *testing.T) { + testDiscriminatorHttpHeader(t) +} + +func BenchmarkDiscriminatorHttpHeader(b *testing.B) { + for i := 0; i < b.N; i++ { + testDiscriminatorHttpHeader(b) + } +} diff --git a/prometheus/decoding/fixtures/empty.json b/prometheus/decoding/fixtures/empty.json new file mode 100644 index 0000000..e69de29 diff --git a/prometheus/decoding/fixtures/test0_0_1-0_0_2.json b/prometheus/decoding/fixtures/test0_0_1-0_0_2.json new file mode 100644 index 0000000..d14297c --- /dev/null +++ b/prometheus/decoding/fixtures/test0_0_1-0_0_2.json @@ -0,0 +1,79 @@ +[ + { + "baseLabels": { + "name": "rpc_calls_total", + "job": "batch_job" + }, + "docstring": "RPC calls.", + "metric": { + "type": "counter", + "value": [ + { + "labels": { + "service": "zed" + }, + "value": 25 + }, + { + "labels": { + "service": "bar" + }, + "value": 25 + }, + { + "labels": { + "service": "foo" + }, + "value": 25 + } + ] + } + }, + { + "baseLabels": { + "name": "rpc_latency_microseconds" + }, + "docstring": "RPC latency.", + "metric": { + "type": "histogram", + "value": [ + { + "labels": { + "service": "foo" + }, + "value": { + "0.010000": 15.890724674774395, + "0.050000": 15.890724674774395, + "0.500000": 84.63044031436561, + "0.900000": 160.21100853053224, + "0.990000": 172.49828748957728 + } + }, + { + "labels": { + "service": "zed" + }, + "value": { + "0.010000": 0.0459814091918713, + "0.050000": 0.0459814091918713, + "0.500000": 0.6120456642749681, + "0.900000": 1.355915069887731, + "0.990000": 1.772733213161236 + } + }, + { + "labels": { + "service": "bar" + }, + "value": { + "0.010000": 78.48563317257356, + "0.050000": 78.48563317257356, + "0.500000": 97.31798360385088, + "0.900000": 109.89202084295582, + "0.990000": 109.99626121011262 + } + } + ] + } + } +] diff --git a/prometheus/decoding/helpers_test.go b/prometheus/decoding/helpers_test.go new file mode 100644 index 0000000..ca94c29 --- /dev/null +++ b/prometheus/decoding/helpers_test.go @@ -0,0 +1,38 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +type tester interface { + Error(args ...interface{}) + Errorf(format string, args ...interface{}) + Fatal(args ...interface{}) + Fatalf(format string, args ...interface{}) +} + +// errorEqual compares Go errors for equality. +func errorEqual(left, right error) bool { + if left == right { + return true + } + + if left != nil && right != nil { + if left.Error() == right.Error() { + return true + } + + return false + } + + return false +} diff --git a/prometheus/decoding/model.go b/prometheus/decoding/model.go new file mode 100644 index 0000000..8060e55 --- /dev/null +++ b/prometheus/decoding/model.go @@ -0,0 +1,287 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "encoding/binary" + "fmt" + "hash/fnv" + "sort" + "strings" + "time" +) + +type Sample struct { + Metric Metric + Value SampleValue + Timestamp time.Time +} + +func (s *Sample) Equal(o *Sample) bool { + if !s.Metric.Equal(o.Metric) { + return false + } + if !s.Timestamp.Equal(o.Timestamp) { + return false + } + if !s.Value.Equal(o.Value) { + return false + } + + return true +} + +type Samples []*Sample + +func (s Samples) Len() int { + return len(s) +} + +func (s Samples) Less(i, j int) bool { + switch { + case s[i].Metric.Before(s[j].Metric): + return true + case s[i].Timestamp.Before(s[j].Timestamp): + return true + default: + return false + } +} + +func (s Samples) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +// A LabelSet is a collection of LabelName and LabelValue pairs. The LabelSet +// may be fully-qualified down to the point where it may resolve to a single +// Metric in the data store or not. All operations that occur within the realm +// of a LabelSet can emit a vector of Metric entities to which the LabelSet may +// match. +type LabelSet map[LabelName]LabelValue + +// Helper function to non-destructively merge two label sets. +func (l LabelSet) Merge(other LabelSet) LabelSet { + result := make(LabelSet, len(l)) + + for k, v := range l { + result[k] = v + } + + for k, v := range other { + result[k] = v + } + + return result +} + +func (l LabelSet) String() string { + labelStrings := make([]string, 0, len(l)) + for label, value := range l { + labelStrings = append(labelStrings, fmt.Sprintf("%s='%s'", label, value)) + } + + sort.Strings(labelStrings) + + return fmt.Sprintf("{%s}", strings.Join(labelStrings, ", ")) +} + +// A LabelValue is an associated value for a LabelName. +type LabelValue string + +// A Metric is similar to a LabelSet, but the key difference is that a Metric is +// a singleton and refers to one and only one stream of samples. +type Metric map[LabelName]LabelValue + +func (m Metric) Equal(o Metric) bool { + lFingerprint := &Fingerprint{} + rFingerprint := &Fingerprint{} + + m.WriteFingerprint(lFingerprint) + o.WriteFingerprint(rFingerprint) + + return lFingerprint.Equal(rFingerprint) +} + +func (m Metric) Before(o Metric) bool { + lFingerprint := &Fingerprint{} + rFingerprint := &Fingerprint{} + + m.WriteFingerprint(lFingerprint) + o.WriteFingerprint(rFingerprint) + + return m.Before(o) +} + +func (m Metric) WriteFingerprint(f *Fingerprint) { + labelLength := len(m) + labelNames := make([]string, 0, labelLength) + + for labelName := range m { + labelNames = append(labelNames, string(labelName)) + } + + sort.Strings(labelNames) + + summer := fnv.New64a() + firstCharacterOfFirstLabelName := "" + lastCharacterOfLastLabelValue := "" + labelMatterLength := 0 + + for i, labelName := range labelNames { + labelValue := m[LabelName(labelName)] + labelNameLength := len(labelName) + labelValueLength := len(labelValue) + labelMatterLength += labelNameLength + labelValueLength + + if i == 0 { + firstCharacterOfFirstLabelName = labelName[0:1] + } + if i == labelLength-1 { + lastCharacterOfLastLabelValue = string(labelValue[labelValueLength-1 : labelValueLength]) + } + + fmt.Fprintf(summer, "%s%s%s", labelName, `"`, labelValue) + } + + f.firstCharacterOfFirstLabelName = firstCharacterOfFirstLabelName + f.hash = binary.LittleEndian.Uint64(summer.Sum(nil)) + f.labelMatterLength = uint(labelMatterLength % 10) + f.lastCharacterOfLastLabelValue = lastCharacterOfLastLabelValue + +} + +// A SampleValue is a representation of a value for a given sample at a given +// time. +type SampleValue float64 + +func (v SampleValue) Equal(o SampleValue) bool { + return v == o +} + +func (v SampleValue) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%f"`, v)), nil +} + +func (v SampleValue) String() string { + return fmt.Sprint(float64(v)) +} + +// Fingerprint provides a hash-capable representation of a Metric. +type Fingerprint struct { + // A hashed representation of the underyling entity. For our purposes, FNV-1A + // 64-bit is used. + hash uint64 + firstCharacterOfFirstLabelName string + labelMatterLength uint + lastCharacterOfLastLabelValue string +} + +func (f *Fingerprint) String() string { + return f.ToRowKey() +} + +// Transforms the Fingerprint into a database row key. +func (f *Fingerprint) ToRowKey() string { + return strings.Join([]string{fmt.Sprintf("%020d", f.hash), f.firstCharacterOfFirstLabelName, fmt.Sprint(f.labelMatterLength), f.lastCharacterOfLastLabelValue}, "-") +} + +func (f *Fingerprint) Hash() uint64 { + return f.hash +} + +func (f *Fingerprint) FirstCharacterOfFirstLabelName() string { + return f.firstCharacterOfFirstLabelName +} + +func (f *Fingerprint) LabelMatterLength() uint { + return f.labelMatterLength +} + +func (f *Fingerprint) LastCharacterOfLastLabelValue() string { + return f.lastCharacterOfLastLabelValue +} + +func (f *Fingerprint) Less(o *Fingerprint) bool { + if f.hash < o.hash { + return true + } + if f.hash > o.hash { + return false + } + + if f.firstCharacterOfFirstLabelName < o.firstCharacterOfFirstLabelName { + return true + } + if f.firstCharacterOfFirstLabelName > o.firstCharacterOfFirstLabelName { + return false + } + + if f.labelMatterLength < o.labelMatterLength { + return true + } + if f.labelMatterLength > o.labelMatterLength { + return false + } + + if f.lastCharacterOfLastLabelValue < o.lastCharacterOfLastLabelValue { + return true + } + if f.lastCharacterOfLastLabelValue > o.lastCharacterOfLastLabelValue { + return false + } + return false +} + +func (f *Fingerprint) Equal(o *Fingerprint) bool { + if f.Hash() != o.Hash() { + return false + } + + if f.FirstCharacterOfFirstLabelName() != o.FirstCharacterOfFirstLabelName() { + return false + } + + if f.LabelMatterLength() != o.LabelMatterLength() { + return false + } + + return f.LastCharacterOfLastLabelValue() == o.LastCharacterOfLastLabelValue() +} + +// A basic interface only useful in testing contexts for dispensing the time +// in a controlled manner. +type instantProvider interface { + // The current instant. + Now() time.Time +} + +// Time is a simple means for fluently wrapping around standard Go timekeeping +// mechanisms to enhance testability without compromising code readability. +// +// It is sufficient for use on bare initialization. A provider should be +// set only for test contexts. When not provided, it emits the current +// system time. +type Time struct { + // The underlying means through which time is provided, if supplied. + Provider instantProvider +} + +// Emit the current instant. +func (t *Time) Now() time.Time { + if t.Provider == nil { + return time.Now() + } + + return t.Provider.Now() +} diff --git a/prometheus/decoding/model_test.go b/prometheus/decoding/model_test.go new file mode 100644 index 0000000..ab4dc20 --- /dev/null +++ b/prometheus/decoding/model_test.go @@ -0,0 +1,104 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "runtime" + "testing" +) + +func TestFingerprintComparison(t *testing.T) { + fingerprints := []*Fingerprint{ + { + hash: 0, + firstCharacterOfFirstLabelName: "b", + labelMatterLength: 1, + lastCharacterOfLastLabelValue: "b", + }, + { + hash: 1, + firstCharacterOfFirstLabelName: "a", + labelMatterLength: 0, + lastCharacterOfLastLabelValue: "a", + }, + { + hash: 1, + firstCharacterOfFirstLabelName: "a", + labelMatterLength: 1000, + lastCharacterOfLastLabelValue: "b", + }, + { + hash: 1, + firstCharacterOfFirstLabelName: "b", + labelMatterLength: 0, + lastCharacterOfLastLabelValue: "a", + }, + { + hash: 1, + firstCharacterOfFirstLabelName: "b", + labelMatterLength: 1, + lastCharacterOfLastLabelValue: "a", + }, + { + hash: 1, + firstCharacterOfFirstLabelName: "b", + labelMatterLength: 1, + lastCharacterOfLastLabelValue: "b", + }, + } + for i := range fingerprints { + if i == 0 { + continue + } + + if !fingerprints[i-1].Less(fingerprints[i]) { + t.Errorf("%d expected %s < %s", i, fingerprints[i-1], fingerprints[i]) + } + } +} + +func BenchmarkFingerprinting(b *testing.B) { + b.StopTimer() + fps := []*Fingerprint{ + { + hash: 0, + firstCharacterOfFirstLabelName: "a", + labelMatterLength: 2, + lastCharacterOfLastLabelValue: "z", + }, + { + hash: 0, + firstCharacterOfFirstLabelName: "a", + labelMatterLength: 2, + lastCharacterOfLastLabelValue: "z", + }, + } + for i := 0; i < 10; i++ { + fps[0].Less(fps[1]) + } + b.Logf("N: %v", b.N) + b.StartTimer() + + var pre runtime.MemStats + runtime.ReadMemStats(&pre) + + for i := 0; i < b.N; i++ { + fps[0].Less(fps[1]) + } + + var post runtime.MemStats + runtime.ReadMemStats(&post) + + b.Logf("allocs: %d items: ", post.TotalAlloc-pre.TotalAlloc) +} diff --git a/prometheus/decoding/processor.go b/prometheus/decoding/processor.go new file mode 100644 index 0000000..6f18bdf --- /dev/null +++ b/prometheus/decoding/processor.go @@ -0,0 +1,99 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "io" + "time" +) + +const ( + // The label name prefix to prepend if a synthetic label is already present + // in the exported metrics. + ExporterLabelPrefix = LabelName("exporter_") + + // The label name indicating the metric name of a timeseries. + MetricNameLabel = LabelName("name") + + // The label name indicating the job from which a timeseries was scraped. + JobLabel = LabelName("job") +) + +// ProcessOptions dictates how the interpreted stream should be rendered for +// consumption. +type ProcessOptions struct { + // Timestamp is added to each value interpreted from the stream. + Timestamp time.Time + + // BaseLabels are labels that are accumulated onto each sample, if any. + BaseLabels LabelSet +} + +// Processor is responsible for decoding the actual message responses from +// stream into a format that can be consumed with the end result written +// to the results channel. +type Processor interface { + // ProcessSingle treats the input as a single self-contained message body and + // transforms it accordingly. It has no support for streaming. + ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error +} + +// Helper function to convert map[string]string into LabelSet. +// +// NOTE: This should be deleted when support for go 1.0.3 is removed; 1.1 is +// smart enough to unmarshal JSON objects into LabelSet directly. +func labelSet(labels map[string]string) LabelSet { + labelset := make(LabelSet, len(labels)) + + for k, v := range labels { + labelset[LabelName(k)] = LabelValue(v) + } + + return labelset +} + +// Helper function to merge a target's base labels ontop of the labels of an +// exported sample. If a label is already defined in the exported sample, we +// assume that we are scraping an intermediate exporter and attach +// "exporter_"-prefixes to Prometheus' own base labels. +func mergeTargetLabels(entityLabels, targetLabels LabelSet) LabelSet { + if targetLabels == nil { + targetLabels = LabelSet{} + } + + result := LabelSet{} + + for label, value := range entityLabels { + result[label] = value + } + + for label, labelValue := range targetLabels { + if _, exists := result[label]; exists { + result[ExporterLabelPrefix+label] = labelValue + } else { + result[label] = labelValue + } + } + return result +} + +// Result encapsulates the outcome from processing samples from a source. +type Result struct { + Err error + Samples Samples +} + +// A LabelName is a key for a LabelSet or Metric. It has a value associated +// therewith. +type LabelName string diff --git a/prometheus/decoding/processor0_0_1.go b/prometheus/decoding/processor0_0_1.go new file mode 100644 index 0000000..a003077 --- /dev/null +++ b/prometheus/decoding/processor0_0_1.go @@ -0,0 +1,134 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" +) + +const ( + baseLabels001 = "baseLabels" + counter001 = "counter" + docstring001 = "docstring" + gauge001 = "gauge" + histogram001 = "histogram" + labels001 = "labels" + metric001 = "metric" + type001 = "type" + value001 = "value" + percentile001 = "percentile" +) + +// Processor002 is responsible for decoding payloads from protocol version +// 0.0.1. +var Processor001 Processor = &processor001{} + +// processor001 is responsible for handling API version 0.0.1. +type processor001 struct { + time Time +} + +// entity001 represents a the JSON structure that 0.0.1 uses. +type entity001 []struct { + BaseLabels map[string]string `json:"baseLabels"` + Docstring string `json:"docstring"` + Metric struct { + MetricType string `json:"type"` + Value []struct { + Labels map[string]string `json:"labels"` + Value interface{} `json:"value"` + } `json:"value"` + } `json:"metric"` +} + +func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error { + // TODO(matt): Replace with plain-jane JSON unmarshalling. + buffer, err := ioutil.ReadAll(in) + if err != nil { + return err + } + + entities := entity001{} + if err = json.Unmarshal(buffer, &entities); err != nil { + return err + } + + // TODO(matt): This outer loop is a great basis for parallelization. + pendingSamples := Samples{} + for _, entity := range entities { + for _, value := range entity.Metric.Value { + entityLabels := labelSet(entity.BaseLabels).Merge(labelSet(value.Labels)) + labels := mergeTargetLabels(entityLabels, o.BaseLabels) + + switch entity.Metric.MetricType { + case gauge001, counter001: + sampleValue, ok := value.Value.(float64) + if !ok { + err = fmt.Errorf("Could not convert value from %s %s to float64.", entity, value) + out <- &Result{Err: err} + continue + } + + pendingSamples = append(pendingSamples, &Sample{ + Metric: Metric(labels), + Timestamp: o.Timestamp, + Value: SampleValue(sampleValue), + }) + + break + + case histogram001: + sampleValue, ok := value.Value.(map[string]interface{}) + if !ok { + err = fmt.Errorf("Could not convert value from %q to a map[string]interface{}.", value.Value) + out <- &Result{Err: err} + continue + } + + for percentile, percentileValue := range sampleValue { + individualValue, ok := percentileValue.(float64) + if !ok { + err = fmt.Errorf("Could not convert value from %q to a float64.", percentileValue) + out <- &Result{Err: err} + continue + } + + childMetric := make(map[LabelName]LabelValue, len(labels)+1) + + for k, v := range labels { + childMetric[k] = v + } + + childMetric[LabelName(percentile001)] = LabelValue(percentile) + + pendingSamples = append(pendingSamples, &Sample{ + Metric: Metric(childMetric), + Timestamp: o.Timestamp, + Value: SampleValue(individualValue), + }) + } + + break + } + } + } + if len(pendingSamples) > 0 { + out <- &Result{Samples: pendingSamples} + } + + return nil +} diff --git a/prometheus/decoding/processor0_0_1_test.go b/prometheus/decoding/processor0_0_1_test.go new file mode 100644 index 0000000..5d31f39 --- /dev/null +++ b/prometheus/decoding/processor0_0_1_test.go @@ -0,0 +1,216 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "container/list" + "fmt" + "os" + "path" + "testing" + "time" +) + +func testProcessor001Process(t tester) { + var scenarios = []struct { + in string + baseLabels LabelSet + out Samples + err error + }{ + { + in: "empty.json", + err: fmt.Errorf("unexpected end of JSON input"), + }, + { + in: "test0_0_1-0_0_2.json", + baseLabels: LabelSet{ + JobLabel: "batch_exporter", + }, + out: Samples{ + &Sample{ + Metric: Metric{"service": "zed", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + }, + &Sample{ + Metric: Metric{"service": "bar", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + }, + &Sample{ + Metric: Metric{"service": "foo", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + }, + &Sample{ + Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.0459814091918713, + }, + &Sample{ + Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 78.48563317257356, + }, + &Sample{ + Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 15.890724674774395, + }, + &Sample{ + + Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.0459814091918713, + }, + &Sample{ + Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 78.48563317257356, + }, + &Sample{ + + Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 15.890724674774395, + }, + &Sample{ + Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.6120456642749681, + }, + &Sample{ + + Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 97.31798360385088, + }, + &Sample{ + Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 84.63044031436561, + }, + &Sample{ + + Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 1.355915069887731, + }, + &Sample{ + Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 109.89202084295582, + }, + &Sample{ + Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 160.21100853053224, + }, + &Sample{ + Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 1.772733213161236, + }, + &Sample{ + + Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 109.99626121011262, + }, + &Sample{ + Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 172.49828748957728, + }, + }, + }, + } + + for i, scenario := range scenarios { + inputChannel := make(chan *Result, 1024) + + defer close(inputChannel) + + reader, err := os.Open(path.Join("fixtures", scenario.in)) + if err != nil { + t.Fatalf("%d. couldn't open scenario input file %s: %s", i, scenario.in, err) + } + + options := &ProcessOptions{ + Timestamp: time.Now(), + BaseLabels: scenario.baseLabels, + } + err = Processor001.ProcessSingle(reader, inputChannel, options) + if !errorEqual(scenario.err, err) { + t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err) + continue + } + + delivered := Samples{} + + for len(inputChannel) != 0 { + result := <-inputChannel + if result.Err != nil { + t.Fatalf("%d. expected no error, got: %s", i, result.Err) + } + delivered = append(delivered, result.Samples...) + } + + if len(delivered) != len(scenario.out) { + t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered)) + + continue + } + + expectedElements := list.New() + for _, j := range scenario.out { + expectedElements.PushBack(j) + } + + for j := 0; j < len(delivered); j++ { + actual := delivered[j] + + found := false + for element := expectedElements.Front(); element != nil && found == false; element = element.Next() { + candidate := element.Value.(*Sample) + + if candidate.Value != actual.Value { + continue + } + + if len(candidate.Metric) != len(actual.Metric) { + continue + } + + labelsMatch := false + + for key, value := range candidate.Metric { + actualValue, ok := actual.Metric[key] + if !ok { + break + } + if actualValue == value { + labelsMatch = true + break + } + } + + if !labelsMatch { + continue + } + + // XXX: Test time. + found = true + expectedElements.Remove(element) + } + + if !found { + t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual) + } + } + } +} + +func TestProcessor001Process(t *testing.T) { + testProcessor001Process(t) +} + +func BenchmarkProcessor001Process(b *testing.B) { + for i := 0; i < b.N; i++ { + testProcessor001Process(b) + } +} diff --git a/prometheus/decoding/processor0_0_2.go b/prometheus/decoding/processor0_0_2.go new file mode 100644 index 0000000..840a5f4 --- /dev/null +++ b/prometheus/decoding/processor0_0_2.go @@ -0,0 +1,114 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "encoding/json" + "fmt" + "io" +) + +// Processor002 is responsible for decoding payloads from protocol version +// 0.0.2. +var Processor002 = &processor002{} + +type histogram002 struct { + Labels map[string]string `json:"labels"` + Values map[string]SampleValue `json:"value"` +} + +type counter002 struct { + Labels map[string]string `json:"labels"` + Value SampleValue `json:"value"` +} + +type processor002 struct{} + +func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error { + // Processor for telemetry schema version 0.0.2. + // container for telemetry data + var entities []struct { + BaseLabels map[string]string `json:"baseLabels"` + Docstring string `json:"docstring"` + Metric struct { + Type string `json:"type"` + Values json.RawMessage `json:"value"` + } `json:"metric"` + } + + if err := json.NewDecoder(in).Decode(&entities); err != nil { + return err + } + + pendingSamples := Samples{} + for _, entity := range entities { + switch entity.Metric.Type { + case "counter", "gauge": + var values []counter002 + + if err := json.Unmarshal(entity.Metric.Values, &values); err != nil { + out <- &Result{ + Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err), + } + continue + } + + for _, counter := range values { + entityLabels := labelSet(entity.BaseLabels).Merge(labelSet(counter.Labels)) + labels := mergeTargetLabels(entityLabels, o.BaseLabels) + + pendingSamples = append(pendingSamples, &Sample{ + Metric: Metric(labels), + Timestamp: o.Timestamp, + Value: counter.Value, + }) + } + + case "histogram": + var values []histogram002 + + if err := json.Unmarshal(entity.Metric.Values, &values); err != nil { + out <- &Result{ + Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err), + } + continue + } + + for _, histogram := range values { + for percentile, value := range histogram.Values { + entityLabels := labelSet(entity.BaseLabels).Merge(labelSet(histogram.Labels)) + entityLabels[LabelName("percentile")] = LabelValue(percentile) + labels := mergeTargetLabels(entityLabels, o.BaseLabels) + + pendingSamples = append(pendingSamples, &Sample{ + Metric: Metric(labels), + Timestamp: o.Timestamp, + Value: value, + }) + } + } + + default: + out <- &Result{ + Err: fmt.Errorf("Unknown metric type %q", entity.Metric.Type), + } + } + } + + if len(pendingSamples) > 0 { + out <- &Result{Samples: pendingSamples} + } + + return nil +} diff --git a/prometheus/decoding/processor0_0_2_test.go b/prometheus/decoding/processor0_0_2_test.go new file mode 100644 index 0000000..c3aeb74 --- /dev/null +++ b/prometheus/decoding/processor0_0_2_test.go @@ -0,0 +1,231 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "container/list" + "fmt" + "os" + "path" + "runtime" + "testing" + "time" +) + +func testProcessor002Process(t tester) { + var scenarios = []struct { + in string + baseLabels LabelSet + out Samples + err error + }{ + { + in: "empty.json", + err: fmt.Errorf("EOF"), + }, + { + in: "test0_0_1-0_0_2.json", + baseLabels: LabelSet{ + JobLabel: "batch_exporter", + }, + out: Samples{ + &Sample{ + Metric: Metric{"service": "zed", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + }, + &Sample{ + Metric: Metric{"service": "bar", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + }, + &Sample{ + Metric: Metric{"service": "foo", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + }, + &Sample{ + Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.0459814091918713, + }, + &Sample{ + Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 78.48563317257356, + }, + &Sample{ + Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 15.890724674774395, + }, + &Sample{ + + Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.0459814091918713, + }, + &Sample{ + Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 78.48563317257356, + }, + &Sample{ + + Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 15.890724674774395, + }, + &Sample{ + Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.6120456642749681, + }, + &Sample{ + + Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 97.31798360385088, + }, + &Sample{ + Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 84.63044031436561, + }, + &Sample{ + + Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 1.355915069887731, + }, + &Sample{ + Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 109.89202084295582, + }, + &Sample{ + Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 160.21100853053224, + }, + &Sample{ + Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 1.772733213161236, + }, + &Sample{ + + Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 109.99626121011262, + }, + &Sample{ + Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 172.49828748957728, + }, + }, + }, + } + + for i, scenario := range scenarios { + inputChannel := make(chan *Result, 1024) + + defer close(inputChannel) + + reader, err := os.Open(path.Join("fixtures", scenario.in)) + if err != nil { + t.Fatalf("%d. couldn't open scenario input file %s: %s", i, scenario.in, err) + } + + options := &ProcessOptions{ + Timestamp: time.Now(), + BaseLabels: scenario.baseLabels, + } + err = Processor002.ProcessSingle(reader, inputChannel, options) + if !errorEqual(scenario.err, err) { + t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err) + continue + } + + delivered := Samples{} + + for len(inputChannel) != 0 { + result := <-inputChannel + if result.Err != nil { + t.Fatalf("%d. expected no error, got: %s", i, result.Err) + } + delivered = append(delivered, result.Samples...) + } + + if len(delivered) != len(scenario.out) { + t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered)) + + continue + } + + expectedElements := list.New() + for _, j := range scenario.out { + expectedElements.PushBack(j) + } + + for j := 0; j < len(delivered); j++ { + actual := delivered[j] + + found := false + for element := expectedElements.Front(); element != nil && found == false; element = element.Next() { + candidate := element.Value.(*Sample) + + if candidate.Value != actual.Value { + continue + } + + if len(candidate.Metric) != len(actual.Metric) { + continue + } + + labelsMatch := false + + for key, value := range candidate.Metric { + actualValue, ok := actual.Metric[key] + if !ok { + break + } + if actualValue == value { + labelsMatch = true + break + } + } + + if !labelsMatch { + continue + } + + // XXX: Test time. + found = true + expectedElements.Remove(element) + } + + if !found { + t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual) + } + } + } +} + +func TestProcessor002Process(t *testing.T) { + testProcessor002Process(t) +} + +func BenchmarkProcessor002Process(b *testing.B) { + b.StopTimer() + + pre := runtime.MemStats{} + runtime.ReadMemStats(&pre) + + b.StartTimer() + + for i := 0; i < b.N; i++ { + testProcessor002Process(b) + } + + post := runtime.MemStats{} + runtime.ReadMemStats(&post) + + allocated := post.TotalAlloc - pre.TotalAlloc + + b.Logf("Allocated %d at %f per cycle with %d cycles.", allocated, float64(allocated)/float64(b.N), b.N) +} diff --git a/prometheus/documentation.go b/prometheus/documentation.go index 6da5e45..6a32c48 100644 --- a/prometheus/documentation.go +++ b/prometheus/documentation.go @@ -4,7 +4,8 @@ // Use of this source code is governed by a BSD-style license that can be found // in the LICENSE file. -// Prometheus' client side metric primitives and telemetry exposition framework. +// Package prometheus provides client side metric primitives and a telemetry +// exposition framework. // // This package provides both metric primitives and tools for their exposition // to the Prometheus time series collection and computation framework.