diff --git a/.travis.yml b/.travis.yml index 60e95ad..2c7ccd3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,7 @@ language: go +go: + - 1.1 + script: - make -f Makefile.TRAVIS diff --git a/prometheus/decoding/decoding.go b/prometheus/decoding/decoding.go new file mode 100644 index 0000000..ef3f9c4 --- /dev/null +++ b/prometheus/decoding/decoding.go @@ -0,0 +1,15 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package decoding decodes Prometheus clients' data streams for consumers. +package decoding diff --git a/prometheus/decoding/discriminator.go b/prometheus/decoding/discriminator.go new file mode 100644 index 0000000..ad2e178 --- /dev/null +++ b/prometheus/decoding/discriminator.go @@ -0,0 +1,54 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "fmt" + "mime" + "net/http" +) + +// ProcessorForRequestHeader interprets a HTTP request header to determine +// what Processor should be used for the given input. If no acceptable +// Processor can be found, an error is returned. +func ProcessorForRequestHeader(header http.Header) (Processor, error) { + if header == nil { + return nil, fmt.Errorf("Received illegal and nil header.") + } + + mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type")) + if err != nil { + return nil, fmt.Errorf("Invalid Content-Type header %q: %s", header.Get("Content-Type"), err) + } + if mediatype != "application/json" { + return nil, fmt.Errorf("Unsupported media type %q, expected %q", mediatype, "application/json") + } + + var prometheusApiVersion string + + if params["schema"] == "prometheus/telemetry" && params["version"] != "" { + prometheusApiVersion = params["version"] + } else { + prometheusApiVersion = header.Get("X-Prometheus-API-Version") + } + + switch prometheusApiVersion { + case "0.0.2": + return Processor002, nil + case "0.0.1": + return Processor001, nil + default: + return nil, fmt.Errorf("Unrecognized API version %s", prometheusApiVersion) + } +} diff --git a/prometheus/decoding/discriminator_test.go b/prometheus/decoding/discriminator_test.go new file mode 100644 index 0000000..38e43ee --- /dev/null +++ b/prometheus/decoding/discriminator_test.go @@ -0,0 +1,96 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "fmt" + "net/http" + "testing" +) + +func testDiscriminatorHttpHeader(t tester) { + var scenarios = []struct { + input map[string]string + output Processor + err error + }{ + { + output: nil, + err: fmt.Errorf("Received illegal and nil header."), + }, + { + input: map[string]string{"Content-Type": "application/json", "X-Prometheus-API-Version": "0.0.0"}, + output: nil, + err: fmt.Errorf("Unrecognized API version 0.0.0"), + }, + { + input: map[string]string{"Content-Type": "application/json", "X-Prometheus-API-Version": "0.0.1"}, + output: Processor001, + err: nil, + }, + { + input: map[string]string{"Content-Type": `application/json; schema="prometheus/telemetry"; version=0.0.0`}, + output: nil, + err: fmt.Errorf("Unrecognized API version 0.0.0"), + }, + { + input: map[string]string{"Content-Type": `application/json; schema="prometheus/telemetry"; version=0.0.1`}, + output: Processor001, + err: nil, + }, + { + input: map[string]string{"Content-Type": `application/json; schema="prometheus/telemetry"; version=0.0.2`}, + output: Processor002, + err: nil, + }, + } + + for i, scenario := range scenarios { + var header http.Header + + if len(scenario.input) > 0 { + header = http.Header{} + } + + for key, value := range scenario.input { + header.Add(key, value) + } + + actual, err := ProcessorForRequestHeader(header) + + if scenario.err != err { + if scenario.err != nil && err != nil { + if scenario.err.Error() != err.Error() { + t.Errorf("%d. expected %s, got %s", i, scenario.err, err) + } + } else if scenario.err != nil || err != nil { + t.Errorf("%d. expected %s, got %s", i, scenario.err, err) + } + } + + if scenario.output != actual { + t.Errorf("%d. expected %s, got %s", i, scenario.output, actual) + } + } +} + +func TestDiscriminatorHttpHeader(t *testing.T) { + testDiscriminatorHttpHeader(t) +} + +func BenchmarkDiscriminatorHttpHeader(b *testing.B) { + for i := 0; i < b.N; i++ { + testDiscriminatorHttpHeader(b) + } +} diff --git a/prometheus/decoding/fixtures/empty.json b/prometheus/decoding/fixtures/empty.json new file mode 100644 index 0000000..e69de29 diff --git a/prometheus/decoding/fixtures/test0_0_1-0_0_2.json b/prometheus/decoding/fixtures/test0_0_1-0_0_2.json new file mode 100644 index 0000000..d14297c --- /dev/null +++ b/prometheus/decoding/fixtures/test0_0_1-0_0_2.json @@ -0,0 +1,79 @@ +[ + { + "baseLabels": { + "name": "rpc_calls_total", + "job": "batch_job" + }, + "docstring": "RPC calls.", + "metric": { + "type": "counter", + "value": [ + { + "labels": { + "service": "zed" + }, + "value": 25 + }, + { + "labels": { + "service": "bar" + }, + "value": 25 + }, + { + "labels": { + "service": "foo" + }, + "value": 25 + } + ] + } + }, + { + "baseLabels": { + "name": "rpc_latency_microseconds" + }, + "docstring": "RPC latency.", + "metric": { + "type": "histogram", + "value": [ + { + "labels": { + "service": "foo" + }, + "value": { + "0.010000": 15.890724674774395, + "0.050000": 15.890724674774395, + "0.500000": 84.63044031436561, + "0.900000": 160.21100853053224, + "0.990000": 172.49828748957728 + } + }, + { + "labels": { + "service": "zed" + }, + "value": { + "0.010000": 0.0459814091918713, + "0.050000": 0.0459814091918713, + "0.500000": 0.6120456642749681, + "0.900000": 1.355915069887731, + "0.990000": 1.772733213161236 + } + }, + { + "labels": { + "service": "bar" + }, + "value": { + "0.010000": 78.48563317257356, + "0.050000": 78.48563317257356, + "0.500000": 97.31798360385088, + "0.900000": 109.89202084295582, + "0.990000": 109.99626121011262 + } + } + ] + } + } +] diff --git a/prometheus/decoding/helpers_test.go b/prometheus/decoding/helpers_test.go new file mode 100644 index 0000000..ca94c29 --- /dev/null +++ b/prometheus/decoding/helpers_test.go @@ -0,0 +1,38 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +type tester interface { + Error(args ...interface{}) + Errorf(format string, args ...interface{}) + Fatal(args ...interface{}) + Fatalf(format string, args ...interface{}) +} + +// errorEqual compares Go errors for equality. +func errorEqual(left, right error) bool { + if left == right { + return true + } + + if left != nil && right != nil { + if left.Error() == right.Error() { + return true + } + + return false + } + + return false +} diff --git a/prometheus/decoding/model.go b/prometheus/decoding/model.go new file mode 100644 index 0000000..8060e55 --- /dev/null +++ b/prometheus/decoding/model.go @@ -0,0 +1,287 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "encoding/binary" + "fmt" + "hash/fnv" + "sort" + "strings" + "time" +) + +type Sample struct { + Metric Metric + Value SampleValue + Timestamp time.Time +} + +func (s *Sample) Equal(o *Sample) bool { + if !s.Metric.Equal(o.Metric) { + return false + } + if !s.Timestamp.Equal(o.Timestamp) { + return false + } + if !s.Value.Equal(o.Value) { + return false + } + + return true +} + +type Samples []*Sample + +func (s Samples) Len() int { + return len(s) +} + +func (s Samples) Less(i, j int) bool { + switch { + case s[i].Metric.Before(s[j].Metric): + return true + case s[i].Timestamp.Before(s[j].Timestamp): + return true + default: + return false + } +} + +func (s Samples) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +// A LabelSet is a collection of LabelName and LabelValue pairs. The LabelSet +// may be fully-qualified down to the point where it may resolve to a single +// Metric in the data store or not. All operations that occur within the realm +// of a LabelSet can emit a vector of Metric entities to which the LabelSet may +// match. +type LabelSet map[LabelName]LabelValue + +// Helper function to non-destructively merge two label sets. +func (l LabelSet) Merge(other LabelSet) LabelSet { + result := make(LabelSet, len(l)) + + for k, v := range l { + result[k] = v + } + + for k, v := range other { + result[k] = v + } + + return result +} + +func (l LabelSet) String() string { + labelStrings := make([]string, 0, len(l)) + for label, value := range l { + labelStrings = append(labelStrings, fmt.Sprintf("%s='%s'", label, value)) + } + + sort.Strings(labelStrings) + + return fmt.Sprintf("{%s}", strings.Join(labelStrings, ", ")) +} + +// A LabelValue is an associated value for a LabelName. +type LabelValue string + +// A Metric is similar to a LabelSet, but the key difference is that a Metric is +// a singleton and refers to one and only one stream of samples. +type Metric map[LabelName]LabelValue + +func (m Metric) Equal(o Metric) bool { + lFingerprint := &Fingerprint{} + rFingerprint := &Fingerprint{} + + m.WriteFingerprint(lFingerprint) + o.WriteFingerprint(rFingerprint) + + return lFingerprint.Equal(rFingerprint) +} + +func (m Metric) Before(o Metric) bool { + lFingerprint := &Fingerprint{} + rFingerprint := &Fingerprint{} + + m.WriteFingerprint(lFingerprint) + o.WriteFingerprint(rFingerprint) + + return m.Before(o) +} + +func (m Metric) WriteFingerprint(f *Fingerprint) { + labelLength := len(m) + labelNames := make([]string, 0, labelLength) + + for labelName := range m { + labelNames = append(labelNames, string(labelName)) + } + + sort.Strings(labelNames) + + summer := fnv.New64a() + firstCharacterOfFirstLabelName := "" + lastCharacterOfLastLabelValue := "" + labelMatterLength := 0 + + for i, labelName := range labelNames { + labelValue := m[LabelName(labelName)] + labelNameLength := len(labelName) + labelValueLength := len(labelValue) + labelMatterLength += labelNameLength + labelValueLength + + if i == 0 { + firstCharacterOfFirstLabelName = labelName[0:1] + } + if i == labelLength-1 { + lastCharacterOfLastLabelValue = string(labelValue[labelValueLength-1 : labelValueLength]) + } + + fmt.Fprintf(summer, "%s%s%s", labelName, `"`, labelValue) + } + + f.firstCharacterOfFirstLabelName = firstCharacterOfFirstLabelName + f.hash = binary.LittleEndian.Uint64(summer.Sum(nil)) + f.labelMatterLength = uint(labelMatterLength % 10) + f.lastCharacterOfLastLabelValue = lastCharacterOfLastLabelValue + +} + +// A SampleValue is a representation of a value for a given sample at a given +// time. +type SampleValue float64 + +func (v SampleValue) Equal(o SampleValue) bool { + return v == o +} + +func (v SampleValue) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%f"`, v)), nil +} + +func (v SampleValue) String() string { + return fmt.Sprint(float64(v)) +} + +// Fingerprint provides a hash-capable representation of a Metric. +type Fingerprint struct { + // A hashed representation of the underyling entity. For our purposes, FNV-1A + // 64-bit is used. + hash uint64 + firstCharacterOfFirstLabelName string + labelMatterLength uint + lastCharacterOfLastLabelValue string +} + +func (f *Fingerprint) String() string { + return f.ToRowKey() +} + +// Transforms the Fingerprint into a database row key. +func (f *Fingerprint) ToRowKey() string { + return strings.Join([]string{fmt.Sprintf("%020d", f.hash), f.firstCharacterOfFirstLabelName, fmt.Sprint(f.labelMatterLength), f.lastCharacterOfLastLabelValue}, "-") +} + +func (f *Fingerprint) Hash() uint64 { + return f.hash +} + +func (f *Fingerprint) FirstCharacterOfFirstLabelName() string { + return f.firstCharacterOfFirstLabelName +} + +func (f *Fingerprint) LabelMatterLength() uint { + return f.labelMatterLength +} + +func (f *Fingerprint) LastCharacterOfLastLabelValue() string { + return f.lastCharacterOfLastLabelValue +} + +func (f *Fingerprint) Less(o *Fingerprint) bool { + if f.hash < o.hash { + return true + } + if f.hash > o.hash { + return false + } + + if f.firstCharacterOfFirstLabelName < o.firstCharacterOfFirstLabelName { + return true + } + if f.firstCharacterOfFirstLabelName > o.firstCharacterOfFirstLabelName { + return false + } + + if f.labelMatterLength < o.labelMatterLength { + return true + } + if f.labelMatterLength > o.labelMatterLength { + return false + } + + if f.lastCharacterOfLastLabelValue < o.lastCharacterOfLastLabelValue { + return true + } + if f.lastCharacterOfLastLabelValue > o.lastCharacterOfLastLabelValue { + return false + } + return false +} + +func (f *Fingerprint) Equal(o *Fingerprint) bool { + if f.Hash() != o.Hash() { + return false + } + + if f.FirstCharacterOfFirstLabelName() != o.FirstCharacterOfFirstLabelName() { + return false + } + + if f.LabelMatterLength() != o.LabelMatterLength() { + return false + } + + return f.LastCharacterOfLastLabelValue() == o.LastCharacterOfLastLabelValue() +} + +// A basic interface only useful in testing contexts for dispensing the time +// in a controlled manner. +type instantProvider interface { + // The current instant. + Now() time.Time +} + +// Time is a simple means for fluently wrapping around standard Go timekeeping +// mechanisms to enhance testability without compromising code readability. +// +// It is sufficient for use on bare initialization. A provider should be +// set only for test contexts. When not provided, it emits the current +// system time. +type Time struct { + // The underlying means through which time is provided, if supplied. + Provider instantProvider +} + +// Emit the current instant. +func (t *Time) Now() time.Time { + if t.Provider == nil { + return time.Now() + } + + return t.Provider.Now() +} diff --git a/prometheus/decoding/model_test.go b/prometheus/decoding/model_test.go new file mode 100644 index 0000000..ab4dc20 --- /dev/null +++ b/prometheus/decoding/model_test.go @@ -0,0 +1,104 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "runtime" + "testing" +) + +func TestFingerprintComparison(t *testing.T) { + fingerprints := []*Fingerprint{ + { + hash: 0, + firstCharacterOfFirstLabelName: "b", + labelMatterLength: 1, + lastCharacterOfLastLabelValue: "b", + }, + { + hash: 1, + firstCharacterOfFirstLabelName: "a", + labelMatterLength: 0, + lastCharacterOfLastLabelValue: "a", + }, + { + hash: 1, + firstCharacterOfFirstLabelName: "a", + labelMatterLength: 1000, + lastCharacterOfLastLabelValue: "b", + }, + { + hash: 1, + firstCharacterOfFirstLabelName: "b", + labelMatterLength: 0, + lastCharacterOfLastLabelValue: "a", + }, + { + hash: 1, + firstCharacterOfFirstLabelName: "b", + labelMatterLength: 1, + lastCharacterOfLastLabelValue: "a", + }, + { + hash: 1, + firstCharacterOfFirstLabelName: "b", + labelMatterLength: 1, + lastCharacterOfLastLabelValue: "b", + }, + } + for i := range fingerprints { + if i == 0 { + continue + } + + if !fingerprints[i-1].Less(fingerprints[i]) { + t.Errorf("%d expected %s < %s", i, fingerprints[i-1], fingerprints[i]) + } + } +} + +func BenchmarkFingerprinting(b *testing.B) { + b.StopTimer() + fps := []*Fingerprint{ + { + hash: 0, + firstCharacterOfFirstLabelName: "a", + labelMatterLength: 2, + lastCharacterOfLastLabelValue: "z", + }, + { + hash: 0, + firstCharacterOfFirstLabelName: "a", + labelMatterLength: 2, + lastCharacterOfLastLabelValue: "z", + }, + } + for i := 0; i < 10; i++ { + fps[0].Less(fps[1]) + } + b.Logf("N: %v", b.N) + b.StartTimer() + + var pre runtime.MemStats + runtime.ReadMemStats(&pre) + + for i := 0; i < b.N; i++ { + fps[0].Less(fps[1]) + } + + var post runtime.MemStats + runtime.ReadMemStats(&post) + + b.Logf("allocs: %d items: ", post.TotalAlloc-pre.TotalAlloc) +} diff --git a/prometheus/decoding/processor.go b/prometheus/decoding/processor.go new file mode 100644 index 0000000..6f18bdf --- /dev/null +++ b/prometheus/decoding/processor.go @@ -0,0 +1,99 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "io" + "time" +) + +const ( + // The label name prefix to prepend if a synthetic label is already present + // in the exported metrics. + ExporterLabelPrefix = LabelName("exporter_") + + // The label name indicating the metric name of a timeseries. + MetricNameLabel = LabelName("name") + + // The label name indicating the job from which a timeseries was scraped. + JobLabel = LabelName("job") +) + +// ProcessOptions dictates how the interpreted stream should be rendered for +// consumption. +type ProcessOptions struct { + // Timestamp is added to each value interpreted from the stream. + Timestamp time.Time + + // BaseLabels are labels that are accumulated onto each sample, if any. + BaseLabels LabelSet +} + +// Processor is responsible for decoding the actual message responses from +// stream into a format that can be consumed with the end result written +// to the results channel. +type Processor interface { + // ProcessSingle treats the input as a single self-contained message body and + // transforms it accordingly. It has no support for streaming. + ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error +} + +// Helper function to convert map[string]string into LabelSet. +// +// NOTE: This should be deleted when support for go 1.0.3 is removed; 1.1 is +// smart enough to unmarshal JSON objects into LabelSet directly. +func labelSet(labels map[string]string) LabelSet { + labelset := make(LabelSet, len(labels)) + + for k, v := range labels { + labelset[LabelName(k)] = LabelValue(v) + } + + return labelset +} + +// Helper function to merge a target's base labels ontop of the labels of an +// exported sample. If a label is already defined in the exported sample, we +// assume that we are scraping an intermediate exporter and attach +// "exporter_"-prefixes to Prometheus' own base labels. +func mergeTargetLabels(entityLabels, targetLabels LabelSet) LabelSet { + if targetLabels == nil { + targetLabels = LabelSet{} + } + + result := LabelSet{} + + for label, value := range entityLabels { + result[label] = value + } + + for label, labelValue := range targetLabels { + if _, exists := result[label]; exists { + result[ExporterLabelPrefix+label] = labelValue + } else { + result[label] = labelValue + } + } + return result +} + +// Result encapsulates the outcome from processing samples from a source. +type Result struct { + Err error + Samples Samples +} + +// A LabelName is a key for a LabelSet or Metric. It has a value associated +// therewith. +type LabelName string diff --git a/prometheus/decoding/processor0_0_1.go b/prometheus/decoding/processor0_0_1.go new file mode 100644 index 0000000..a003077 --- /dev/null +++ b/prometheus/decoding/processor0_0_1.go @@ -0,0 +1,134 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" +) + +const ( + baseLabels001 = "baseLabels" + counter001 = "counter" + docstring001 = "docstring" + gauge001 = "gauge" + histogram001 = "histogram" + labels001 = "labels" + metric001 = "metric" + type001 = "type" + value001 = "value" + percentile001 = "percentile" +) + +// Processor002 is responsible for decoding payloads from protocol version +// 0.0.1. +var Processor001 Processor = &processor001{} + +// processor001 is responsible for handling API version 0.0.1. +type processor001 struct { + time Time +} + +// entity001 represents a the JSON structure that 0.0.1 uses. +type entity001 []struct { + BaseLabels map[string]string `json:"baseLabels"` + Docstring string `json:"docstring"` + Metric struct { + MetricType string `json:"type"` + Value []struct { + Labels map[string]string `json:"labels"` + Value interface{} `json:"value"` + } `json:"value"` + } `json:"metric"` +} + +func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error { + // TODO(matt): Replace with plain-jane JSON unmarshalling. + buffer, err := ioutil.ReadAll(in) + if err != nil { + return err + } + + entities := entity001{} + if err = json.Unmarshal(buffer, &entities); err != nil { + return err + } + + // TODO(matt): This outer loop is a great basis for parallelization. + pendingSamples := Samples{} + for _, entity := range entities { + for _, value := range entity.Metric.Value { + entityLabels := labelSet(entity.BaseLabels).Merge(labelSet(value.Labels)) + labels := mergeTargetLabels(entityLabels, o.BaseLabels) + + switch entity.Metric.MetricType { + case gauge001, counter001: + sampleValue, ok := value.Value.(float64) + if !ok { + err = fmt.Errorf("Could not convert value from %s %s to float64.", entity, value) + out <- &Result{Err: err} + continue + } + + pendingSamples = append(pendingSamples, &Sample{ + Metric: Metric(labels), + Timestamp: o.Timestamp, + Value: SampleValue(sampleValue), + }) + + break + + case histogram001: + sampleValue, ok := value.Value.(map[string]interface{}) + if !ok { + err = fmt.Errorf("Could not convert value from %q to a map[string]interface{}.", value.Value) + out <- &Result{Err: err} + continue + } + + for percentile, percentileValue := range sampleValue { + individualValue, ok := percentileValue.(float64) + if !ok { + err = fmt.Errorf("Could not convert value from %q to a float64.", percentileValue) + out <- &Result{Err: err} + continue + } + + childMetric := make(map[LabelName]LabelValue, len(labels)+1) + + for k, v := range labels { + childMetric[k] = v + } + + childMetric[LabelName(percentile001)] = LabelValue(percentile) + + pendingSamples = append(pendingSamples, &Sample{ + Metric: Metric(childMetric), + Timestamp: o.Timestamp, + Value: SampleValue(individualValue), + }) + } + + break + } + } + } + if len(pendingSamples) > 0 { + out <- &Result{Samples: pendingSamples} + } + + return nil +} diff --git a/prometheus/decoding/processor0_0_1_test.go b/prometheus/decoding/processor0_0_1_test.go new file mode 100644 index 0000000..5d31f39 --- /dev/null +++ b/prometheus/decoding/processor0_0_1_test.go @@ -0,0 +1,216 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "container/list" + "fmt" + "os" + "path" + "testing" + "time" +) + +func testProcessor001Process(t tester) { + var scenarios = []struct { + in string + baseLabels LabelSet + out Samples + err error + }{ + { + in: "empty.json", + err: fmt.Errorf("unexpected end of JSON input"), + }, + { + in: "test0_0_1-0_0_2.json", + baseLabels: LabelSet{ + JobLabel: "batch_exporter", + }, + out: Samples{ + &Sample{ + Metric: Metric{"service": "zed", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + }, + &Sample{ + Metric: Metric{"service": "bar", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + }, + &Sample{ + Metric: Metric{"service": "foo", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + }, + &Sample{ + Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.0459814091918713, + }, + &Sample{ + Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 78.48563317257356, + }, + &Sample{ + Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 15.890724674774395, + }, + &Sample{ + + Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.0459814091918713, + }, + &Sample{ + Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 78.48563317257356, + }, + &Sample{ + + Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 15.890724674774395, + }, + &Sample{ + Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.6120456642749681, + }, + &Sample{ + + Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 97.31798360385088, + }, + &Sample{ + Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 84.63044031436561, + }, + &Sample{ + + Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 1.355915069887731, + }, + &Sample{ + Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 109.89202084295582, + }, + &Sample{ + Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 160.21100853053224, + }, + &Sample{ + Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 1.772733213161236, + }, + &Sample{ + + Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 109.99626121011262, + }, + &Sample{ + Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 172.49828748957728, + }, + }, + }, + } + + for i, scenario := range scenarios { + inputChannel := make(chan *Result, 1024) + + defer close(inputChannel) + + reader, err := os.Open(path.Join("fixtures", scenario.in)) + if err != nil { + t.Fatalf("%d. couldn't open scenario input file %s: %s", i, scenario.in, err) + } + + options := &ProcessOptions{ + Timestamp: time.Now(), + BaseLabels: scenario.baseLabels, + } + err = Processor001.ProcessSingle(reader, inputChannel, options) + if !errorEqual(scenario.err, err) { + t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err) + continue + } + + delivered := Samples{} + + for len(inputChannel) != 0 { + result := <-inputChannel + if result.Err != nil { + t.Fatalf("%d. expected no error, got: %s", i, result.Err) + } + delivered = append(delivered, result.Samples...) + } + + if len(delivered) != len(scenario.out) { + t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered)) + + continue + } + + expectedElements := list.New() + for _, j := range scenario.out { + expectedElements.PushBack(j) + } + + for j := 0; j < len(delivered); j++ { + actual := delivered[j] + + found := false + for element := expectedElements.Front(); element != nil && found == false; element = element.Next() { + candidate := element.Value.(*Sample) + + if candidate.Value != actual.Value { + continue + } + + if len(candidate.Metric) != len(actual.Metric) { + continue + } + + labelsMatch := false + + for key, value := range candidate.Metric { + actualValue, ok := actual.Metric[key] + if !ok { + break + } + if actualValue == value { + labelsMatch = true + break + } + } + + if !labelsMatch { + continue + } + + // XXX: Test time. + found = true + expectedElements.Remove(element) + } + + if !found { + t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual) + } + } + } +} + +func TestProcessor001Process(t *testing.T) { + testProcessor001Process(t) +} + +func BenchmarkProcessor001Process(b *testing.B) { + for i := 0; i < b.N; i++ { + testProcessor001Process(b) + } +} diff --git a/prometheus/decoding/processor0_0_2.go b/prometheus/decoding/processor0_0_2.go new file mode 100644 index 0000000..840a5f4 --- /dev/null +++ b/prometheus/decoding/processor0_0_2.go @@ -0,0 +1,114 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "encoding/json" + "fmt" + "io" +) + +// Processor002 is responsible for decoding payloads from protocol version +// 0.0.2. +var Processor002 = &processor002{} + +type histogram002 struct { + Labels map[string]string `json:"labels"` + Values map[string]SampleValue `json:"value"` +} + +type counter002 struct { + Labels map[string]string `json:"labels"` + Value SampleValue `json:"value"` +} + +type processor002 struct{} + +func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error { + // Processor for telemetry schema version 0.0.2. + // container for telemetry data + var entities []struct { + BaseLabels map[string]string `json:"baseLabels"` + Docstring string `json:"docstring"` + Metric struct { + Type string `json:"type"` + Values json.RawMessage `json:"value"` + } `json:"metric"` + } + + if err := json.NewDecoder(in).Decode(&entities); err != nil { + return err + } + + pendingSamples := Samples{} + for _, entity := range entities { + switch entity.Metric.Type { + case "counter", "gauge": + var values []counter002 + + if err := json.Unmarshal(entity.Metric.Values, &values); err != nil { + out <- &Result{ + Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err), + } + continue + } + + for _, counter := range values { + entityLabels := labelSet(entity.BaseLabels).Merge(labelSet(counter.Labels)) + labels := mergeTargetLabels(entityLabels, o.BaseLabels) + + pendingSamples = append(pendingSamples, &Sample{ + Metric: Metric(labels), + Timestamp: o.Timestamp, + Value: counter.Value, + }) + } + + case "histogram": + var values []histogram002 + + if err := json.Unmarshal(entity.Metric.Values, &values); err != nil { + out <- &Result{ + Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err), + } + continue + } + + for _, histogram := range values { + for percentile, value := range histogram.Values { + entityLabels := labelSet(entity.BaseLabels).Merge(labelSet(histogram.Labels)) + entityLabels[LabelName("percentile")] = LabelValue(percentile) + labels := mergeTargetLabels(entityLabels, o.BaseLabels) + + pendingSamples = append(pendingSamples, &Sample{ + Metric: Metric(labels), + Timestamp: o.Timestamp, + Value: value, + }) + } + } + + default: + out <- &Result{ + Err: fmt.Errorf("Unknown metric type %q", entity.Metric.Type), + } + } + } + + if len(pendingSamples) > 0 { + out <- &Result{Samples: pendingSamples} + } + + return nil +} diff --git a/prometheus/decoding/processor0_0_2_test.go b/prometheus/decoding/processor0_0_2_test.go new file mode 100644 index 0000000..c3aeb74 --- /dev/null +++ b/prometheus/decoding/processor0_0_2_test.go @@ -0,0 +1,231 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package decoding + +import ( + "container/list" + "fmt" + "os" + "path" + "runtime" + "testing" + "time" +) + +func testProcessor002Process(t tester) { + var scenarios = []struct { + in string + baseLabels LabelSet + out Samples + err error + }{ + { + in: "empty.json", + err: fmt.Errorf("EOF"), + }, + { + in: "test0_0_1-0_0_2.json", + baseLabels: LabelSet{ + JobLabel: "batch_exporter", + }, + out: Samples{ + &Sample{ + Metric: Metric{"service": "zed", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + }, + &Sample{ + Metric: Metric{"service": "bar", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + }, + &Sample{ + Metric: Metric{"service": "foo", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + }, + &Sample{ + Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.0459814091918713, + }, + &Sample{ + Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 78.48563317257356, + }, + &Sample{ + Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 15.890724674774395, + }, + &Sample{ + + Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.0459814091918713, + }, + &Sample{ + Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 78.48563317257356, + }, + &Sample{ + + Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 15.890724674774395, + }, + &Sample{ + Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.6120456642749681, + }, + &Sample{ + + Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 97.31798360385088, + }, + &Sample{ + Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 84.63044031436561, + }, + &Sample{ + + Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 1.355915069887731, + }, + &Sample{ + Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 109.89202084295582, + }, + &Sample{ + Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 160.21100853053224, + }, + &Sample{ + Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 1.772733213161236, + }, + &Sample{ + + Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 109.99626121011262, + }, + &Sample{ + Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 172.49828748957728, + }, + }, + }, + } + + for i, scenario := range scenarios { + inputChannel := make(chan *Result, 1024) + + defer close(inputChannel) + + reader, err := os.Open(path.Join("fixtures", scenario.in)) + if err != nil { + t.Fatalf("%d. couldn't open scenario input file %s: %s", i, scenario.in, err) + } + + options := &ProcessOptions{ + Timestamp: time.Now(), + BaseLabels: scenario.baseLabels, + } + err = Processor002.ProcessSingle(reader, inputChannel, options) + if !errorEqual(scenario.err, err) { + t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err) + continue + } + + delivered := Samples{} + + for len(inputChannel) != 0 { + result := <-inputChannel + if result.Err != nil { + t.Fatalf("%d. expected no error, got: %s", i, result.Err) + } + delivered = append(delivered, result.Samples...) + } + + if len(delivered) != len(scenario.out) { + t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered)) + + continue + } + + expectedElements := list.New() + for _, j := range scenario.out { + expectedElements.PushBack(j) + } + + for j := 0; j < len(delivered); j++ { + actual := delivered[j] + + found := false + for element := expectedElements.Front(); element != nil && found == false; element = element.Next() { + candidate := element.Value.(*Sample) + + if candidate.Value != actual.Value { + continue + } + + if len(candidate.Metric) != len(actual.Metric) { + continue + } + + labelsMatch := false + + for key, value := range candidate.Metric { + actualValue, ok := actual.Metric[key] + if !ok { + break + } + if actualValue == value { + labelsMatch = true + break + } + } + + if !labelsMatch { + continue + } + + // XXX: Test time. + found = true + expectedElements.Remove(element) + } + + if !found { + t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual) + } + } + } +} + +func TestProcessor002Process(t *testing.T) { + testProcessor002Process(t) +} + +func BenchmarkProcessor002Process(b *testing.B) { + b.StopTimer() + + pre := runtime.MemStats{} + runtime.ReadMemStats(&pre) + + b.StartTimer() + + for i := 0; i < b.N; i++ { + testProcessor002Process(b) + } + + post := runtime.MemStats{} + runtime.ReadMemStats(&post) + + allocated := post.TotalAlloc - pre.TotalAlloc + + b.Logf("Allocated %d at %f per cycle with %d cycles.", allocated, float64(allocated)/float64(b.N), b.N) +} diff --git a/prometheus/documentation.go b/prometheus/documentation.go index 6da5e45..6a32c48 100644 --- a/prometheus/documentation.go +++ b/prometheus/documentation.go @@ -4,7 +4,8 @@ // Use of this source code is governed by a BSD-style license that can be found // in the LICENSE file. -// Prometheus' client side metric primitives and telemetry exposition framework. +// Package prometheus provides client side metric primitives and a telemetry +// exposition framework. // // This package provides both metric primitives and tools for their exposition // to the Prometheus time series collection and computation framework.